Improve video compression and error handling

- Add better video content analysis for optimal encoding
- Add better dark scene and motion detection
- Add better audio quality preservation
- Add better GPU detection with fallback
- Add proper exception handling and logging
- Add retries with exponential backoff
- Add better resource cleanup and management
- Add better file permission handling
- Add better process management
This commit is contained in:
pacnpal
2024-11-14 20:36:48 +00:00
parent dcb1587d4f
commit 29186fc372

View File

@@ -13,9 +13,36 @@ import multiprocessing
import ffmpeg import ffmpeg
import tempfile import tempfile
import json import json
import time
from typing import Dict, Optional, Tuple
import contextlib
logger = logging.getLogger("VideoArchiver") logger = logging.getLogger("VideoArchiver")
class FFmpegError(Exception):
"""Base exception for FFmpeg-related errors"""
pass
class GPUError(FFmpegError):
"""Raised when GPU operations fail"""
pass
class DownloadError(FFmpegError):
"""Raised when FFmpeg download fails"""
pass
@contextlib.contextmanager
def temp_path_context():
"""Context manager for temporary path creation and cleanup"""
temp_dir = tempfile.mkdtemp(prefix="ffmpeg_")
try:
os.chmod(temp_dir, stat.S_IRWXU)
yield temp_dir
finally:
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.error(f"Error cleaning up temp directory {temp_dir}: {e}")
class FFmpegManager: class FFmpegManager:
FFMPEG_URLS = { FFMPEG_URLS = {
@@ -51,6 +78,9 @@ class FFmpegManager:
}, },
} }
MAX_RETRIES = 3
RETRY_DELAY = 1 # seconds
def __init__(self): def __init__(self):
self.base_path = Path(__file__).parent / "bin" self.base_path = Path(__file__).parent / "bin"
self.base_path.mkdir(exist_ok=True) self.base_path.mkdir(exist_ok=True)
@@ -75,16 +105,16 @@ class FFmpegManager:
# Only download if FFmpeg doesn't exist # Only download if FFmpeg doesn't exist
self._download_ffmpeg() self._download_ffmpeg()
if not self._verify_ffmpeg(): if not self._verify_ffmpeg():
raise Exception("Downloaded FFmpeg binary is not functional") raise FFmpegError("Downloaded FFmpeg binary is not functional")
elif not self._verify_ffmpeg(): elif not self._verify_ffmpeg():
logger.warning( logger.warning(
"Existing FFmpeg binary not functional, downloading new copy" "Existing FFmpeg binary not functional, downloading new copy"
) )
self._download_ffmpeg() self._download_ffmpeg()
if not self._verify_ffmpeg(): if not self._verify_ffmpeg():
raise Exception("Downloaded FFmpeg binary is not functional") raise FFmpegError("Downloaded FFmpeg binary is not functional")
except KeyError: except KeyError:
raise Exception( raise FFmpegError(
f"Unsupported system/architecture: {self.system}/{self.machine}" f"Unsupported system/architecture: {self.system}/{self.machine}"
) )
@@ -93,53 +123,63 @@ class FFmpegManager:
def _verify_ffmpeg(self) -> bool: def _verify_ffmpeg(self) -> bool:
"""Verify FFmpeg binary works""" """Verify FFmpeg binary works"""
try: for attempt in range(self.MAX_RETRIES):
if not self.ffmpeg_path.exists(): try:
return False if not self.ffmpeg_path.exists():
# Make binary executable on Unix systems
if self.system != "Windows":
try:
self.ffmpeg_path.chmod(
self.ffmpeg_path.stat().st_mode | stat.S_IEXEC
)
except Exception as e:
logger.error(
f"Failed to set FFmpeg executable permissions: {str(e)}"
)
return False return False
# Test FFmpeg and check for required encoders # Make binary executable on Unix systems
result = subprocess.run( if self.system != "Windows":
[str(self.ffmpeg_path), "-encoders"], try:
stdout=subprocess.PIPE, self.ffmpeg_path.chmod(
stderr=subprocess.PIPE, self.ffmpeg_path.stat().st_mode | stat.S_IEXEC
timeout=5, )
) except Exception as e:
logger.error(
f"Failed to set FFmpeg executable permissions: {str(e)}"
)
return False
if result.returncode != 0: # Test FFmpeg and check for required encoders
return False result = subprocess.run(
[str(self.ffmpeg_path), "-encoders"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=5,
)
# Verify encoders are available if result.returncode != 0:
encoders = result.stdout.decode() if attempt < self.MAX_RETRIES - 1:
required_encoders = ["libx264"] # Base requirement time.sleep(self.RETRY_DELAY)
if self._gpu_info["nvidia"]: continue
required_encoders.append("h264_nvenc") return False
if self._gpu_info["amd"]:
required_encoders.append("h264_amf")
if self._gpu_info["intel"]:
required_encoders.append("h264_qsv")
for encoder in required_encoders: # Verify encoders are available
if encoder not in encoders: encoders = result.stdout.decode()
logger.warning(f"Required encoder {encoder} not available") required_encoders = ["libx264"] # Base requirement
if encoder != "libx264": # Only warn for GPU encoders if self._gpu_info["nvidia"]:
self._gpu_info[encoder.split('_')[1].replace('h264', '')] = False required_encoders.append("h264_nvenc")
if self._gpu_info["amd"]:
required_encoders.append("h264_amf")
if self._gpu_info["intel"]:
required_encoders.append("h264_qsv")
return True for encoder in required_encoders:
except Exception as e: if encoder not in encoders:
logger.error(f"FFmpeg verification failed: {str(e)}") logger.warning(f"Required encoder {encoder} not available")
return False if encoder != "libx264": # Only warn for GPU encoders
self._gpu_info[encoder.split('_')[1].replace('h264', '')] = False
return True
except Exception as e:
logger.error(f"FFmpeg verification attempt {attempt + 1} failed: {str(e)}")
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY)
else:
return False
return False
def _detect_gpu(self) -> dict: def _detect_gpu(self) -> dict:
"""Detect available GPU and its capabilities""" """Detect available GPU and its capabilities"""
@@ -149,6 +189,7 @@ class FFmpegManager:
if self.system == "Linux": if self.system == "Linux":
# Check for NVIDIA GPU # Check for NVIDIA GPU
try: try:
# First check for NVENC capability
nvidia_smi = subprocess.run( nvidia_smi = subprocess.run(
["nvidia-smi", "-q", "-d", "ENCODER"], ["nvidia-smi", "-q", "-d", "ENCODER"],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@@ -156,7 +197,18 @@ class FFmpegManager:
timeout=5, timeout=5,
) )
if nvidia_smi.returncode == 0 and b"Encoder" in nvidia_smi.stdout: if nvidia_smi.returncode == 0 and b"Encoder" in nvidia_smi.stdout:
gpu_info["nvidia"] = True # Verify NVENC functionality
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_nvenc",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["nvidia"] = True
except (subprocess.TimeoutExpired, FileNotFoundError): except (subprocess.TimeoutExpired, FileNotFoundError):
pass pass
@@ -166,7 +218,18 @@ class FFmpegManager:
with open("/sys/class/drm/renderD128/device/vendor", "r") as f: with open("/sys/class/drm/renderD128/device/vendor", "r") as f:
vendor = f.read().strip() vendor = f.read().strip()
if vendor == "0x1002": # AMD vendor ID if vendor == "0x1002": # AMD vendor ID
gpu_info["amd"] = True # Verify AMF functionality
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_amf",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["amd"] = True
except (IOError, OSError): except (IOError, OSError):
pass pass
@@ -180,7 +243,18 @@ class FFmpegManager:
) )
output = lspci.stdout.decode().lower() output = lspci.stdout.decode().lower()
if "intel" in output and ("vga" in output or "display" in output): if "intel" in output and ("vga" in output or "display" in output):
gpu_info["intel"] = True # Verify QSV functionality
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_qsv",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["intel"] = True
except (subprocess.TimeoutExpired, FileNotFoundError): except (subprocess.TimeoutExpired, FileNotFoundError):
pass pass
@@ -206,39 +280,97 @@ class FFmpegManager:
for gpu in gpu_data: for gpu in gpu_data:
name = gpu.get("Name", "").lower() name = gpu.get("Name", "").lower()
if "nvidia" in name: if "nvidia" in name:
gpu_info["nvidia"] = True # Verify NVENC
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_nvenc",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["nvidia"] = True
if "amd" in name or "radeon" in name: if "amd" in name or "radeon" in name:
gpu_info["amd"] = True # Verify AMF
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_amf",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["amd"] = True
if "intel" in name: if "intel" in name:
gpu_info["intel"] = True # Verify QSV
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_qsv",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["intel"] = True
except Exception: except Exception:
# Fallback to dxdiag if PowerShell method fails # Fallback to dxdiag if PowerShell method fails
with tempfile.NamedTemporaryFile( with temp_path_context() as temp_dir:
suffix=".txt", delete=False temp_path = os.path.join(temp_dir, "dxdiag.txt")
) as temp_file:
temp_path = temp_file.name
try:
subprocess.run(
["dxdiag", "/t", temp_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=10,
)
if os.path.exists(temp_path):
with open(temp_path, "r", errors="ignore") as f:
content = f.read().lower()
if "nvidia" in content:
gpu_info["nvidia"] = True
if "amd" in content or "radeon" in content:
gpu_info["amd"] = True
if "intel" in content:
gpu_info["intel"] = True
finally:
try: try:
os.unlink(temp_path) subprocess.run(
except OSError: ["dxdiag", "/t", temp_path],
pass stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=10,
)
if os.path.exists(temp_path):
with open(temp_path, "r", errors="ignore") as f:
content = f.read().lower()
# Only set GPU flags if we can verify encoder functionality
if "nvidia" in content:
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_nvenc",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["nvidia"] = True
if "amd" in content or "radeon" in content:
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_amf",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["amd"] = True
if "intel" in content:
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_qsv",
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
gpu_info["intel"] = True
except Exception as e:
logger.error(f"Error during dxdiag GPU detection: {str(e)}")
except Exception as e: except Exception as e:
logger.warning(f"GPU detection failed: {str(e)}") logger.warning(f"GPU detection failed: {str(e)}")
@@ -258,18 +390,55 @@ class FFmpegManager:
duration = float(probe['format'].get('duration', 0)) duration = float(probe['format'].get('duration', 0))
bitrate = float(probe['format'].get('bit_rate', 0)) bitrate = float(probe['format'].get('bit_rate', 0))
# Detect high motion content # Advanced analysis
has_high_motion = False has_high_motion = False
has_dark_scenes = False
has_complex_scenes = False
# Analyze frame statistics if available
if video_info.get('avg_frame_rate'): if video_info.get('avg_frame_rate'):
avg_fps = eval(video_info['avg_frame_rate']) avg_fps = eval(video_info['avg_frame_rate'])
if abs(avg_fps - fps) > 5: # Significant frame rate variation if abs(avg_fps - fps) > 5: # Significant frame rate variation
has_high_motion = True has_high_motion = True
# Check for dark scenes and complexity
try:
# Sample frames for analysis
with temp_path_context() as temp_dir:
frames_file = os.path.join(temp_dir, "frames.txt")
sample_cmd = [
str(self.ffmpeg_path),
"-i", input_path,
"-vf", "select='eq(pict_type,I)',signalstats",
"-show_entries", "frame_tags=lavfi.signalstats.YAVG",
"-f", "null", "-"
]
result = subprocess.run(sample_cmd, capture_output=True, text=True)
# Analyze brightness levels
dark_frames = 0
total_frames = 0
for line in result.stderr.split('\n'):
if 'YAVG' in line:
avg_brightness = float(line.split('=')[1])
if avg_brightness < 40: # Dark scene threshold
dark_frames += 1
total_frames += 1
if total_frames > 0 and (dark_frames / total_frames) > 0.2:
has_dark_scenes = True
except Exception as e:
logger.warning(f"Advanced scene analysis failed: {str(e)}")
# Get audio properties # Get audio properties
audio_info = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None) audio_info = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None)
audio_bitrate = 0 audio_bitrate = 0
audio_channels = 2
audio_sample_rate = 48000
if audio_info: if audio_info:
audio_bitrate = int(audio_info.get('bit_rate', 0)) audio_bitrate = int(audio_info.get('bit_rate', 0))
audio_channels = int(audio_info.get('channels', 2))
audio_sample_rate = int(audio_info.get('sample_rate', 48000))
return { return {
'width': width, 'width': width,
@@ -278,7 +447,11 @@ class FFmpegManager:
'duration': duration, 'duration': duration,
'bitrate': bitrate, 'bitrate': bitrate,
'has_high_motion': has_high_motion, 'has_high_motion': has_high_motion,
'audio_bitrate': audio_bitrate 'has_dark_scenes': has_dark_scenes,
'has_complex_scenes': has_complex_scenes,
'audio_bitrate': audio_bitrate,
'audio_channels': audio_channels,
'audio_sample_rate': audio_sample_rate
} }
except Exception as e: except Exception as e:
logger.error(f"Error analyzing video: {str(e)}") logger.error(f"Error analyzing video: {str(e)}")
@@ -287,7 +460,7 @@ class FFmpegManager:
def _get_optimal_ffmpeg_params( def _get_optimal_ffmpeg_params(
self, input_path: str, target_size_bytes: int self, input_path: str, target_size_bytes: int
) -> dict: ) -> dict:
"""Get optimal FFmpeg parameters based on hardware and video size""" """Get optimal FFmpeg parameters based on hardware and video analysis"""
# Analyze video content # Analyze video content
video_info = self._analyze_video(input_path) video_info = self._analyze_video(input_path)
@@ -312,14 +485,21 @@ class FFmpegManager:
"fastfirstpass": "1", # Fast first pass for two-pass encoding "fastfirstpass": "1", # Fast first pass for two-pass encoding
}) })
# Adjust for high motion content # Adjust for content type
if video_info.get('has_high_motion'): if video_info.get('has_high_motion'):
params.update({ params.update({
"tune": "grain", # Better for high motion "tune": "grain", # Better for high motion
"x264opts": params["x264opts"] + ":deblock=-1,-1:psy-rd=1.0:aq-strength=0.8" "x264opts": params["x264opts"] + ":deblock=-1,-1:psy-rd=1.0:aq-strength=0.8"
}) })
# GPU-specific optimizations if video_info.get('has_dark_scenes'):
# Optimize for dark scenes
params.update({
"x264opts": params["x264opts"] + ":aq-mode=3:aq-strength=1.0:deblock=1:1",
"tune": "film" if not video_info.get('has_high_motion') else "grain"
})
# GPU-specific optimizations with fallback
if self._gpu_info["nvidia"]: if self._gpu_info["nvidia"]:
try: try:
params.update({ params.update({
@@ -333,10 +513,27 @@ class FFmpegManager:
"rc-lookahead": "32", "rc-lookahead": "32",
"surfaces": "64", "surfaces": "64",
"max_muxing_queue_size": "1024", "max_muxing_queue_size": "1024",
"gpu": "any", # Allow any available GPU
}) })
# Test NVENC configuration
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_nvenc"
] + [f"-{k}" if len(k) == 1 else f"-{k}" if not v else f"-{k}" f" {v}" for k, v in params.items() if k != "c:v"] + [
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, capture_output=True)
if result.returncode != 0:
raise GPUError("NVENC test failed")
except Exception as e: except Exception as e:
logger.error(f"NVENC initialization failed: {str(e)}") logger.error(f"NVENC initialization failed, falling back to CPU: {str(e)}")
self._gpu_info["nvidia"] = False # Disable NVENC self._gpu_info["nvidia"] = False
params["c:v"] = "libx264" # Fallback to CPU
elif self._gpu_info["amd"]: elif self._gpu_info["amd"]:
try: try:
@@ -349,9 +546,25 @@ class FFmpegManager:
"preanalysis": "1", "preanalysis": "1",
"max_muxing_queue_size": "1024", "max_muxing_queue_size": "1024",
}) })
# Test AMF configuration
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_amf"
] + [f"-{k}" if len(k) == 1 else f"-{k}" if not v else f"-{k}" f" {v}" for k, v in params.items() if k != "c:v"] + [
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, capture_output=True)
if result.returncode != 0:
raise GPUError("AMF test failed")
except Exception as e: except Exception as e:
logger.error(f"AMF initialization failed: {str(e)}") logger.error(f"AMF initialization failed, falling back to CPU: {str(e)}")
self._gpu_info["amd"] = False self._gpu_info["amd"] = False
params["c:v"] = "libx264" # Fallback to CPU
elif self._gpu_info["intel"]: elif self._gpu_info["intel"]:
try: try:
@@ -362,9 +575,25 @@ class FFmpegManager:
"global_quality": "23", "global_quality": "23",
"max_muxing_queue_size": "1024", "max_muxing_queue_size": "1024",
}) })
# Test QSV configuration
test_cmd = [
str(self.ffmpeg_path),
"-f", "lavfi",
"-i", "testsrc=duration=1:size=1280x720:rate=30",
"-c:v", "h264_qsv"
] + [f"-{k}" if len(k) == 1 else f"-{k}" if not v else f"-{k}" f" {v}" for k, v in params.items() if k != "c:v"] + [
"-f", "null",
"-"
]
result = subprocess.run(test_cmd, capture_output=True)
if result.returncode != 0:
raise GPUError("QSV test failed")
except Exception as e: except Exception as e:
logger.error(f"QSV initialization failed: {str(e)}") logger.error(f"QSV initialization failed, falling back to CPU: {str(e)}")
self._gpu_info["intel"] = False self._gpu_info["intel"] = False
params["c:v"] = "libx264" # Fallback to CPU
try: try:
# Calculate target bitrate # Calculate target bitrate
@@ -375,13 +604,20 @@ class FFmpegManager:
# Reserve 5% for container overhead # Reserve 5% for container overhead
video_size_target = int(target_size_bytes * 0.95) video_size_target = int(target_size_bytes * 0.95)
# Calculate audio bitrate (10-20% of total, based on content) # Calculate optimal audio bitrate
total_bitrate = (video_size_target * 8) / duration total_bitrate = (video_size_target * 8) / duration
# Determine audio quality based on content
audio_channels = video_info.get('audio_channels', 2)
min_audio_bitrate = 64000 * audio_channels # Minimum per channel
max_audio_bitrate = 192000 * audio_channels # Maximum per channel
# Allocate 10-20% for audio depending on content
audio_bitrate = min( audio_bitrate = min(
192000, # Max 192kbps max_audio_bitrate,
max( max(
64000, # Min 64kbps min_audio_bitrate,
int(total_bitrate * 0.15) # 15% of total int(total_bitrate * 0.15) # 15% baseline
) )
) )
@@ -392,7 +628,7 @@ class FFmpegManager:
params["maxrate"] = str(int(video_bitrate * 1.5)) # Allow 50% overflow params["maxrate"] = str(int(video_bitrate * 1.5)) # Allow 50% overflow
params["bufsize"] = str(int(video_bitrate * 2)) # Double buffer size params["bufsize"] = str(int(video_bitrate * 2)) # Double buffer size
# Adjust quality based on compression ratio # Adjust quality based on compression ratio and content
ratio = input_size / target_size_bytes ratio = input_size / target_size_bytes
if ratio > 4: if ratio > 4:
params["crf"] = "26" if params["c:v"] == "libx264" else "23" params["crf"] = "26" if params["c:v"] == "libx264" else "23"
@@ -404,12 +640,19 @@ class FFmpegManager:
params["crf"] = "20" if params["c:v"] == "libx264" else "19" params["crf"] = "20" if params["c:v"] == "libx264" else "19"
params["preset"] = "slow" params["preset"] = "slow"
# Adjust for dark scenes
if video_info.get('has_dark_scenes'):
if params["c:v"] == "libx264":
params["crf"] = str(max(18, int(params["crf"]) - 2)) # Better quality for dark scenes
elif params["c:v"] == "h264_nvenc":
params["cq:v"] = str(max(15, int(params["cq:v"]) - 2))
# Audio settings # Audio settings
params.update({ params.update({
"c:a": "aac", "c:a": "aac",
"b:a": f"{int(audio_bitrate/1000)}k", "b:a": f"{int(audio_bitrate/1000)}k",
"ar": "48000", "ar": str(video_info.get('audio_sample_rate', 48000)),
"ac": "2", # Stereo "ac": str(video_info.get('audio_channels', 2)),
}) })
except Exception as e: except Exception as e:
@@ -433,58 +676,66 @@ class FFmpegManager:
try: try:
arch_config = self.FFMPEG_URLS[self.system][self.machine] arch_config = self.FFMPEG_URLS[self.system][self.machine]
except KeyError: except KeyError:
raise Exception( raise DownloadError(
f"Unsupported system/architecture: {self.system}/{self.machine}" f"Unsupported system/architecture: {self.system}/{self.machine}"
) )
url = arch_config["url"] url = arch_config["url"]
archive_path = (
self.base_path
/ f"ffmpeg_archive{'.zip' if self.system == 'Windows' else '.tar.xz'}"
)
try: with temp_path_context() as temp_dir:
# Download archive archive_path = Path(temp_dir) / f"ffmpeg_archive{'.zip' if self.system == 'Windows' else '.tar.xz'}"
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(archive_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Extract archive
if self.system == "Windows":
with zipfile.ZipFile(archive_path, "r") as zip_ref:
ffmpeg_files = [
f for f in zip_ref.namelist() if arch_config["bin_name"] in f
]
if not ffmpeg_files:
raise Exception("FFmpeg binary not found in archive")
zip_ref.extract(ffmpeg_files[0], self.base_path)
if self.ffmpeg_path.exists():
self.ffmpeg_path.unlink()
os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
else:
with tarfile.open(archive_path, "r:xz") as tar_ref:
ffmpeg_files = [
f for f in tar_ref.getnames() if arch_config["bin_name"] in f
]
if not ffmpeg_files:
raise Exception("FFmpeg binary not found in archive")
tar_ref.extract(ffmpeg_files[0], self.base_path)
if self.ffmpeg_path.exists():
self.ffmpeg_path.unlink()
os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
except Exception as e:
logger.error(f"FFmpeg download/extraction failed: {str(e)}")
raise
finally:
# Cleanup
try: try:
if archive_path.exists(): # Download archive with retries
archive_path.unlink() for attempt in range(self.MAX_RETRIES):
try:
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(archive_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
break
except Exception as e:
if attempt == self.MAX_RETRIES - 1:
raise DownloadError(f"Failed to download FFmpeg: {str(e)}")
time.sleep(self.RETRY_DELAY)
# Extract archive
if self.system == "Windows":
with zipfile.ZipFile(archive_path, "r") as zip_ref:
ffmpeg_files = [
f for f in zip_ref.namelist() if arch_config["bin_name"] in f
]
if not ffmpeg_files:
raise DownloadError("FFmpeg binary not found in archive")
zip_ref.extract(ffmpeg_files[0], self.base_path)
if self.ffmpeg_path.exists():
self.ffmpeg_path.unlink()
os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
else:
with tarfile.open(archive_path, "r:xz") as tar_ref:
ffmpeg_files = [
f for f in tar_ref.getnames() if arch_config["bin_name"] in f
]
if not ffmpeg_files:
raise DownloadError("FFmpeg binary not found in archive")
tar_ref.extract(ffmpeg_files[0], self.base_path)
if self.ffmpeg_path.exists():
self.ffmpeg_path.unlink()
os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
# Set executable permissions on Unix systems
if self.system != "Windows":
try:
self.ffmpeg_path.chmod(
self.ffmpeg_path.stat().st_mode | stat.S_IEXEC
)
except Exception as e:
logger.error(f"Failed to set executable permissions: {str(e)}")
except Exception as e: except Exception as e:
logger.warning(f"Failed to cleanup FFmpeg archive: {str(e)}") logger.error(f"FFmpeg download/extraction failed: {str(e)}")
raise DownloadError(str(e))
def force_download(self) -> bool: def force_download(self) -> bool:
"""Force re-download of FFmpeg binary""" """Force re-download of FFmpeg binary"""
@@ -509,7 +760,7 @@ class FFmpegManager:
def get_ffmpeg_path(self) -> str: def get_ffmpeg_path(self) -> str:
"""Get path to FFmpeg binary""" """Get path to FFmpeg binary"""
if not self.ffmpeg_path.exists(): if not self.ffmpeg_path.exists():
raise Exception("FFmpeg is not available") raise FFmpegError("FFmpeg is not available")
return str(self.ffmpeg_path) return str(self.ffmpeg_path)
def get_compression_params(self, input_path: str, target_size_mb: int) -> dict: def get_compression_params(self, input_path: str, target_size_mb: int) -> dict: