From b4479c951b99471f142bd23f77e4f276b6491c6b Mon Sep 17 00:00:00 2001
From: pacnpal <183241239+pacnpal@users.noreply.github.com>
Date: Fri, 15 Nov 2024 13:36:26 +0000
Subject: [PATCH] Fix Discord integration and improve error handling, queue
 processing, and resource management

Fixed Discord Integration:
- Added missing discord import
- Added proper error handling for all Discord operations
- Improved error reporting for Discord-specific failures

Enhanced Error Handling:
- Added try/except blocks around all major operations
- Implemented proper cleanup in finally blocks
- Added more specific error messages for debugging

Queue Processing Improvements:
- Ensured the queue continues processing even if individual items fail
- Added better file cleanup to prevent resource leaks
- Improved error reporting to help diagnose issues

Resource Management:
- Added proper cleanup of downloaded files
- Improved handling of missing Discord resources
- Better management of failed downloads
---
 videoarchiver/enhanced_queue.py         |  82 +++--
 videoarchiver/ffmpeg/encoder_params.py  |  92 +++---
 videoarchiver/processor.py              | 114 ++++---
 videoarchiver/utils/video_downloader.py | 386 ++++++++++++------------
 4 files changed, 385 insertions(+), 289 deletions(-)

diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py
index 0f2d1f9..2482a44 100644
--- a/videoarchiver/enhanced_queue.py
+++ b/videoarchiver/enhanced_queue.py
@@ -45,12 +45,9 @@ class QueueMetrics:
     last_cleanup: datetime = field(default_factory=datetime.utcnow)
     retries: int = 0
     peak_memory_usage: float = 0.0
-    errors_by_type: Dict[str, int] = field(default_factory=dict)
-    last_error: Optional[str] = None
-    last_error_time: Optional[datetime] = None
-    last_cleanup: datetime = field(default_factory=datetime.utcnow)
-    retries: int = 0
     processing_times: List[float] = field(default_factory=list)
+    compression_failures: int = 0
+    hardware_accel_failures: int = 0
 
     def update(self, processing_time: float, success: bool, error: str = None):
         """Update metrics with new processing information"""
@@ -64,6 +61,12 @@ class QueueMetrics:
             self.errors_by_type[error_type] = (
                 self.errors_by_type.get(error_type, 0) + 1
             )
+
+            # Track specific error types
+            if "compression error" in error.lower():
+                self.compression_failures += 1
+            elif "hardware acceleration failed" in error.lower():
+                self.hardware_accel_failures += 1
 
         # Update processing times with sliding window
         self.processing_times.append(processing_time)
@@ -110,6 +113,8 @@ class QueueItem:
     last_retry: Optional[datetime] = None
     processing_times: List[float] = field(default_factory=list)
     last_error_time: Optional[datetime] = None
+    hardware_accel_attempted: bool = False
+    compression_attempted: bool = False
 
 
 class EnhancedVideoQueueManager:
@@ -124,6 +129,7 @@ class EnhancedVideoQueueManager:
         max_history_age: int = 86400,  # 24 hours
         persistence_path: Optional[str] = None,
         backup_interval: int = 300,  # 5 minutes
+        deadlock_threshold: int = 900,  # 15 minutes (reduced from 1 hour)
     ):
         self.max_retries = max_retries
         self.retry_delay = retry_delay
@@ -132,6 +138,7 @@ class EnhancedVideoQueueManager:
         self.max_history_age = max_history_age
         self.persistence_path = persistence_path
         self.backup_interval = backup_interval
+        self.deadlock_threshold = deadlock_threshold
 
         # Queue storage with priority
         self._queue: List[QueueItem] = []
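
The error classification added to QueueMetrics.update above keys off fixed substrings in the error text. A minimal standalone sketch of that pattern (ErrorTally and its error_type derivation are illustrative names, not code from this repository):

    from dataclasses import dataclass, field
    from typing import Dict

    @dataclass
    class ErrorTally:
        errors_by_type: Dict[str, int] = field(default_factory=dict)
        compression_failures: int = 0
        hardware_accel_failures: int = 0

        def record(self, error: str) -> None:
            # Bucket by a coarse error type, then count the known categories
            error_type = error.split(":")[0] if ":" in error else "unknown"
            self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
            if "compression error" in error.lower():
                self.compression_failures += 1
            elif "hardware acceleration failed" in error.lower():
                self.hardware_accel_failures += 1

    tally = ErrorTally()
    tally.record("FFmpegError: hardware acceleration failed on h264_nvenc")
    tally.record("FFmpegError: compression error at pass 1")
    assert tally.hardware_accel_failures == 1 and tally.compression_failures == 1
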
@@ -279,11 +286,20 @@ class EnhancedVideoQueueManager:
                             item.last_error = error
                             item.last_error_time = datetime.utcnow()
 
-                            # Handle retries
+                            # Handle retries with improved logic
                             if item.retry_count < self.max_retries:
                                 item.retry_count += 1
                                 item.status = "pending"
                                 item.last_retry = datetime.utcnow()
+
+                                # Adjust processing strategy based on error type
+                                if "hardware acceleration failed" in str(error).lower():
+                                    item.hardware_accel_attempted = True
+                                elif "compression error" in str(error).lower():
+                                    item.compression_attempted = True
+
+                                # Add back to queue with adjusted priority
+                                item.priority = max(0, item.priority - 1)  # Lower priority for retries
                                 self._queue.append(item)
                                 logger.warning(
                                     f"Retrying item: {item.url} (attempt {item.retry_count})"
@@ -294,27 +310,36 @@ class EnhancedVideoQueueManager:
                                     f"Failed to process item after {self.max_retries} attempts: {item.url}"
                                 )
 
+                    # Always remove from processing, regardless of outcome
                     self._processing.pop(item.url, None)
 
                 except Exception as e:
                     logger.error(
                         f"Error processing item {item.url}: {traceback.format_exc()}"
                     )
+                    # Ensure item is properly handled even on unexpected errors
                     async with self._processing_lock:
                         item.status = "failed"
                         item.error = str(e)
                         item.last_error = str(e)
                         item.last_error_time = datetime.utcnow()
                         self._failed[item.url] = item
+                        # Always remove from processing
                        self._processing.pop(item.url, None)
 
                 # Persist state after processing
                 if self.persistence_path:
-                    await self._persist_queue()
+                    try:
+                        await self._persist_queue()
+                    except Exception as e:
+                        logger.error(f"Failed to persist queue state: {e}")
+                        # Continue processing even if persistence fails
 
             except Exception as e:
-                logger.error(f"Error in queue processor: {traceback.format_exc()}")
+                logger.error(f"Critical error in queue processor: {traceback.format_exc()}")
+                # Ensure we don't get stuck in a tight loop on critical errors
                 await asyncio.sleep(1)
+                continue  # Continue to next iteration to process remaining items
 
             # Small delay to prevent CPU overload
             await asyncio.sleep(0.1)
@@ -358,6 +383,8 @@ class EnhancedVideoQueueManager:
                     if self.metrics.last_error_time
                     else None
                 ),
+                "compression_failures": self.metrics.compression_failures,
+                "hardware_accel_failures": self.metrics.hardware_accel_failures,
             },
         }
 
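
The persistence hunks that follow restore timestamps with datetime.fromisoformat(); assuming the save path writes them with isoformat() (not shown in this patch), the round trip is lossless:

    from datetime import datetime

    # Serialize for JSON persistence, then restore on load
    stamp = datetime.utcnow()
    encoded = stamp.isoformat()          # e.g. "2024-11-15T13:36:26.000123"
    decoded = datetime.fromisoformat(encoded)
    assert decoded == stamp
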
@@ -393,6 +420,8 @@ class EnhancedVideoQueueManager:
                 item["added_at"] = datetime.fromisoformat(item["added_at"])
                 if item.get("last_retry"):
                     item["last_retry"] = datetime.fromisoformat(item["last_retry"])
+                if item.get("last_error_time"):
+                    item["last_error_time"] = datetime.fromisoformat(item["last_error_time"])
                 self._queue.append(QueueItem(**item))
 
             self._processing = {
@@ -402,15 +431,19 @@ class EnhancedVideoQueueManager:
                 k: QueueItem(**v) for k, v in state["processing"].items()
             }
             self._failed = {k: QueueItem(**v) for k, v in state["failed"].items()}
 
             # Restore metrics
-            self.metrics.total_processed = state["metrics"]["total_processed"]
-            self.metrics.total_failed = state["metrics"]["total_failed"]
-            self.metrics.avg_processing_time = state["metrics"]["avg_processing_time"]
-            self.metrics.success_rate = state["metrics"]["success_rate"]
-            self.metrics.errors_by_type = state["metrics"]["errors_by_type"]
-            self.metrics.last_error = state["metrics"]["last_error"]
-            if state["metrics"]["last_error_time"]:
+            metrics_data = state["metrics"]
+            self.metrics.total_processed = metrics_data["total_processed"]
+            self.metrics.total_failed = metrics_data["total_failed"]
+            self.metrics.avg_processing_time = metrics_data["avg_processing_time"]
+            self.metrics.success_rate = metrics_data["success_rate"]
+            self.metrics.errors_by_type = metrics_data["errors_by_type"]
+            self.metrics.last_error = metrics_data["last_error"]
+            self.metrics.compression_failures = metrics_data.get("compression_failures", 0)
+            self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0)
+
+            if metrics_data["last_error_time"]:
                 self.metrics.last_error_time = datetime.fromisoformat(
-                    state["metrics"]["last_error_time"]
+                    metrics_data["last_error_time"]
                 )
 
             logger.info("Successfully loaded persisted queue state")
@@ -444,10 +477,9 @@ class EnhancedVideoQueueManager:
                     logger.warning(f"High memory usage detected: {memory_usage:.2f}MB")
                     # Force garbage collection
                     import gc
-
                     gc.collect()
 
-                # Check for potential deadlocks
+                # Check for potential deadlocks with reduced threshold
                 processing_times = [
                     time.time() - item.processing_time
                     for item in self._processing.values()
@@ -456,7 +488,7 @@ class EnhancedVideoQueueManager:
 
                 if processing_times:
                     max_time = max(processing_times)
-                    if max_time > 3600:  # 1 hour
+                    if max_time > self.deadlock_threshold:  # Reduced from 3600s to 900s
                         logger.warning(
                             f"Potential deadlock detected: Item processing for {max_time:.2f}s"
                         )
@@ -492,7 +524,7 @@ class EnhancedVideoQueueManager:
             for url, item in list(self._processing.items()):
                 if (
                     item.processing_time > 0
-                    and (current_time - item.processing_time) > 3600
+                    and (current_time - item.processing_time) > self.deadlock_threshold
                 ):
                     # Move to failed queue if max retries reached
                     if item.retry_count >= self.max_retries:
@@ -505,6 +537,8 @@ class EnhancedVideoQueueManager:
                         item.processing_time = 0
                         item.last_retry = datetime.utcnow()
                         item.status = "pending"
+                        # Lower priority for stuck items
+                        item.priority = max(0, item.priority - 2)
                         self._queue.append(item)
                         self._processing.pop(url)
                         logger.info(f"Recovered stuck item for retry: {url}")
@@ -564,7 +598,9 @@ class EnhancedVideoQueueManager:
                 "avg_processing_time": self.metrics.avg_processing_time,
                 "peak_memory_usage": self.metrics.peak_memory_usage,
                 "last_cleanup": self.metrics.last_cleanup.strftime("%Y-%m-%d %H:%M:%S"),
-                "errors_by_type": self.metrics.errors_by_type
+                "errors_by_type": self.metrics.errors_by_type,
+                "compression_failures": self.metrics.compression_failures,
+                "hardware_accel_failures": self.metrics.hardware_accel_failures
             }
 
         return {
@@ -590,7 +626,9 @@ class EnhancedVideoQueueManager:
                 "avg_processing_time": 0.0,
                 "peak_memory_usage": 0.0,
                 "last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
-                "errors_by_type": {}
+                "errors_by_type": {},
+                "compression_failures": 0,
+                "hardware_accel_failures": 0
             }
         }
 
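
Before moving on to the encoder changes: the deadlock_threshold recovery rule above, reduced to a self-contained sketch in which plain dicts stand in for QueueItem. Items stuck past the threshold are failed if out of retries, otherwise requeued at lowered priority:

    import time

    def recover_stuck(processing, queue, failed, deadlock_threshold=900, max_retries=3):
        now = time.time()
        for url, item in list(processing.items()):
            started = item["processing_time"]
            if started > 0 and (now - started) > deadlock_threshold:
                if item["retry_count"] >= max_retries:
                    failed[url] = processing.pop(url)
                else:
                    item["processing_time"] = 0
                    item["priority"] = max(0, item["priority"] - 2)
                    queue.append(processing.pop(url))

    processing = {"https://example.com/v": {"processing_time": time.time() - 1200,
                                            "retry_count": 0, "priority": 5}}
    queue, failed = [], {}
    recover_stuck(processing, queue, failed)
    assert queue and queue[0]["priority"] == 3
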
"rc-lookahead=50:me=umh:subme=8:ref=4:b-adapt=2:direct=auto" + "x264opts": "rc-lookahead=40:me=umh:subme=7:ref=4:b-adapt=2:direct=auto" } } - # NVENC specific presets + # NVENC specific presets (p1=fastest/lowest quality, p7=slowest/highest quality) NVENC_PRESETS = ["p1", "p2", "p3", "p4", "p5", "p6", "p7"] # CPU specific presets CPU_PRESETS = ["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"] - # Minimum bitrates to ensure quality - MIN_VIDEO_BITRATE = 500_000 # 500 Kbps - MIN_AUDIO_BITRATE = 64_000 # 64 Kbps per channel - MAX_AUDIO_BITRATE = 192_000 # 192 Kbps per channel + # Adjusted minimum bitrates to ensure better quality + MIN_VIDEO_BITRATE = 800_000 # 800 Kbps (increased from 500) + MIN_AUDIO_BITRATE = 96_000 # 96 Kbps per channel (increased from 64) + MAX_AUDIO_BITRATE = 256_000 # 256 Kbps per channel (increased from 192) def __init__(self, cpu_cores: int, gpu_info: Dict[str, bool]): """Initialize encoder parameters manager""" @@ -66,7 +66,7 @@ class EncoderParams: params.update(gpu_params) # Convert CPU preset to GPU preset if using NVENC if params.get("c:v") == "h264_nvenc" and params.get("preset") in self.CPU_PRESETS: - params["preset"] = "p6" # Default to p6 for NVENC + params["preset"] = "p5" # Default to p5 for better balance logger.debug(f"GPU-specific parameters: {gpu_params}") # Calculate and update bitrate parameters @@ -90,13 +90,13 @@ class EncoderParams: return { "c:v": "libx264", # Default to CPU encoding "threads": str(self.cpu_cores), - "preset": "medium", - "crf": "23", + "preset": "medium", # More balanced preset + "crf": "23", # More balanced CRF "movflags": "+faststart", "profile:v": "high", "level": "4.1", "pix_fmt": "yuv420p", - "x264opts": "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto", + "x264opts": "rc-lookahead=40:me=umh:subme=7:ref=4:b-adapt=2:direct=auto", "tune": "film", "fastfirstpass": "1" } @@ -118,11 +118,11 @@ class EncoderParams: if video_info.get("has_high_motion", False): params.update({ "tune": "grain", - "x264opts": "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto:deblock=-1,-1:psy-rd=1.0:aq-strength=0.8" + "x264opts": "rc-lookahead=40:me=umh:subme=7:ref=4:b-adapt=2:direct=auto:deblock=-1,-1:psy-rd=1.0:aq-strength=0.8" }) if video_info.get("has_dark_scenes", False): - x264opts = params.get("x264opts", "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto") + x264opts = params.get("x264opts", "rc-lookahead=40:me=umh:subme=7:ref=4:b-adapt=2:direct=auto") params.update({ "x264opts": x264opts + ":aq-mode=3:aq-strength=1.0:deblock=1:1", "tune": "film" if not video_info.get("has_high_motion") else "grain" @@ -131,67 +131,75 @@ class EncoderParams: return params def _get_gpu_specific_params(self) -> Dict[str, str]: - """Get GPU-specific encoding parameters""" + """Get GPU-specific encoding parameters with improved fallback handling""" if self.gpu_info.get("nvidia", False): return { "c:v": "h264_nvenc", - "preset": "p6", # Use NVENC preset + "preset": "p5", # More balanced preset "rc:v": "vbr", - "cq:v": "19", + "cq:v": "23", # More balanced quality "b_ref_mode": "middle", "spatial-aq": "1", "temporal-aq": "1", "rc-lookahead": "32", - "surfaces": "64", + "surfaces": "32", # Reduced from 64 for better stability "max_muxing_queue_size": "1024", - "gpu": "any" + "gpu": "any", + "strict": "normal", # Less strict mode for better compatibility + "weighted_pred": "1", + "bluray-compat": "0", # Disable for better compression + "init_qpP": "23" # Initial P-frame QP } elif 
self.gpu_info.get("amd", False): return { "c:v": "h264_amf", - "quality": "quality", + "quality": "balanced", # Changed from quality to balanced "rc": "vbr_peak", "enforce_hrd": "1", "vbaq": "1", "preanalysis": "1", - "max_muxing_queue_size": "1024" + "max_muxing_queue_size": "1024", + "usage": "transcoding", + "profile": "high" } elif self.gpu_info.get("intel", False): return { "c:v": "h264_qsv", - "preset": "veryslow", + "preset": "medium", # Changed from veryslow to medium "look_ahead": "1", "global_quality": "23", - "max_muxing_queue_size": "1024" + "max_muxing_queue_size": "1024", + "rdo": "1", + "max_frame_size": "0" } return {} def _get_bitrate_params(self, video_info: Dict[str, Any], target_size_bytes: int) -> Dict[str, str]: - """Calculate and get bitrate-related parameters""" + """Calculate and get bitrate-related parameters with more conservative settings""" params = {} try: duration = float(video_info.get("duration", 0)) if duration <= 0: raise ValueError("Invalid video duration") - # Calculate target bitrate based on file size + # Calculate target bitrate based on file size with more conservative approach total_bitrate = int((target_size_bytes * 8) / duration) # Handle audio bitrate audio_channels = int(video_info.get("audio_channels", 2)) audio_bitrate = min( self.MAX_AUDIO_BITRATE * audio_channels, - max(self.MIN_AUDIO_BITRATE * audio_channels, int(total_bitrate * 0.1)) + max(self.MIN_AUDIO_BITRATE * audio_channels, int(total_bitrate * 0.15)) # Increased from 0.1 ) # Calculate video bitrate, ensuring it doesn't go below minimum video_bitrate = max(self.MIN_VIDEO_BITRATE, total_bitrate - audio_bitrate) - # Set video bitrate constraints + # Set video bitrate constraints with more conservative buffer params.update({ "b:v": f"{int(video_bitrate)}", - "maxrate": f"{int(video_bitrate * 1.5)}", - "bufsize": f"{int(video_bitrate * 2)}" + "maxrate": f"{int(video_bitrate * 1.3)}", # Reduced from 1.5 + "bufsize": f"{int(video_bitrate * 1.5)}" # Reduced from 2.0 }) # Set audio parameters @@ -202,19 +210,19 @@ class EncoderParams: "ac": str(audio_channels) }) - # Adjust quality based on target size + # Adjust quality based on target size with more conservative thresholds input_bitrate = int(video_info.get("bitrate", 0)) if input_bitrate > 0: compression_ratio = input_bitrate / video_bitrate if compression_ratio > 4: - params["crf"] = "26" + params["crf"] = "24" # Less aggressive than 26 params["preset"] = "p4" if self.gpu_info.get("nvidia", False) else "faster" elif compression_ratio > 2: params["crf"] = "23" - params["preset"] = "p6" if self.gpu_info.get("nvidia", False) else "medium" + params["preset"] = "p5" if self.gpu_info.get("nvidia", False) else "medium" else: - params["crf"] = "20" - params["preset"] = "p7" if self.gpu_info.get("nvidia", False) else "slow" + params["crf"] = "21" # Less aggressive than 20 + params["preset"] = "p6" if self.gpu_info.get("nvidia", False) else "slow" logger.info(f"Calculated bitrates - Video: {video_bitrate}bps, Audio: {audio_bitrate}bps") return params @@ -298,5 +306,7 @@ class EncoderParams: "c:a": "aac", "b:a": "128k", "ar": "48000", - "ac": "2" + "ac": "2", + "threads": str(self.cpu_cores), + "x264opts": "rc-lookahead=40:me=umh:subme=7:ref=4:b-adapt=2:direct=auto" } diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 13e5518..4663f92 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -3,8 +3,9 @@ import os import logging import asyncio +import discord # Added missing import from pathlib import Path 
diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py
index 13e5518..4663f92 100644
--- a/videoarchiver/processor.py
+++ b/videoarchiver/processor.py
@@ -3,8 +3,9 @@
 import os
 import logging
 import asyncio
+import discord  # Added missing import
 from pathlib import Path
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, Tuple
 import traceback
 
 from videoarchiver.enhanced_queue import EnhancedVideoQueueManager
@@ -27,12 +28,12 @@ class VideoProcessor:
         config_manager,
         components,
         queue_manager=None,
-        ffmpeg_mgr=None  # Add FFmpeg manager parameter
+        ffmpeg_mgr=None
     ):
         self.bot = bot
         self.config = config_manager
         self.components = components
-        self.ffmpeg_mgr = ffmpeg_mgr  # Store shared FFmpeg manager
+        self.ffmpeg_mgr = ffmpeg_mgr
 
         # Use provided queue manager or create new one
         if queue_manager:
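
The _process_video rewrite in the next hunk trades raised ProcessingErrors for a (success, error) return tuple so one bad item cannot kill the queue loop. The shape of that contract in miniature (the process helper is hypothetical, not code from this patch):

    import os
    from typing import Optional, Tuple

    def process(path: Optional[str]) -> Tuple[bool, Optional[str]]:
        try:
            if path is None:
                return False, "No file to process"  # soft failure; caller keeps going
            return True, None
        finally:
            # Cleanup runs on every exit path, including early returns
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except OSError as e:
                    print(f"Cleanup failed for {path}: {e}")

    assert process(None) == (False, "No file to process")
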
@@ -92,72 +93,113 @@ class VideoProcessor:
             logger.error(f"Error processing message: {traceback.format_exc()}")
             raise ProcessingError(f"Failed to process message: {str(e)}")
 
-    async def _process_video(self, item):
+    async def _process_video(self, item) -> Tuple[bool, Optional[str]]:
         """Process a video from the queue"""
+        file_path = None
         try:
             guild_id = item.guild_id
             if guild_id not in self.components:
-                raise ProcessingError(f"No components found for guild {guild_id}")
+                return False, f"No components found for guild {guild_id}"
 
             components = self.components[guild_id]
             downloader = components.get("downloader")
             message_manager = components.get("message_manager")
 
             if not downloader or not message_manager:
-                raise ProcessingError(f"Missing required components for guild {guild_id}")
+                return False, f"Missing required components for guild {guild_id}"
 
             # Download and process video
-            success, file_path, error = await downloader.download_video(item.url)
-            if not success:
-                raise ProcessingError(f"Failed to download video: {error}")
-
-            # Get archive channel
-            guild = self.bot.get_guild(guild_id)
-            if not guild:
-                raise ProcessingError(f"Guild {guild_id} not found")
-
-            archive_channel = await self.config.get_channel(guild, "archive")
-            if not archive_channel:
-                raise ProcessingError("Archive channel not configured")
-
-            # Upload to archive channel
             try:
-                original_message = await self.bot.get_channel(item.channel_id).fetch_message(item.message_id)
-                author = original_message.author if original_message else None
+                success, file_path, error = await downloader.download_video(item.url)
+                if not success:
+                    return False, f"Failed to download video: {error}"
+            except Exception as e:
+                return False, f"Download error: {str(e)}"
 
-                message = await message_manager.format_message(
-                    author=author,
-                    channel=self.bot.get_channel(item.channel_id),
-                    original_message=original_message
-                )
+            try:
+                # Get archive channel
+                guild = self.bot.get_guild(guild_id)
+                if not guild:
+                    return False, f"Guild {guild_id} not found"
 
-                await archive_channel.send(content=message, file=discord.File(file_path))
+                archive_channel = await self.config.get_channel(guild, "archive")
+                if not archive_channel:
+                    return False, "Archive channel not configured"
+
+                # Get original message
+                try:
+                    channel = self.bot.get_channel(item.channel_id)
+                    if not channel:
+                        return False, f"Channel {item.channel_id} not found"
+                    original_message = await channel.fetch_message(item.message_id)
+                except discord.NotFound:
+                    original_message = None
+                except Exception as e:
+                    logger.error(f"Error fetching original message: {e}")
+                    original_message = None
+
+                # Format message
+                try:
+                    author = original_message.author if original_message else None
+                    message = await message_manager.format_message(
+                        author=author,
+                        channel=channel,
+                        original_message=original_message
+                    )
+                except Exception as e:
+                    logger.error(f"Error formatting message: {e}")
+                    message = f"Video from {item.url}"
+
+                # Upload to archive channel
+                try:
+                    if not os.path.exists(file_path):
+                        return False, "Processed file not found"
+
+                    await archive_channel.send(
+                        content=message,
+                        file=discord.File(file_path)
+                    )
+                except discord.HTTPException as e:
+                    return False, f"Failed to upload to Discord: {str(e)}"
+                except Exception as e:
+                    return False, f"Failed to archive video: {str(e)}"
 
                 # Delete original if configured
-                settings = await self.config.get_guild_settings(guild_id)
-                if settings.get("delete_after_repost", False) and original_message:
+                if original_message:
                     try:
-                        await original_message.delete()
+                        settings = await self.config.get_guild_settings(guild_id)
+                        if settings.get("delete_after_repost", False):
+                            await original_message.delete()
                     except Exception as e:
                         logger.warning(f"Failed to delete original message: {e}")
+                        # Don't fail the process for deletion errors
 
                 return True, None
 
             except Exception as e:
-                return False, f"Failed to archive video: {str(e)}"
+                return False, f"Processing error: {str(e)}"
             finally:
                 # Clean up downloaded file
-                try:
-                    if file_path and os.path.exists(file_path):
+                if file_path and os.path.exists(file_path):
+                    try:
                         os.unlink(file_path)
-                except Exception as e:
-                    logger.error(f"Failed to clean up file {file_path}: {e}")
+                    except Exception as e:
+                        logger.error(f"Failed to clean up file {file_path}: {e}")
+                        # Don't fail the process for cleanup errors
 
         except Exception as e:
             logger.error(f"Error processing video: {traceback.format_exc()}")
             return False, str(e)
 
+        finally:
+            # Ensure file cleanup even on unexpected errors
+            if file_path and os.path.exists(file_path):
+                try:
+                    os.unlink(file_path)
+                except Exception as e:
+                    logger.error(f"Final cleanup failed for {file_path}: {e}")
+
     async def cleanup(self):
         """Clean up resources"""
         try:
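
The upload guard in _process_video above assumes discord.py's API (discord.File, TextChannel.send, discord.HTTPException). Isolated, the pattern looks like this sketch:

    import discord

    async def upload(channel: discord.TextChannel, message: str, file_path: str):
        try:
            await channel.send(content=message, file=discord.File(file_path))
            return True, None
        except discord.HTTPException as e:
            # Covers rate limits, payload-too-large, and other API failures
            return False, f"Failed to upload to Discord: {e}"
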
diff --git a/videoarchiver/utils/video_downloader.py b/videoarchiver/utils/video_downloader.py
index 8ce6b29..1077a5e 100644
--- a/videoarchiver/utils/video_downloader.py
+++ b/videoarchiver/utils/video_downloader.py
@@ -94,14 +94,14 @@ class VideoDownloader:
         self.active_downloads: Dict[str, str] = {}
         self._downloads_lock = asyncio.Lock()
 
-        # Configure yt-dlp options
+        # Configure yt-dlp options with improved settings
         self.ydl_opts = {
-            "format": f"bv*[height<={max_quality}][ext=mp4]+ba[ext=m4a]/b[height<={max_quality}]/best",  # More flexible format
-            "outtmpl": "%(title)s.%(ext)s",  # Base filename only, path added later
+            "format": f"bv*[height<={max_quality}][ext=mp4]+ba[ext=m4a]/b[height<={max_quality}]/best",
+            "outtmpl": "%(title)s.%(ext)s",
             "merge_output_format": video_format,
-            "quiet": True,  # Reduce output noise
-            "no_warnings": True,  # Reduce warning noise
-            "extract_flat": True,  # Don't download video info
+            "quiet": True,
+            "no_warnings": True,
+            "extract_flat": True,
             "concurrent_fragment_downloads": concurrent_downloads,
             "retries": self.MAX_RETRIES,
             "fragment_retries": self.MAX_RETRIES,
@@ -109,47 +109,42 @@ class VideoDownloader:
             "extractor_retries": self.MAX_RETRIES,
             "postprocessor_hooks": [self._check_file_size],
             "progress_hooks": [self._progress_hook],
-            "ffmpeg_location": str(
-                self.ffmpeg_mgr.get_ffmpeg_path()
-            ),  # Convert Path to string
-            "ffprobe_location": str(
-                self.ffmpeg_mgr.get_ffprobe_path()
-            ),  # Add ffprobe path
-            "paths": {"home": str(self.download_path)},  # Set home directory for yt-dlp
-            "logger": logger,  # Use our logger
-            "ignoreerrors": True,  # Don't stop on download errors
-            "no_color": True,  # Disable ANSI colors in output
-            "geo_bypass": True,  # Try to bypass geo-restrictions
-            "socket_timeout": 30,  # Increase timeout
+            "ffmpeg_location": str(self.ffmpeg_mgr.get_ffmpeg_path()),
+            "ffprobe_location": str(self.ffmpeg_mgr.get_ffprobe_path()),
+            "paths": {"home": str(self.download_path)},
+            "logger": logger,
+            "ignoreerrors": True,
+            "no_color": True,
+            "geo_bypass": True,
+            "socket_timeout": 30,
+            "http_chunk_size": 10485760,  # 10MB chunks for better stability
+            "external_downloader_args": {
+                "ffmpeg": ["-timeout", "30000000"]  # 30 second timeout
+            }
         }
 
     def is_supported_url(self, url: str) -> bool:
         """Check if URL is supported by attempting a simulated download"""
-        # First check if URL matches common video platform patterns
         if not is_video_url_pattern(url):
             return False
 
         try:
-            # Configure yt-dlp for simulation
             simulate_opts = {
                 **self.ydl_opts,
-                "simulate": True,  # Only simulate download
-                "quiet": True,  # Reduce output noise
+                "simulate": True,
+                "quiet": True,
                 "no_warnings": True,
-                "extract_flat": True,  # Don't download video info
-                "skip_download": True,  # Skip actual download
-                "format": "best",  # Don't spend time finding best format
+                "extract_flat": True,
+                "skip_download": True,
+                "format": "best",
             }
 
-            # Create a new yt-dlp instance for simulation
             with yt_dlp.YoutubeDL(simulate_opts) as ydl:
                 try:
-                    # Try to extract info without downloading
                     info = ydl.extract_info(url, download=False)
                     if info is None:
                         return False
 
-                    # Check if site is enabled (if enabled_sites is configured)
                     if self.enabled_sites:
                         extractor = info.get("extractor", "").lower()
                         if not any(
@@ -164,7 +159,6 @@ class VideoDownloader:
                     return True
 
                 except yt_dlp.utils.UnsupportedError:
-                    # Quietly handle unsupported URLs
                     return False
                 except Exception as e:
                     if "Unsupported URL" not in str(e):
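
The relocated download_video in the next hunk tries hardware-accelerated compression first and falls back to the CPU encoder. The control flow in miniature, with a stub standing in for _try_compression:

    import asyncio

    async def compress_with_fallback(try_compress) -> tuple:
        # Returns (succeeded, hardware_accel_failed)
        if await try_compress(use_hardware=True):
            return True, False
        # Hardware path failed; retry once on the CPU encoder
        if await try_compress(use_hardware=False):
            return True, True
        return False, True

    async def fake_attempt(use_hardware: bool) -> bool:
        return not use_hardware  # simulate a GPU failure, then CPU success

    print(asyncio.run(compress_with_fallback(fake_attempt)))  # (True, True)
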
@@ -175,6 +169,174 @@ class VideoDownloader:
             logger.error(f"Error during URL check: {str(e)}")
             return False
 
+    async def download_video(self, url: str) -> Tuple[bool, str, str]:
+        """Download and process a video with improved error handling and retry logic"""
+        original_file = None
+        compressed_file = None
+        temp_dir = None
+        hardware_accel_failed = False
+        compression_params = None
+
+        try:
+            with temp_path_context() as temp_dir:
+                # Download the video
+                success, file_path, error = await self._safe_download(url, temp_dir)
+                if not success:
+                    return False, "", error
+
+                original_file = file_path
+
+                async with self._downloads_lock:
+                    self.active_downloads[url] = original_file
+
+                # Check file size and compress if needed
+                file_size = os.path.getsize(original_file)
+                if file_size > (self.max_file_size * 1024 * 1024):
+                    logger.info(f"Compressing video: {original_file}")
+                    try:
+                        # Get optimal compression parameters
+                        compression_params = self.ffmpeg_mgr.get_compression_params(
+                            original_file, self.max_file_size
+                        )
+                        compressed_file = os.path.join(
+                            self.download_path,
+                            f"compressed_{os.path.basename(original_file)}",
+                        )
+
+                        # Try hardware acceleration first
+                        success = await self._try_compression(
+                            original_file,
+                            compressed_file,
+                            compression_params,
+                            use_hardware=True
+                        )
+
+                        # If hardware acceleration fails, fall back to CPU
+                        if not success:
+                            hardware_accel_failed = True
+                            logger.warning("Hardware acceleration failed, falling back to CPU encoding")
+                            success = await self._try_compression(
+                                original_file,
+                                compressed_file,
+                                compression_params,
+                                use_hardware=False
+                            )
+
+                        if not success:
+                            raise CompressionError(
+                                "Failed to compress with both hardware and CPU encoding"
+                            )
+
+                        # Verify compressed file
+                        if not self._verify_video_file(compressed_file):
+                            raise VideoVerificationError(
+                                "Compressed file verification failed"
+                            )
+
+                        compressed_size = os.path.getsize(compressed_file)
+                        if compressed_size <= (self.max_file_size * 1024 * 1024):
+                            await self._safe_delete_file(original_file)
+                            return True, compressed_file, ""
+                        else:
+                            await self._safe_delete_file(compressed_file)
+                            raise CompressionError(
+                                "Failed to compress to target size",
+                                input_size=file_size,
+                                target_size=self.max_file_size * 1024 * 1024,
+                            )
+
+                    except Exception as e:
+                        error_msg = str(e)
+                        if hardware_accel_failed:
+                            error_msg = f"Hardware acceleration failed, CPU fallback error: {error_msg}"
+                        if compressed_file and os.path.exists(compressed_file):
+                            await self._safe_delete_file(compressed_file)
+                        return False, "", error_msg
+
+                else:
+                    # Move file to final location
+                    final_path = os.path.join(
+                        self.download_path, os.path.basename(original_file)
+                    )
+                    success = await self._safe_move_file(original_file, final_path)
+                    if not success:
+                        return False, "", "Failed to move file to final location"
+                    return True, final_path, ""
+
+        except Exception as e:
+            logger.error(f"Download error: {str(e)}")
+            return False, "", str(e)
+
+        finally:
+            # Clean up
+            async with self._downloads_lock:
+                self.active_downloads.pop(url, None)
+
+            try:
+                if original_file and os.path.exists(original_file):
+                    await self._safe_delete_file(original_file)
+                if (
+                    compressed_file
+                    and os.path.exists(compressed_file)
+                    and not compressed_file.startswith(self.download_path)
+                ):
+                    await self._safe_delete_file(compressed_file)
+            except Exception as e:
+                logger.error(f"Error during file cleanup: {str(e)}")
+
+    async def _try_compression(
+        self,
+        input_file: str,
+        output_file: str,
+        params: Dict[str, str],
+        use_hardware: bool = True
+    ) -> bool:
+        """Attempt video compression with given parameters"""
+        try:
+            # Build FFmpeg command
+            ffmpeg_path = str(self.ffmpeg_mgr.get_ffmpeg_path())
+            cmd = [ffmpeg_path, "-y", "-i", input_file]
+
+            # Modify parameters based on hardware acceleration preference
+            if use_hardware:
+                gpu_info = self.ffmpeg_mgr.gpu_info
+                if gpu_info["nvidia"] and params.get("c:v") == "libx264":
+                    params["c:v"] = "h264_nvenc"
+                elif gpu_info["amd"] and params.get("c:v") == "libx264":
+                    params["c:v"] = "h264_amf"
+                elif gpu_info["intel"] and params.get("c:v") == "libx264":
+                    params["c:v"] = "h264_qsv"
+            else:
+                params["c:v"] = "libx264"
+
+            # Add all parameters to command
+            for key, value in params.items():
+                cmd.extend([f"-{key}", str(value)])
+
+            # Add output file
+            cmd.append(output_file)
+
+            # Run compression
+            logger.debug(f"Running FFmpeg command: {' '.join(cmd)}")
+            result = await asyncio.get_event_loop().run_in_executor(
+                self.download_pool,
+                lambda: subprocess.run(
+                    cmd,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    check=True,
+                ),
+            )
+
+            return os.path.exists(output_file)
+
+        except subprocess.CalledProcessError as e:
+            logger.error(f"FFmpeg compression failed: {e.stderr.decode()}")
+            return False
+        except Exception as e:
+            logger.error(f"Compression attempt failed: {str(e)}")
+            return False
+
     def _check_file_size(self, info):
         """Check if file size is within limits"""
         if info.get("filepath") and os.path.exists(info["filepath"]):
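
_try_compression above runs the blocking FFmpeg subprocess in an executor to keep the event loop responsive. The same pattern with a harmless command substituted for ffmpeg:

    import asyncio
    import subprocess
    import sys
    from concurrent.futures import ThreadPoolExecutor

    async def run_cmd(pool, cmd):
        # Run a blocking subprocess off the event loop
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            pool,
            lambda: subprocess.run(cmd, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, check=True),
        )
        return result.returncode

    with ThreadPoolExecutor(max_workers=1) as pool:
        print(asyncio.run(run_cmd(pool, [sys.executable, "--version"])))  # 0
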
@@ -203,10 +365,7 @@ class VideoDownloader:
     def _verify_video_file(self, file_path: str) -> bool:
         """Verify video file integrity"""
         try:
-            # Use ffprobe from FFmpegManager
             ffprobe_path = str(self.ffmpeg_mgr.get_ffprobe_path())
-            logger.debug(f"Using ffprobe from: {ffprobe_path}")
-
             cmd = [
                 ffprobe_path,
                 "-v",
@@ -231,19 +390,19 @@ class VideoDownloader:
 
             probe = json.loads(result.stdout)
 
-            # Check if file has video stream
+            # Verify video stream
             video_streams = [s for s in probe["streams"] if s["codec_type"] == "video"]
             if not video_streams:
                 raise VideoVerificationError("No video streams found")
 
-            # Check if duration is valid
+            # Verify duration
             duration = float(probe["format"].get("duration", 0))
             if duration <= 0:
                 raise VideoVerificationError("Invalid video duration")
 
-            # Check if file is readable
+            # Verify file is readable
             with open(file_path, "rb") as f:
-                f.seek(0, 2)  # Seek to end
+                f.seek(0, 2)
                 if f.tell() == 0:
                     raise VideoVerificationError("Empty file")
 
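
The ffprobe verification above, reduced to a standalone sketch. This assumes an ffprobe binary on PATH, whereas the patch resolves the path through FFmpegManager:

    import json
    import subprocess

    def has_video_stream(path: str, ffprobe: str = "ffprobe") -> bool:
        cmd = [ffprobe, "-v", "quiet", "-print_format", "json",
               "-show_format", "-show_streams", path]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            return False
        probe = json.loads(result.stdout)
        streams = [s for s in probe.get("streams", []) if s.get("codec_type") == "video"]
        return bool(streams) and float(probe["format"].get("duration", 0)) > 0
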
@@ -280,161 +439,10 @@ class VideoDownloader:
         except Exception as e:
             logger.error(f"Download attempt {attempt + 1} failed: {str(e)}")
             if attempt < self.MAX_RETRIES - 1:
-                await asyncio.sleep(
-                    self.RETRY_DELAY * (attempt + 1)
-                )  # Exponential backoff
+                await asyncio.sleep(self.RETRY_DELAY * (attempt + 1))
             else:
                 return False, "", f"All download attempts failed: {str(e)}"
 
-    async def download_video(self, url: str) -> Tuple[bool, str, str]:
-        """Download and process a video"""
-        original_file = None
-        compressed_file = None
-        temp_dir = None
-
-        try:
-            # Create temporary directory for download
-            with temp_path_context() as temp_dir:
-                # Download the video
-                success, file_path, error = await self._safe_download(url, temp_dir)
-                if not success:
-                    return False, "", error
-
-                original_file = file_path
-
-                # Track this download
-                async with self._downloads_lock:
-                    self.active_downloads[url] = original_file
-
-                # Check file size and compress if needed
-                file_size = os.path.getsize(original_file)
-                if file_size > (self.max_file_size * 1024 * 1024):
-                    logger.info(f"Compressing video: {original_file}")
-                    try:
-                        # Get optimal compression parameters
-                        params = self.ffmpeg_mgr.get_compression_params(
-                            original_file, self.max_file_size
-                        )
-                        compressed_file = os.path.join(
-                            self.download_path,
-                            f"compressed_{os.path.basename(original_file)}",
-                        )
-
-                        # Build FFmpeg command with full path
-                        ffmpeg_path = str(self.ffmpeg_mgr.get_ffmpeg_path())
-                        logger.debug(f"Using FFmpeg from: {ffmpeg_path}")
-
-                        # Build command with all parameters
-                        cmd = [ffmpeg_path, "-y"]  # Overwrite output file if it exists
-
-                        # Add input file
-                        cmd.extend(["-i", original_file])
-
-                        # Add all compression parameters
-                        for key, value in params.items():
-                            if key == "c:v" and value == "libx264":
-                                # Use hardware acceleration if available
-                                gpu_info = self.ffmpeg_mgr.gpu_info
-                                if gpu_info["nvidia"]:
-                                    cmd.extend(["-c:v", "h264_nvenc"])
-                                elif gpu_info["amd"]:
-                                    cmd.extend(["-c:v", "h264_amf"])
-                                elif gpu_info["intel"]:
-                                    cmd.extend(["-c:v", "h264_qsv"])
-                                else:
-                                    cmd.extend(["-c:v", "libx264"])
-                            else:
-                                cmd.extend([f"-{key}", str(value)])
-
-                        # Add output file
-                        cmd.append(compressed_file)
-
-                        # Run compression in executor
-                        logger.debug(f"Running FFmpeg command: {' '.join(cmd)}")
-                        try:
-                            result = await asyncio.get_event_loop().run_in_executor(
-                                self.download_pool,
-                                lambda: subprocess.run(
-                                    cmd,
-                                    stdout=subprocess.PIPE,
-                                    stderr=subprocess.PIPE,
-                                    check=True,
-                                ),
-                            )
-                            logger.debug(f"FFmpeg output: {result.stderr.decode()}")
-                        except subprocess.CalledProcessError as e:
-                            error = handle_ffmpeg_error(e.stderr.decode())
-                            logger.error(f"FFmpeg error: {e.stderr.decode()}")
-                            raise error
-
-                        if not os.path.exists(compressed_file):
-                            raise FileNotFoundError(
-                                "Compression completed but file not found"
-                            )
-
-                        # Verify compressed file
-                        if not self._verify_video_file(compressed_file):
-                            raise VideoVerificationError(
-                                "Compressed file is not a valid video"
-                            )
-
-                        compressed_size = os.path.getsize(compressed_file)
-                        if compressed_size <= (self.max_file_size * 1024 * 1024):
-                            await self._safe_delete_file(original_file)
-                            return True, compressed_file, ""
-                        else:
-                            await self._safe_delete_file(compressed_file)
-                            raise CompressionError(
-                                "Failed to compress to target size",
-                                input_size=file_size,
-                                target_size=self.max_file_size * 1024 * 1024,
-                            )
-                    except (
-                        FFmpegError,
-                        VideoVerificationError,
-                        FileNotFoundError,
-                        CompressionError,
-                    ) as e:
-                        if compressed_file and os.path.exists(compressed_file):
-                            await self._safe_delete_file(compressed_file)
-                        return False, "", str(e)
-                    except Exception as e:
-                        if compressed_file and os.path.exists(compressed_file):
-                            await self._safe_delete_file(compressed_file)
-                        logger.error(f"Compression error: {str(e)}")
-                        return False, "", f"Compression error: {str(e)}"
-                else:
-                    # Move file to final location
-                    final_path = os.path.join(
-                        self.download_path, os.path.basename(original_file)
-                    )
-                    # Use safe move with retries
-                    success = await self._safe_move_file(original_file, final_path)
-                    if not success:
-                        return False, "", "Failed to move file to final location"
-                    return True, final_path, ""
-
-        except Exception as e:
-            logger.error(f"Download error: {str(e)}")
-            return False, "", str(e)
-
-        finally:
-            # Clean up
-            async with self._downloads_lock:
-                self.active_downloads.pop(url, None)
-
-            try:
-                if original_file and os.path.exists(original_file):
-                    await self._safe_delete_file(original_file)
-                if (
-                    compressed_file
-                    and os.path.exists(compressed_file)
-                    and not compressed_file.startswith(self.download_path)
-                ):
-                    await self._safe_delete_file(compressed_file)
-            except Exception as e:
-                logger.error(f"Error during file cleanup: {str(e)}")
-
     async def _safe_delete_file(self, file_path: str) -> bool:
         """Safely delete a file with retries"""
         for attempt in range(self.FILE_OP_RETRIES):
@@ -453,9 +461,7 @@ class VideoDownloader:
         """Safely move a file with retries"""
         for attempt in range(self.FILE_OP_RETRIES):
             try:
-                # Ensure destination directory exists
                 os.makedirs(os.path.dirname(dst), exist_ok=True)
-                # Try to move the file
                 shutil.move(src, dst)
                 return True
             except Exception as e:
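
The _safe_* helpers close out the patch with linear backoff between file-operation retries (the wait grows as RETRY_DELAY * attempt number). A self-contained sketch of _safe_move_file under that scheme; FILE_OP_RETRIES and RETRY_DELAY mirror the class constants by name only:

    import asyncio
    import os
    import shutil

    FILE_OP_RETRIES = 3
    RETRY_DELAY = 1  # base delay in seconds; each retry waits RETRY_DELAY * attempt

    async def safe_move(src: str, dst: str) -> bool:
        for attempt in range(FILE_OP_RETRIES):
            try:
                # Ensure the destination directory exists (guarded for bare filenames)
                if os.path.dirname(dst):
                    os.makedirs(os.path.dirname(dst), exist_ok=True)
                shutil.move(src, dst)
                return True
            except OSError:
                if attempt < FILE_OP_RETRIES - 1:
                    await asyncio.sleep(RETRY_DELAY * (attempt + 1))
        return False
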