Files
Pac-cogs/videoarchiver/ffmpeg/video_analyzer.py
pacnpal 46af1a31b7 Enhanced FFmpeg Integration:
Added robust error handling and logging
Improved binary verification and initialization
Added proper GPU detection and hardware acceleration
Optimized encoding parameters for different content types
Improved File Operations:

Added retry mechanisms for file operations
Enhanced temporary directory management
Improved cleanup of failed downloads
Added proper permission handling
Enhanced Queue Management:

Fixed queue manager initialization
Added better error recovery
Improved status tracking and logging
Enhanced cleanup of failed items
Better Error Handling:

Added comprehensive exception hierarchy
Improved error logging and reporting
Added fallback mechanisms for failures
Enhanced error recovery strategies
2024-11-15 03:48:56 +00:00

265 lines
9.9 KiB
Python

"""Video analysis functionality for FFmpeg"""
import os
import subprocess
import logging
from pathlib import Path
from typing import Dict, Any, Optional
from contextlib import contextmanager
import tempfile
import shutil
import json
logger = logging.getLogger("VideoArchiver")
@contextmanager
def temp_path_context():
"""Context manager for temporary path creation and cleanup"""
temp_dir = tempfile.mkdtemp(prefix="ffmpeg_")
try:
os.chmod(temp_dir, 0o777)
yield temp_dir
finally:
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.error(f"Error cleaning up temp directory {temp_dir}: {e}")
class VideoAnalyzer:
def __init__(self, ffmpeg_path: Path):
"""Initialize video analyzer with FFmpeg path
Args:
ffmpeg_path: Path to FFmpeg binary
"""
self.ffmpeg_path = Path(ffmpeg_path)
self.ffprobe_path = self.ffmpeg_path.parent / (
"ffprobe.exe" if os.name == "nt" else "ffprobe"
)
# Verify paths exist
if not self.ffmpeg_path.exists():
raise FileNotFoundError(f"FFmpeg not found at {self.ffmpeg_path}")
if not self.ffprobe_path.exists():
raise FileNotFoundError(f"FFprobe not found at {self.ffprobe_path}")
logger.info(f"Initialized VideoAnalyzer with FFmpeg: {self.ffmpeg_path}, FFprobe: {self.ffprobe_path}")
def analyze_video(self, input_path: str) -> Dict[str, Any]:
"""Analyze video content for optimal encoding settings"""
try:
if not os.path.exists(input_path):
logger.error(f"Input file not found: {input_path}")
return {}
# Use ffprobe to get video information
probe_result = self._probe_video(input_path)
if not probe_result:
logger.error("Failed to probe video")
return {}
# Get video stream info
video_info = next(
(s for s in probe_result["streams"] if s["codec_type"] == "video"),
None
)
if not video_info:
logger.error("No video stream found")
return {}
# Get video properties with validation
try:
width = int(video_info.get("width", 0))
height = int(video_info.get("height", 0))
fps = self._parse_frame_rate(video_info.get("r_frame_rate", "30/1"))
duration = float(probe_result["format"].get("duration", 0))
bitrate = float(probe_result["format"].get("bit_rate", 0))
except (ValueError, ZeroDivisionError) as e:
logger.error(f"Error parsing video properties: {e}")
return {}
# Advanced analysis with progress logging
logger.info("Starting motion detection analysis...")
has_high_motion = self._detect_high_motion(video_info)
logger.info("Starting dark scene analysis...")
has_dark_scenes = self._analyze_dark_scenes(input_path)
# Get audio properties
audio_info = next(
(s for s in probe_result["streams"] if s["codec_type"] == "audio"),
None
)
audio_props = self._get_audio_properties(audio_info)
result = {
"width": width,
"height": height,
"fps": fps,
"duration": duration,
"bitrate": bitrate,
"has_high_motion": has_high_motion,
"has_dark_scenes": has_dark_scenes,
"has_complex_scenes": self._detect_complex_scenes(video_info),
**audio_props
}
logger.info(f"Video analysis complete: {result}")
return result
except Exception as e:
logger.error(f"Error analyzing video: {str(e)}")
return {}
def _probe_video(self, input_path: str) -> Dict:
"""Use ffprobe to get video information"""
try:
cmd = [
str(self.ffprobe_path),
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
"-show_frames",
"-read_intervals", "%+#10", # Only analyze first 10 frames for speed
input_path
]
logger.debug(f"Running ffprobe command: {' '.join(cmd)}")
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30 # Add timeout
)
if result.returncode == 0:
try:
return json.loads(result.stdout)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse ffprobe output: {e}")
else:
logger.error(f"FFprobe failed: {result.stderr}")
except subprocess.TimeoutExpired:
logger.error("FFprobe timed out")
except Exception as e:
logger.error(f"Error probing video: {str(e)}")
return {}
def _parse_frame_rate(self, rate_str: str) -> float:
"""Parse frame rate string to float"""
try:
if "/" in rate_str:
num, den = map(float, rate_str.split("/"))
return num / den if den != 0 else 0
return float(rate_str)
except (ValueError, ZeroDivisionError):
return 30.0 # Default to 30fps
def _detect_high_motion(self, video_info: Dict) -> bool:
"""Detect high motion content based on frame rate and codec parameters"""
try:
# Check frame rate variation
if video_info.get("avg_frame_rate") and video_info.get("r_frame_rate"):
avg_fps = self._parse_frame_rate(video_info["avg_frame_rate"])
fps = self._parse_frame_rate(video_info["r_frame_rate"])
if abs(avg_fps - fps) > 5: # Significant frame rate variation
return True
# Check codec parameters for motion indicators
if "codec_tag_string" in video_info:
high_motion_codecs = ["avc1", "h264", "hevc"]
if any(codec in video_info["codec_tag_string"].lower() for codec in high_motion_codecs):
return True
except Exception as e:
logger.warning(f"Frame rate analysis failed: {str(e)}")
return False
def _analyze_dark_scenes(self, input_path: str) -> bool:
"""Analyze video for dark scenes using FFmpeg signalstats filter"""
try:
with temp_path_context() as temp_dir:
sample_cmd = [
str(self.ffmpeg_path),
"-i", input_path,
"-vf", "select='eq(pict_type,I)',signalstats",
"-show_entries", "frame_tags=lavfi.signalstats.YAVG",
"-f", "null",
"-t", "30", # Only analyze first 30 seconds
"-"
]
logger.debug(f"Running dark scene analysis: {' '.join(sample_cmd)}")
result = subprocess.run(
sample_cmd,
capture_output=True,
text=True,
timeout=60 # Add timeout
)
dark_frames = 0
total_frames = 0
for line in result.stderr.split("\n"):
if "YAVG" in line:
try:
avg_brightness = float(line.split("=")[1])
if avg_brightness < 40: # Dark scene threshold
dark_frames += 1
total_frames += 1
except (ValueError, IndexError):
continue
return total_frames > 0 and (dark_frames / total_frames) > 0.2
except subprocess.TimeoutExpired:
logger.warning("Dark scene analysis timed out")
except Exception as e:
logger.warning(f"Dark scene analysis failed: {str(e)}")
return False
def _detect_complex_scenes(self, video_info: Dict) -> bool:
"""Detect complex scenes based on codec parameters and bitrate"""
try:
# Check for high profile/level
profile = video_info.get("profile", "").lower()
level = video_info.get("level", -1)
if "high" in profile or level >= 41: # Level 4.1 or higher
return True
# Check for high bitrate
if "bit_rate" in video_info:
bitrate = int(video_info["bit_rate"])
if bitrate > 4000000: # Higher than 4Mbps
return True
except Exception as e:
logger.warning(f"Complex scene detection failed: {str(e)}")
return False
def _get_audio_properties(self, audio_info: Optional[Dict]) -> Dict[str, Any]:
"""Extract audio properties from stream info"""
if not audio_info:
return {
"audio_bitrate": 128000, # Default to 128kbps
"audio_channels": 2,
"audio_sample_rate": 48000
}
try:
return {
"audio_bitrate": int(audio_info.get("bit_rate", 128000)),
"audio_channels": int(audio_info.get("channels", 2)),
"audio_sample_rate": int(audio_info.get("sample_rate", 48000))
}
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing audio properties: {e}")
return {
"audio_bitrate": 128000,
"audio_channels": 2,
"audio_sample_rate": 48000
}