diff --git a/videoarchiver/ffmpeg/__init__.py b/videoarchiver/ffmpeg/__init__.py new file mode 100644 index 0000000..b42f084 --- /dev/null +++ b/videoarchiver/ffmpeg/__init__.py @@ -0,0 +1,6 @@ +"""FFmpeg management package""" + +from .exceptions import FFmpegError, GPUError, DownloadError +from .ffmpeg_manager import FFmpegManager + +__all__ = ['FFmpegManager', 'FFmpegError', 'GPUError', 'DownloadError'] diff --git a/videoarchiver/ffmpeg/encoder_params.py b/videoarchiver/ffmpeg/encoder_params.py new file mode 100644 index 0000000..643b852 --- /dev/null +++ b/videoarchiver/ffmpeg/encoder_params.py @@ -0,0 +1,161 @@ +"""FFmpeg encoding parameters generator""" + +import os +import logging +from typing import Dict, Any + +logger = logging.getLogger("VideoArchiver") + +class EncoderParams: + def __init__(self, cpu_cores: int, gpu_info: Dict[str, bool]): + self.cpu_cores = cpu_cores + self.gpu_info = gpu_info + + def get_params(self, video_info: Dict[str, Any], target_size_bytes: int) -> Dict[str, str]: + """Get optimal FFmpeg parameters based on hardware and video analysis""" + params = self._get_base_params() + params.update(self._get_content_specific_params(video_info)) + params.update(self._get_gpu_specific_params()) + params.update(self._get_bitrate_params(video_info, target_size_bytes)) + return params + + def _get_base_params(self) -> Dict[str, str]: + """Get base encoding parameters""" + return { + "c:v": "libx264", # Default to CPU encoding + "threads": str(self.cpu_cores), + "preset": "medium", + "crf": "23", + "movflags": "+faststart", + "profile:v": "high", + "level": "4.1", + "pix_fmt": "yuv420p", + "x264opts": "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto", + "tune": "film", + "fastfirstpass": "1" + } + + def _get_content_specific_params(self, video_info: Dict[str, Any]) -> Dict[str, str]: + """Get parameters optimized for specific content types""" + params = {} + + if video_info.get("has_high_motion"): + params.update({ + "tune": "grain", + "x264opts": "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto:deblock=-1,-1:psy-rd=1.0:aq-strength=0.8" + }) + + if video_info.get("has_dark_scenes"): + x264opts = params.get("x264opts", "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto") + params.update({ + "x264opts": x264opts + ":aq-mode=3:aq-strength=1.0:deblock=1:1", + "tune": "film" if not video_info.get("has_high_motion") else "grain" + }) + + return params + + def _get_gpu_specific_params(self) -> Dict[str, str]: + """Get GPU-specific encoding parameters""" + if self.gpu_info["nvidia"]: + return { + "c:v": "h264_nvenc", + "preset": "p7", + "rc:v": "vbr", + "cq:v": "19", + "b_ref_mode": "middle", + "spatial-aq": "1", + "temporal-aq": "1", + "rc-lookahead": "32", + "surfaces": "64", + "max_muxing_queue_size": "1024", + "gpu": "any" + } + elif self.gpu_info["amd"]: + return { + "c:v": "h264_amf", + "quality": "quality", + "rc": "vbr_peak", + "enforce_hrd": "1", + "vbaq": "1", + "preanalysis": "1", + "max_muxing_queue_size": "1024" + } + elif self.gpu_info["intel"]: + return { + "c:v": "h264_qsv", + "preset": "veryslow", + "look_ahead": "1", + "global_quality": "23", + "max_muxing_queue_size": "1024" + } + return {} + + def _get_bitrate_params(self, video_info: Dict[str, Any], target_size_bytes: int) -> Dict[str, str]: + """Calculate and get bitrate-related parameters""" + params = {} + try: + duration = video_info.get("duration", 0) + input_size = video_info.get("bitrate", 0) * duration / 8 # Estimate from bitrate if size not available + + if duration > 0 and input_size > target_size_bytes: + video_size_target = int(target_size_bytes * 0.95) + total_bitrate = (video_size_target * 8) / duration + + # Audio bitrate calculation + audio_channels = video_info.get("audio_channels", 2) + min_audio_bitrate = 64000 * audio_channels + max_audio_bitrate = 192000 * audio_channels + audio_bitrate = min( + max_audio_bitrate, + max(min_audio_bitrate, int(total_bitrate * 0.15)) + ) + + # Video bitrate calculation + video_bitrate = int((video_size_target * 8) / duration - audio_bitrate) + + # Set bitrate constraints + params["maxrate"] = str(int(video_bitrate * 1.5)) + params["bufsize"] = str(int(video_bitrate * 2)) + + # Quality adjustments based on compression ratio + ratio = input_size / target_size_bytes + if ratio > 4: + params["crf"] = "26" if params.get("c:v", "libx264") == "libx264" else "23" + params["preset"] = "faster" + elif ratio > 2: + params["crf"] = "23" if params.get("c:v", "libx264") == "libx264" else "21" + params["preset"] = "medium" + else: + params["crf"] = "20" if params.get("c:v", "libx264") == "libx264" else "19" + params["preset"] = "slow" + + # Dark scene adjustments + if video_info.get("has_dark_scenes"): + if params.get("c:v", "libx264") == "libx264": + params["crf"] = str(max(18, int(params["crf"]) - 2)) + elif params.get("c:v") == "h264_nvenc": + params["cq:v"] = str(max(15, int(params.get("cq:v", "19")) - 2)) + + # Audio settings + params.update({ + "c:a": "aac", + "b:a": f"{int(audio_bitrate/1000)}k", + "ar": str(video_info.get("audio_sample_rate", 48000)), + "ac": str(video_info.get("audio_channels", 2)) + }) + + except Exception as e: + logger.error(f"Error calculating bitrates: {str(e)}") + # Use safe default parameters + params.update({ + "crf": "23", + "preset": "medium", + "maxrate": f"{2 * 1024 * 1024}", # 2 Mbps + "bufsize": f"{4 * 1024 * 1024}", # 4 Mbps buffer + "c:a": "aac", + "b:a": "128k", + "ar": "48000", + "ac": "2" + }) + + return params diff --git a/videoarchiver/ffmpeg/exceptions.py b/videoarchiver/ffmpeg/exceptions.py new file mode 100644 index 0000000..ea768bb --- /dev/null +++ b/videoarchiver/ffmpeg/exceptions.py @@ -0,0 +1,13 @@ +"""FFmpeg-related exceptions""" + +class FFmpegError(Exception): + """Base exception for FFmpeg-related errors""" + pass + +class GPUError(FFmpegError): + """Raised when GPU operations fail""" + pass + +class DownloadError(FFmpegError): + """Raised when FFmpeg download fails""" + pass diff --git a/videoarchiver/ffmpeg/ffmpeg_downloader.py b/videoarchiver/ffmpeg/ffmpeg_downloader.py new file mode 100644 index 0000000..f2658ef --- /dev/null +++ b/videoarchiver/ffmpeg/ffmpeg_downloader.py @@ -0,0 +1,180 @@ +"""FFmpeg binary downloader and manager""" + +import os +import logging +import shutil +import requests +import tarfile +import zipfile +import subprocess +import tempfile +from pathlib import Path +from contextlib import contextmanager +from .exceptions import DownloadError + +logger = logging.getLogger("VideoArchiver") + +@contextmanager +def temp_path_context(): + """Context manager for temporary path creation and cleanup""" + temp_dir = tempfile.mkdtemp(prefix="ffmpeg_") + try: + os.chmod(temp_dir, 0o777) + yield temp_dir + finally: + try: + shutil.rmtree(temp_dir, ignore_errors=True) + except Exception as e: + logger.error(f"Error cleaning up temp directory {temp_dir}: {e}") + +class FFmpegDownloader: + FFMPEG_URLS = { + "Windows": { + "x86_64": { + "url": "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip", + "bin_name": "ffmpeg.exe", + } + }, + "Linux": { + "x86_64": { + "url": "https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz", + "bin_name": "ffmpeg", + }, + "aarch64": { + "url": "https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-arm64-static.tar.xz", + "bin_name": "ffmpeg", + }, + "armv7l": { + "url": "https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-armhf-static.tar.xz", + "bin_name": "ffmpeg", + }, + }, + "Darwin": { + "x86_64": { + "url": "https://evermeet.cx/ffmpeg/getrelease/zip", + "bin_name": "ffmpeg", + }, + "arm64": { + "url": "https://evermeet.cx/ffmpeg/getrelease/zip", + "bin_name": "ffmpeg", + }, + }, + } + + def __init__(self, system: str, machine: str, base_dir: Path): + self.system = system + self.machine = machine.lower() + if self.machine == "arm64": + self.machine = "aarch64" # Normalize ARM64 naming + self.base_dir = base_dir + self.ffmpeg_path = self.base_dir / self._get_binary_name() + + def _get_binary_name(self) -> str: + """Get the appropriate binary name for the current system""" + try: + return self.FFMPEG_URLS[self.system][self.machine]["bin_name"] + except KeyError: + raise DownloadError(f"Unsupported system/architecture: {self.system}/{self.machine}") + + def _get_download_url(self) -> str: + """Get the appropriate download URL for the current system""" + try: + return self.FFMPEG_URLS[self.system][self.machine]["url"] + except KeyError: + raise DownloadError(f"Unsupported system/architecture: {self.system}/{self.machine}") + + def download(self) -> Path: + """Download and set up FFmpeg binary""" + try: + # Ensure base directory exists with proper permissions + self.base_dir.mkdir(parents=True, exist_ok=True) + os.chmod(str(self.base_dir), 0o777) + + with temp_path_context() as temp_dir: + # Download archive + archive_path = self._download_archive(temp_dir) + + # Extract FFmpeg binary + self._extract_binary(archive_path, temp_dir) + + # Set proper permissions + os.chmod(str(self.ffmpeg_path), 0o777) + + return self.ffmpeg_path + + except Exception as e: + logger.error(f"Failed to download FFmpeg: {str(e)}") + raise DownloadError(str(e)) + + def _download_archive(self, temp_dir: str) -> Path: + """Download FFmpeg archive""" + url = self._get_download_url() + archive_path = Path(temp_dir) / f"ffmpeg_archive{'.zip' if self.system == 'Windows' else '.tar.xz'}" + + logger.info(f"Downloading FFmpeg from {url}") + response = requests.get(url, stream=True, timeout=30) + response.raise_for_status() + + with open(archive_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + return archive_path + + def _extract_binary(self, archive_path: Path, temp_dir: str): + """Extract FFmpeg binary from archive""" + logger.info("Extracting FFmpeg binary") + + # Remove existing binary if it exists + if self.ffmpeg_path.exists(): + self.ffmpeg_path.unlink() + + if self.system == "Windows": + self._extract_zip(archive_path, temp_dir) + else: + self._extract_tar(archive_path, temp_dir) + + def _extract_zip(self, archive_path: Path, temp_dir: str): + """Extract from zip archive (Windows)""" + with zipfile.ZipFile(archive_path, "r") as zip_ref: + ffmpeg_files = [f for f in zip_ref.namelist() if self._get_binary_name() in f] + if not ffmpeg_files: + raise DownloadError("FFmpeg binary not found in archive") + + zip_ref.extract(ffmpeg_files[0], temp_dir) + extracted_path = Path(temp_dir) / ffmpeg_files[0] + shutil.copy2(extracted_path, self.ffmpeg_path) + + def _extract_tar(self, archive_path: Path, temp_dir: str): + """Extract from tar archive (Linux/macOS)""" + with tarfile.open(archive_path, "r:xz") as tar_ref: + ffmpeg_files = [f for f in tar_ref.getnames() if f.endswith("/ffmpeg")] + if not ffmpeg_files: + raise DownloadError("FFmpeg binary not found in archive") + + tar_ref.extract(ffmpeg_files[0], temp_dir) + extracted_path = Path(temp_dir) / ffmpeg_files[0] + shutil.copy2(extracted_path, self.ffmpeg_path) + + def verify(self) -> bool: + """Verify FFmpeg binary works""" + try: + if not self.ffmpeg_path.exists(): + return False + + # Ensure proper permissions + os.chmod(str(self.ffmpeg_path), 0o777) + + # Test FFmpeg functionality + result = subprocess.run( + [str(self.ffmpeg_path), "-version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5 + ) + + return result.returncode == 0 + + except Exception as e: + logger.error(f"FFmpeg verification failed: {e}") + return False diff --git a/videoarchiver/ffmpeg/ffmpeg_manager.py b/videoarchiver/ffmpeg/ffmpeg_manager.py new file mode 100644 index 0000000..a9efa4e --- /dev/null +++ b/videoarchiver/ffmpeg/ffmpeg_manager.py @@ -0,0 +1,96 @@ +"""Main FFmpeg management module""" + +import os +import platform +import multiprocessing +import logging +from pathlib import Path +from typing import Dict, Any, Optional + +from .exceptions import FFmpegError +from .gpu_detector import GPUDetector +from .video_analyzer import VideoAnalyzer +from .encoder_params import EncoderParams +from .ffmpeg_downloader import FFmpegDownloader + +logger = logging.getLogger("VideoArchiver") + +class FFmpegManager: + def __init__(self): + """Initialize FFmpeg manager""" + # Set up base directory in /tmp for Docker compatibility + self.base_dir = Path("/tmp/ffmpeg") + + # Initialize downloader + self.downloader = FFmpegDownloader( + system=platform.system(), + machine=platform.machine(), + base_dir=self.base_dir + ) + + # Get or download FFmpeg + self.ffmpeg_path = self._initialize_ffmpeg() + + # Initialize components + self.gpu_detector = GPUDetector(self.ffmpeg_path) + self.video_analyzer = VideoAnalyzer(self.ffmpeg_path) + self._gpu_info = self.gpu_detector.detect_gpu() + self._cpu_cores = multiprocessing.cpu_count() + + # Initialize encoder params + self.encoder_params = EncoderParams(self._cpu_cores, self._gpu_info) + + def _initialize_ffmpeg(self) -> Path: + """Initialize FFmpeg binary""" + # Verify existing FFmpeg if it exists + if self.downloader.ffmpeg_path.exists() and self.downloader.verify(): + logger.info(f"Using existing FFmpeg: {self.downloader.ffmpeg_path}") + return self.downloader.ffmpeg_path + + # Download and verify FFmpeg + logger.info("Downloading FFmpeg...") + try: + ffmpeg_path = self.downloader.download() + if not self.downloader.verify(): + raise FFmpegError("Downloaded FFmpeg binary is not functional") + return ffmpeg_path + except Exception as e: + logger.error(f"Failed to initialize FFmpeg: {e}") + raise FFmpegError(f"Failed to initialize FFmpeg: {e}") + + def analyze_video(self, input_path: str) -> Dict[str, Any]: + """Analyze video content for optimal encoding settings""" + return self.video_analyzer.analyze_video(input_path) + + def get_compression_params(self, input_path: str, target_size_mb: int) -> Dict[str, str]: + """Get optimal compression parameters for the given input file""" + # Analyze video first + video_info = self.analyze_video(input_path) + # Get encoding parameters + return self.encoder_params.get_params(video_info, target_size_mb * 1024 * 1024) + + def get_ffmpeg_path(self) -> str: + """Get path to FFmpeg binary""" + if not self.ffmpeg_path.exists(): + raise FFmpegError("FFmpeg is not available") + return str(self.ffmpeg_path) + + def force_download(self) -> bool: + """Force re-download of FFmpeg binary""" + try: + logger.info("Force downloading FFmpeg...") + self.ffmpeg_path = self.downloader.download() + return self.downloader.verify() + except Exception as e: + logger.error(f"Failed to force download FFmpeg: {e}") + return False + + @property + def gpu_info(self) -> Dict[str, bool]: + """Get GPU information""" + return self._gpu_info.copy() + + @property + def cpu_cores(self) -> int: + """Get number of CPU cores""" + return self._cpu_cores diff --git a/videoarchiver/ffmpeg/gpu_detector.py b/videoarchiver/ffmpeg/gpu_detector.py new file mode 100644 index 0000000..168ad85 --- /dev/null +++ b/videoarchiver/ffmpeg/gpu_detector.py @@ -0,0 +1,130 @@ +"""GPU detection functionality for FFmpeg""" + +import os +import json +import subprocess +import logging +from pathlib import Path +from typing import Dict + +logger = logging.getLogger("VideoArchiver") + +class GPUDetector: + def __init__(self, ffmpeg_path: Path): + self.ffmpeg_path = ffmpeg_path + + def detect_gpu(self) -> Dict[str, bool]: + """Detect available GPU and its capabilities""" + gpu_info = {"nvidia": False, "amd": False, "intel": False, "arm": False} + + try: + if os.name == "posix": # Linux/Unix + gpu_info.update(self._detect_linux_gpu()) + elif os.name == "nt": # Windows + gpu_info.update(self._detect_windows_gpu()) + + except Exception as e: + logger.warning(f"GPU detection failed: {str(e)}") + + return gpu_info + + def _test_encoder(self, encoder: str) -> bool: + """Test if a specific encoder works""" + try: + test_cmd = [ + str(self.ffmpeg_path), + "-f", "lavfi", + "-i", "testsrc=duration=1:size=1280x720:rate=30", + "-c:v", encoder, + "-f", "null", + "-" + ] + result = subprocess.run( + test_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5 + ) + return result.returncode == 0 + except Exception: + return False + + def _detect_linux_gpu(self) -> Dict[str, bool]: + """Detect GPUs on Linux systems""" + gpu_info = {"nvidia": False, "amd": False, "intel": False, "arm": False} + + # Check for NVIDIA GPU + try: + nvidia_smi = subprocess.run( + ["nvidia-smi", "-q", "-d", "ENCODER"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5 + ) + if nvidia_smi.returncode == 0 and b"Encoder" in nvidia_smi.stdout: + gpu_info["nvidia"] = self._test_encoder("h264_nvenc") + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Check for AMD GPU + try: + if os.path.exists("/dev/dri/renderD128"): + with open("/sys/class/drm/renderD128/device/vendor", "r") as f: + vendor = f.read().strip() + if vendor == "0x1002": # AMD vendor ID + gpu_info["amd"] = self._test_encoder("h264_amf") + except (IOError, OSError): + pass + + # Check for Intel GPU + try: + lspci = subprocess.run( + ["lspci"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5 + ) + output = lspci.stdout.decode().lower() + if "intel" in output and ("vga" in output or "display" in output): + gpu_info["intel"] = self._test_encoder("h264_qsv") + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Check for ARM GPU + if os.uname().machine.startswith(("aarch64", "armv7")): + gpu_info["arm"] = True + + return gpu_info + + def _detect_windows_gpu(self) -> Dict[str, bool]: + """Detect GPUs on Windows systems""" + gpu_info = {"nvidia": False, "amd": False, "intel": False, "arm": False} + + try: + # Use PowerShell to get GPU info + ps_command = "Get-WmiObject Win32_VideoController | ConvertTo-Json" + result = subprocess.run( + ["powershell", "-Command", ps_command], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode == 0: + gpu_data = json.loads(result.stdout) + if not isinstance(gpu_data, list): + gpu_data = [gpu_data] + + for gpu in gpu_data: + name = gpu.get("Name", "").lower() + if "nvidia" in name: + gpu_info["nvidia"] = self._test_encoder("h264_nvenc") + if "amd" in name or "radeon" in name: + gpu_info["amd"] = self._test_encoder("h264_amf") + if "intel" in name: + gpu_info["intel"] = self._test_encoder("h264_qsv") + + except Exception as e: + logger.error(f"Error during Windows GPU detection: {str(e)}") + + return gpu_info diff --git a/videoarchiver/ffmpeg/video_analyzer.py b/videoarchiver/ffmpeg/video_analyzer.py new file mode 100644 index 0000000..8454a82 --- /dev/null +++ b/videoarchiver/ffmpeg/video_analyzer.py @@ -0,0 +1,129 @@ +"""Video analysis functionality for FFmpeg""" + +import os +import subprocess +import logging +import ffmpeg +from pathlib import Path +from typing import Dict, Any +from contextlib import contextmanager +import tempfile +import shutil + +logger = logging.getLogger("VideoArchiver") + +@contextmanager +def temp_path_context(): + """Context manager for temporary path creation and cleanup""" + temp_dir = tempfile.mkdtemp(prefix="ffmpeg_") + try: + os.chmod(temp_dir, 0o777) + yield temp_dir + finally: + try: + shutil.rmtree(temp_dir, ignore_errors=True) + except Exception as e: + logger.error(f"Error cleaning up temp directory {temp_dir}: {e}") + +class VideoAnalyzer: + def __init__(self, ffmpeg_path: Path): + self.ffmpeg_path = ffmpeg_path + + def analyze_video(self, input_path: str) -> Dict[str, Any]: + """Analyze video content for optimal encoding settings""" + try: + probe = ffmpeg.probe(input_path) + video_info = next(s for s in probe["streams"] if s["codec_type"] == "video") + + # Get video properties + width = int(video_info.get("width", 0)) + height = int(video_info.get("height", 0)) + fps = eval(video_info.get("r_frame_rate", "30/1")) + duration = float(probe["format"].get("duration", 0)) + bitrate = float(probe["format"].get("bit_rate", 0)) + + # Advanced analysis + has_high_motion = self._detect_high_motion(video_info) + has_dark_scenes = self._analyze_dark_scenes(input_path) + + # Get audio properties + audio_info = next( + (s for s in probe["streams"] if s["codec_type"] == "audio"), + None + ) + audio_props = self._get_audio_properties(audio_info) + + return { + "width": width, + "height": height, + "fps": fps, + "duration": duration, + "bitrate": bitrate, + "has_high_motion": has_high_motion, + "has_dark_scenes": has_dark_scenes, + "has_complex_scenes": False, # Reserved for future use + **audio_props + } + + except Exception as e: + logger.error(f"Error analyzing video: {str(e)}") + return {} + + def _detect_high_motion(self, video_info: Dict) -> bool: + """Detect high motion content based on frame rate analysis""" + try: + if video_info.get("avg_frame_rate"): + avg_fps = eval(video_info["avg_frame_rate"]) + fps = eval(video_info.get("r_frame_rate", "30/1")) + return abs(avg_fps - fps) > 5 # Significant frame rate variation + except Exception as e: + logger.warning(f"Frame rate analysis failed: {str(e)}") + return False + + def _analyze_dark_scenes(self, input_path: str) -> bool: + """Analyze video for dark scenes""" + try: + with temp_path_context() as temp_dir: + sample_cmd = [ + str(self.ffmpeg_path), + "-i", input_path, + "-vf", "select='eq(pict_type,I)',signalstats", + "-show_entries", "frame_tags=lavfi.signalstats.YAVG", + "-f", "null", + "-" + ] + result = subprocess.run( + sample_cmd, + capture_output=True, + text=True + ) + + dark_frames = 0 + total_frames = 0 + for line in result.stderr.split("\n"): + if "YAVG" in line: + avg_brightness = float(line.split("=")[1]) + if avg_brightness < 40: # Dark scene threshold + dark_frames += 1 + total_frames += 1 + + return total_frames > 0 and (dark_frames / total_frames) > 0.2 + + except Exception as e: + logger.warning(f"Dark scene analysis failed: {str(e)}") + return False + + def _get_audio_properties(self, audio_info: Dict) -> Dict[str, Any]: + """Extract audio properties from stream info""" + if not audio_info: + return { + "audio_bitrate": 0, + "audio_channels": 2, + "audio_sample_rate": 48000 + } + + return { + "audio_bitrate": int(audio_info.get("bit_rate", 0)), + "audio_channels": int(audio_info.get("channels", 2)), + "audio_sample_rate": int(audio_info.get("sample_rate", 48000)) + } diff --git a/videoarchiver/ffmpeg_manager.py b/videoarchiver/ffmpeg_manager.py index aae42b3..0d13e5c 100644 --- a/videoarchiver/ffmpeg_manager.py +++ b/videoarchiver/ffmpeg_manager.py @@ -1,933 +1,5 @@ -import os -import sys -import platform -import subprocess -import logging -import shutil -import requests -import zipfile -import tarfile -from pathlib import Path -import stat -import multiprocessing -import ffmpeg -import tempfile -import json -import time -from typing import Dict, Optional, Tuple -import contextlib +"""FFmpeg management module""" -logger = logging.getLogger("VideoArchiver") +from .ffmpeg import FFmpegManager, FFmpegError, GPUError, DownloadError -class FFmpegError(Exception): - """Base exception for FFmpeg-related errors""" - pass - -class GPUError(FFmpegError): - """Raised when GPU operations fail""" - pass - -class DownloadError(FFmpegError): - """Raised when FFmpeg download fails""" - pass - -@contextlib.contextmanager -def temp_path_context(): - """Context manager for temporary path creation and cleanup""" - # Create a temporary directory using system temp directory - temp_dir = tempfile.mkdtemp(prefix="ffmpeg_") - try: - yield temp_dir - finally: - try: - shutil.rmtree(temp_dir, ignore_errors=True) - except Exception as e: - logger.error(f"Error cleaning up temp directory {temp_dir}: {e}") - -class FFmpegManager: - FFMPEG_URLS = { - "Windows": { - "x86_64": { - "url": "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip", - "bin_name": "ffmpeg.exe", - } - }, - "Linux": { - "x86_64": { - "url": "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz", - "bin_name": "ffmpeg", - }, - "aarch64": { # ARM64 - "url": "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linuxarm64-gpl.tar.xz", - "bin_name": "ffmpeg", - }, - "armv7l": { # ARM32 - "url": "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linuxarm32-gpl.tar.xz", - "bin_name": "ffmpeg", - }, - }, - "Darwin": { # macOS - "x86_64": { - "url": "https://evermeet.cx/ffmpeg/getrelease/zip", - "bin_name": "ffmpeg", - }, - "arm64": { # Apple Silicon - "url": "https://evermeet.cx/ffmpeg/getrelease/zip", - "bin_name": "ffmpeg", - }, - }, - } - - MAX_RETRIES = 3 - RETRY_DELAY = 1 # seconds - - def __init__(self): - # Use system temp directory for temporary files - self.tmp_path = Path(tempfile.gettempdir()) / "ffmpeg_tmp" - try: - self.tmp_path.mkdir(parents=True, exist_ok=True) - except Exception as e: - logger.warning(f"Could not create tmp directory, using system temp: {e}") - self.tmp_path = Path(tempfile.gettempdir()) - - # Use XDG_DATA_HOME or fallback to ~/.local/share for Linux/macOS - if platform.system() in ["Linux", "Darwin"]: - xdg_data = os.environ.get("XDG_DATA_HOME", os.path.expanduser("~/.local/share")) - self.base_path = Path(xdg_data) / "red-discordbot" / "cogs" / "VideoArchiver" / "bin" - else: # Windows - appdata = os.environ.get("APPDATA", os.path.expanduser("~/AppData/Roaming")) - self.base_path = Path(appdata) / "Red-DiscordBot" / "cogs" / "VideoArchiver" / "bin" - - # Create bin directory with proper permissions if it doesn't exist - try: - self.base_path.mkdir(parents=True, exist_ok=True) - if platform.system() != "Windows": - # Try to set directory permissions, but don't fail if we can't - try: - self.base_path.chmod(0o755) - logger.info(f"Created bin directory with permissions: {oct(self.base_path.stat().st_mode)[-3:]}") - except Exception as e: - logger.warning(f"Could not set bin directory permissions: {e}") - except Exception as e: - logger.error(f"Failed to create bin directory: {e}") - raise FFmpegError(f"Failed to initialize FFmpeg directory: {e}") - - # Get system architecture - self.system = platform.system() - self.machine = platform.machine().lower() - if self.machine == "arm64": - self.machine = "aarch64" # Normalize ARM64 naming - - # Try to use system FFmpeg first - system_ffmpeg = shutil.which("ffmpeg") - if system_ffmpeg: - self.ffmpeg_path = Path(system_ffmpeg) - logger.info(f"Using system FFmpeg: {self.ffmpeg_path}") - return - - # Check for existing FFmpeg in our bin directory - try: - arch_config = self.FFMPEG_URLS[self.system][self.machine] - self.ffmpeg_path = self.base_path / arch_config["bin_name"] - - if not self.ffmpeg_path.exists() or not self._verify_ffmpeg(): - logger.info("Downloading FFmpeg...") - self._download_ffmpeg() - if not self._verify_ffmpeg(): - raise FFmpegError("Downloaded FFmpeg binary is not functional") - except KeyError: - raise FFmpegError(f"Unsupported system/architecture: {self.system}/{self.machine}") - - self._gpu_info = self._detect_gpu() - self._cpu_cores = multiprocessing.cpu_count() - - def _verify_ffmpeg(self) -> bool: - """Verify FFmpeg binary works""" - for attempt in range(self.MAX_RETRIES): - try: - if not self.ffmpeg_path.exists(): - return False - - # Make binary executable on Unix systems - if self.system != "Windows": - try: - current_mode = self.ffmpeg_path.stat().st_mode - # Add execute permission for user (700) - new_mode = current_mode | stat.S_IRWXU - self.ffmpeg_path.chmod(new_mode) - logger.info(f"Set FFmpeg permissions to: {oct(new_mode)[-3:]}") - except Exception as e: - logger.error(f"Failed to set FFmpeg executable permissions: {e}") - # Continue anyway as the file might already have correct permissions - pass - - # Test FFmpeg and check for required encoders - result = subprocess.run( - [str(self.ffmpeg_path), "-encoders"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=5, - ) - - if result.returncode != 0: - if attempt < self.MAX_RETRIES - 1: - time.sleep(self.RETRY_DELAY) - continue - return False - - # Verify encoders are available - encoders = result.stdout.decode() - required_encoders = ["libx264"] # Base requirement - if self._gpu_info["nvidia"]: - required_encoders.append("h264_nvenc") - if self._gpu_info["amd"]: - required_encoders.append("h264_amf") - if self._gpu_info["intel"]: - required_encoders.append("h264_qsv") - - for encoder in required_encoders: - if encoder not in encoders: - logger.warning(f"Required encoder {encoder} not available") - if encoder != "libx264": # Only warn for GPU encoders - self._gpu_info[ - encoder.split("_")[1].replace("h264", "") - ] = False - - return True - - except Exception as e: - logger.error( - f"FFmpeg verification attempt {attempt + 1} failed: {e}" - ) - if attempt < self.MAX_RETRIES - 1: - time.sleep(self.RETRY_DELAY) - else: - return False - - return False - - def _detect_gpu(self) -> dict: - """Detect available GPU and its capabilities""" - gpu_info = {"nvidia": False, "amd": False, "intel": False, "arm": False} - - try: - if self.system == "Linux": - # Check for NVIDIA GPU - try: - # First check for NVENC capability - nvidia_smi = subprocess.run( - ["nvidia-smi", "-q", "-d", "ENCODER"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=5, - ) - if nvidia_smi.returncode == 0 and b"Encoder" in nvidia_smi.stdout: - # Verify NVENC functionality - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_nvenc", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - if result.returncode == 0: - gpu_info["nvidia"] = True - except (subprocess.TimeoutExpired, FileNotFoundError): - pass - - # Check for AMD GPU - try: - if os.path.exists("/dev/dri/renderD128"): - with open("/sys/class/drm/renderD128/device/vendor", "r") as f: - vendor = f.read().strip() - if vendor == "0x1002": # AMD vendor ID - # Verify AMF functionality - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_amf", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["amd"] = True - except (IOError, OSError): - pass - - # Check for Intel GPU - try: - lspci = subprocess.run( - ["lspci"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=5, - ) - output = lspci.stdout.decode().lower() - if "intel" in output and ("vga" in output or "display" in output): - # Verify QSV functionality - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_qsv", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - if result.returncode == 0: - gpu_info["intel"] = True - except (subprocess.TimeoutExpired, FileNotFoundError): - pass - - # Check for ARM GPU - if self.machine in ["aarch64", "armv7l"]: - gpu_info["arm"] = True - - elif self.system == "Windows": - try: - # Use PowerShell to get GPU info - ps_command = "Get-WmiObject Win32_VideoController | ConvertTo-Json" - result = subprocess.run( - ["powershell", "-Command", ps_command], - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode == 0: - gpu_data = json.loads(result.stdout) - if not isinstance(gpu_data, list): - gpu_data = [gpu_data] - - for gpu in gpu_data: - name = gpu.get("Name", "").lower() - if "nvidia" in name: - # Verify NVENC - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_nvenc", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["nvidia"] = True - if "amd" in name or "radeon" in name: - # Verify AMF - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["amd"] = True - if "intel" in name: - # Verify QSV - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_qsv", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["intel"] = True - except Exception: - # Fallback to dxdiag if PowerShell method fails - with temp_path_context() as temp_dir: - temp_path = os.path.join(temp_dir, "dxdiag.txt") - try: - subprocess.run( - ["dxdiag", "/t", temp_path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=10, - ) - if os.path.exists(temp_path): - with open(temp_path, "r", errors="ignore") as f: - content = f.read().lower() - # Only set GPU flags if we can verify encoder functionality - if "nvidia" in content: - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_nvenc", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["nvidia"] = True - if "amd" in content or "radeon" in content: - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_amf", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["amd"] = True - if "intel" in content: - test_cmd = [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_qsv", - "-f", - "null", - "-", - ] - result = subprocess.run( - test_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode == 0: - gpu_info["intel"] = True - except Exception as e: - logger.error(f"Error during dxdiag GPU detection: {str(e)}") - - except Exception as e: - logger.warning(f"GPU detection failed: {str(e)}") - - return gpu_info - - def _analyze_video(self, input_path: str) -> dict: - """Analyze video content for optimal encoding settings""" - try: - probe = ffmpeg.probe(input_path) - video_info = next(s for s in probe["streams"] if s["codec_type"] == "video") - - # Get video properties - width = int(video_info.get("width", 0)) - height = int(video_info.get("height", 0)) - fps = eval(video_info.get("r_frame_rate", "30/1")) - duration = float(probe["format"].get("duration", 0)) - bitrate = float(probe["format"].get("bit_rate", 0)) - - # Advanced analysis - has_high_motion = False - has_dark_scenes = False - has_complex_scenes = False - - # Analyze frame statistics if available - if video_info.get("avg_frame_rate"): - avg_fps = eval(video_info["avg_frame_rate"]) - if abs(avg_fps - fps) > 5: # Significant frame rate variation - has_high_motion = True - - # Check for dark scenes and complexity - try: - # Sample frames for analysis - with temp_path_context() as temp_dir: - frames_file = os.path.join(temp_dir, "frames.txt") - sample_cmd = [ - str(self.ffmpeg_path), - "-i", - input_path, - "-vf", - "select='eq(pict_type,I)',signalstats", - "-show_entries", - "frame_tags=lavfi.signalstats.YAVG", - "-f", - "null", - "-", - ] - result = subprocess.run(sample_cmd, capture_output=True, text=True) - - # Analyze brightness levels - dark_frames = 0 - total_frames = 0 - for line in result.stderr.split("\n"): - if "YAVG" in line: - avg_brightness = float(line.split("=")[1]) - if avg_brightness < 40: # Dark scene threshold - dark_frames += 1 - total_frames += 1 - - if total_frames > 0 and (dark_frames / total_frames) > 0.2: - has_dark_scenes = True - except Exception as e: - logger.warning(f"Advanced scene analysis failed: {str(e)}") - - # Get audio properties - audio_info = next( - (s for s in probe["streams"] if s["codec_type"] == "audio"), None - ) - audio_bitrate = 0 - audio_channels = 2 - audio_sample_rate = 48000 - if audio_info: - audio_bitrate = int(audio_info.get("bit_rate", 0)) - audio_channels = int(audio_info.get("channels", 2)) - audio_sample_rate = int(audio_info.get("sample_rate", 48000)) - - return { - "width": width, - "height": height, - "fps": fps, - "duration": duration, - "bitrate": bitrate, - "has_high_motion": has_high_motion, - "has_dark_scenes": has_dark_scenes, - "has_complex_scenes": has_complex_scenes, - "audio_bitrate": audio_bitrate, - "audio_channels": audio_channels, - "audio_sample_rate": audio_sample_rate, - } - except Exception as e: - logger.error(f"Error analyzing video: {str(e)}") - return {} - - def _get_optimal_ffmpeg_params( - self, input_path: str, target_size_bytes: int - ) -> dict: - """Get optimal FFmpeg parameters based on hardware and video analysis""" - # Analyze video content - video_info = self._analyze_video(input_path) - - # Base parameters - params = { - "c:v": "libx264", # Default to CPU encoding - "threads": str(self._cpu_cores), # Use all CPU cores - "preset": "medium", - "crf": "23", # Default quality - "maxrate": None, - "bufsize": None, - "movflags": "+faststart", # Optimize for web playback - "profile:v": "high", # High profile for better quality - "level": "4.1", # Compatibility level - "pix_fmt": "yuv420p", # Standard pixel format - } - - # Add advanced encoding parameters - params.update( - { - "x264opts": "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto", - "tune": "film", # General-purpose tuning - "fastfirstpass": "1", # Fast first pass for two-pass encoding - } - ) - - # Adjust for content type - if video_info.get("has_high_motion"): - params.update( - { - "tune": "grain", # Better for high motion - "x264opts": params["x264opts"] - + ":deblock=-1,-1:psy-rd=1.0:aq-strength=0.8", - } - ) - - if video_info.get("has_dark_scenes"): - # Optimize for dark scenes - params.update( - { - "x264opts": params["x264opts"] - + ":aq-mode=3:aq-strength=1.0:deblock=1:1", - "tune": ( - "film" if not video_info.get("has_high_motion") else "grain" - ), - } - ) - - # GPU-specific optimizations with fallback - if self._gpu_info["nvidia"]: - try: - params.update( - { - "c:v": "h264_nvenc", - "preset": "p7", # Highest quality NVENC preset - "rc:v": "vbr", # Variable bitrate - "cq:v": "19", # Quality level - "b_ref_mode": "middle", - "spatial-aq": "1", - "temporal-aq": "1", - "rc-lookahead": "32", - "surfaces": "64", - "max_muxing_queue_size": "1024", - "gpu": "any", # Allow any available GPU - } - ) - - # Test NVENC configuration - test_cmd = ( - [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_nvenc", - ] - + [ - ( - f"-{k}" - if len(k) == 1 - else f"-{k}" if not v else f"-{k}" f" {v}" - ) - for k, v in params.items() - if k != "c:v" - ] - + ["-f", "null", "-"] - ) - result = subprocess.run(test_cmd, capture_output=True) - if result.returncode != 0: - raise GPUError("NVENC test failed") - - except Exception as e: - logger.error( - f"NVENC initialization failed, falling back to CPU: {str(e)}" - ) - self._gpu_info["nvidia"] = False - params["c:v"] = "libx264" # Fallback to CPU - - elif self._gpu_info["amd"]: - try: - params.update( - { - "c:v": "h264_amf", - "quality": "quality", - "rc": "vbr_peak", - "enforce_hrd": "1", - "vbaq": "1", - "preanalysis": "1", - "max_muxing_queue_size": "1024", - } - ) - - # Test AMF configuration - test_cmd = ( - [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_amf", - ] - + [ - ( - f"-{k}" - if len(k) == 1 - else f"-{k}" if not v else f"-{k}" f" {v}" - ) - for k, v in params.items() - if k != "c:v" - ] - + ["-f", "null", "-"] - ) - result = subprocess.run(test_cmd, capture_output=True) - if result.returncode != 0: - raise GPUError("AMF test failed") - - except Exception as e: - logger.error( - f"AMF initialization failed, falling back to CPU: {str(e)}" - ) - self._gpu_info["amd"] = False - params["c:v"] = "libx264" # Fallback to CPU - - elif self._gpu_info["intel"]: - try: - params.update( - { - "c:v": "h264_qsv", - "preset": "veryslow", - "look_ahead": "1", - "global_quality": "23", - "max_muxing_queue_size": "1024", - } - ) - - # Test QSV configuration - test_cmd = ( - [ - str(self.ffmpeg_path), - "-f", - "lavfi", - "-i", - "testsrc=duration=1:size=1280x720:rate=30", - "-c:v", - "h264_qsv", - ] - + [ - ( - f"-{k}" - if len(k) == 1 - else f"-{k}" if not v else f"-{k}" f" {v}" - ) - for k, v in params.items() - if k != "c:v" - ] - + ["-f", "null", "-"] - ) - result = subprocess.run(test_cmd, capture_output=True) - if result.returncode != 0: - raise GPUError("QSV test failed") - - except Exception as e: - logger.error( - f"QSV initialization failed, falling back to CPU: {str(e)}" - ) - self._gpu_info["intel"] = False - params["c:v"] = "libx264" # Fallback to CPU - - try: - # Calculate target bitrate - input_size = os.path.getsize(input_path) - duration = video_info.get("duration", 0) - - if duration > 0 and input_size > target_size_bytes: - # Reserve 5% for container overhead - video_size_target = int(target_size_bytes * 0.95) - - # Calculate optimal audio bitrate - total_bitrate = (video_size_target * 8) / duration - - # Determine audio quality based on content - audio_channels = video_info.get("audio_channels", 2) - min_audio_bitrate = 64000 * audio_channels # Minimum per channel - max_audio_bitrate = 192000 * audio_channels # Maximum per channel - - # Allocate 10-20% for audio depending on content - audio_bitrate = min( - max_audio_bitrate, - max(min_audio_bitrate, int(total_bitrate * 0.15)), # 15% baseline - ) - - # Remaining bitrate for video - video_bitrate = int((video_size_target * 8) / duration - audio_bitrate) - - # Set bitrate constraints - params["maxrate"] = str(int(video_bitrate * 1.5)) # Allow 50% overflow - params["bufsize"] = str(int(video_bitrate * 2)) # Double buffer size - - # Adjust quality based on compression ratio and content - ratio = input_size / target_size_bytes - if ratio > 4: - params["crf"] = "26" if params["c:v"] == "libx264" else "23" - params["preset"] = "faster" - elif ratio > 2: - params["crf"] = "23" if params["c:v"] == "libx264" else "21" - params["preset"] = "medium" - else: - params["crf"] = "20" if params["c:v"] == "libx264" else "19" - params["preset"] = "slow" - - # Adjust for dark scenes - if video_info.get("has_dark_scenes"): - if params["c:v"] == "libx264": - params["crf"] = str( - max(18, int(params["crf"]) - 2) - ) # Better quality for dark scenes - elif params["c:v"] == "h264_nvenc": - params["cq:v"] = str(max(15, int(params["cq:v"]) - 2)) - - # Audio settings - params.update( - { - "c:a": "aac", - "b:a": f"{int(audio_bitrate/1000)}k", - "ar": str(video_info.get("audio_sample_rate", 48000)), - "ac": str(video_info.get("audio_channels", 2)), - } - ) - - except Exception as e: - logger.error(f"Error calculating bitrates: {str(e)}") - # Use safe default parameters - params.update( - { - "crf": "23", - "preset": "medium", - "maxrate": f"{2 * 1024 * 1024}", # 2 Mbps - "bufsize": f"{4 * 1024 * 1024}", # 4 Mbps buffer - "c:a": "aac", - "b:a": "128k", - "ar": "48000", - "ac": "2", - } - ) - - return params - - def _download_ffmpeg(self): - """Download and extract FFmpeg binary""" - try: - arch_config = self.FFMPEG_URLS[self.system][self.machine] - except KeyError: - raise DownloadError( - f"Unsupported system/architecture: {self.system}/{self.machine}" - ) - - url = arch_config["url"] - - with temp_path_context() as temp_dir: - archive_path = ( - Path(temp_dir) - / f"ffmpeg_archive{'.zip' if self.system == 'Windows' else '.tar.xz'}" - ) - - try: - # Download archive with retries - for attempt in range(self.MAX_RETRIES): - try: - response = requests.get(url, stream=True, timeout=30) - response.raise_for_status() - with open(archive_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - break - except Exception as e: - if attempt == self.MAX_RETRIES - 1: - raise DownloadError(f"Failed to download FFmpeg: {str(e)}") - time.sleep(self.RETRY_DELAY) - - # Remove existing ffmpeg if it exists (handle both file and directory) - if self.ffmpeg_path.exists(): - if self.ffmpeg_path.is_dir(): - shutil.rmtree(self.ffmpeg_path) - else: - self.ffmpeg_path.unlink() - - # Extract archive - if self.system == "Windows": - with zipfile.ZipFile(archive_path, "r") as zip_ref: - ffmpeg_files = [ - f - for f in zip_ref.namelist() - if arch_config["bin_name"] in f - ] - if not ffmpeg_files: - raise DownloadError("FFmpeg binary not found in archive") - zip_ref.extract(ffmpeg_files[0], self.base_path) - os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path) - else: - with tarfile.open(archive_path, "r:xz") as tar_ref: - ffmpeg_files = [ - f - for f in tar_ref.getnames() - if arch_config["bin_name"] in f - ] - if not ffmpeg_files: - raise DownloadError("FFmpeg binary not found in archive") - tar_ref.extract(ffmpeg_files[0], self.base_path) - os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path) - - # Set executable permissions on Unix systems - if self.system != "Windows": - try: - self.ffmpeg_path.chmod( - self.ffmpeg_path.stat().st_mode | stat.S_IEXEC - ) - except Exception as e: - logger.error(f"Failed to set executable permissions: {str(e)}") - - except Exception as e: - logger.error(f"FFmpeg download/extraction failed: {str(e)}") - raise DownloadError(str(e)) - - def force_download(self) -> bool: - """Force re-download of FFmpeg binary""" - try: - # Remove existing binary if it exists - if self.ffmpeg_path.exists(): - try: - self.ffmpeg_path.unlink() - except Exception as e: - logger.error(f"Failed to remove existing FFmpeg: {str(e)}") - return False - - # Download new binary - self._download_ffmpeg() - - # Verify new binary - return self._verify_ffmpeg() - except Exception as e: - logger.error(f"Failed to force download FFmpeg: {str(e)}") - return False - - def get_ffmpeg_path(self) -> str: - """Get path to FFmpeg binary""" - if not self.ffmpeg_path.exists(): - raise FFmpegError("FFmpeg is not available") - return str(self.ffmpeg_path) - - def get_compression_params(self, input_path: str, target_size_mb: int) -> dict: - """Get optimal compression parameters for the given input file""" - return self._get_optimal_ffmpeg_params(input_path, target_size_mb * 1024 * 1024) +__all__ = ['FFmpegManager', 'FFmpegError', 'GPUError', 'DownloadError']