From f8fe409cdd15da4367661435dfe69cefc8df04c6 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 05:17:21 +0000 Subject: [PATCH] Architecture Changes: Moved FFmpeg manager to cog level instead of per-guild Created single shared FFmpeg manager instance in VideoArchiver class Passed shared FFmpeg manager to VideoProcessor and VideoDownloader Removed redundant FFmpeg downloads for each guild Component Management: Updated VideoArchiver to initialize one FFmpeg manager Modified guild components to remove FFmpeg manager Updated component cleanup to handle shared resources Improved resource initialization order Resource Efficiency: Eliminated duplicate FFmpeg binary downloads Reduced disk space usage Improved initialization time Better resource sharing across guilds Error Handling: Added proper cleanup for shared resources Improved error propagation Enhanced initialization error handling Better component lifecycle management --- videoarchiver/ffmpeg/ffmpeg_downloader.py | 27 +- videoarchiver/processor.py | 481 +++++----------------- videoarchiver/video_archiver.py | 15 +- 3 files changed, 139 insertions(+), 384 deletions(-) diff --git a/videoarchiver/ffmpeg/ffmpeg_downloader.py b/videoarchiver/ffmpeg/ffmpeg_downloader.py index 44fd6b1..bc2511d 100644 --- a/videoarchiver/ffmpeg/ffmpeg_downloader.py +++ b/videoarchiver/ffmpeg/ffmpeg_downloader.py @@ -263,11 +263,18 @@ class FFmpegDownloader: def _extract_tar(self, archive_path: Path, temp_dir: str): """Extract from tar archive (Linux/macOS)""" try: - # First decompress the .xz file + # First decompress the .xz file in chunks to prevent blocking decompressed_path = archive_path.with_suffix('') + chunk_size = 1024 * 1024 # 1MB chunks with lzma.open(archive_path, 'rb') as compressed: with open(decompressed_path, 'wb') as decompressed: - shutil.copyfileobj(compressed, decompressed) + while True: + chunk = compressed.read(chunk_size) + if not chunk: + break + decompressed.write(chunk) + # Allow other tasks to run + time.sleep(0) # Then extract from the tar file with tarfile.open(decompressed_path, "r:") as tar_ref: @@ -285,10 +292,22 @@ class FFmpegDownloader: if not binary_files: raise DownloadError(f"{binary_name} not found in archive") - tar_ref.extract(binary_files[0], temp_dir) + # Extract binary with progress tracking + member = tar_ref.getmember(binary_files[0]) + tar_ref.extract(member, temp_dir) extracted_path = Path(temp_dir) / binary_files[0] target_path = self.base_dir / binary_name - shutil.copy2(extracted_path, target_path) + + # Copy file in chunks + with open(extracted_path, 'rb') as src, open(target_path, 'wb') as dst: + while True: + chunk = src.read(chunk_size) + if not chunk: + break + dst.write(chunk) + # Allow other tasks to run + time.sleep(0) + logger.info(f"Extracted {binary_name} to {target_path}") # Clean up decompressed file diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index e71e3cb..13e5518 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -1,56 +1,23 @@ """Video processing logic for VideoArchiver""" -import discord +import os import logging import asyncio -import ffmpeg -import yt_dlp -import re -import os -from typing import Dict, List, Optional, Tuple, Callable, Any -import traceback -from datetime import datetime from pathlib import Path +from typing import Dict, Any, Optional +import traceback -from videoarchiver.utils.video_downloader import VideoDownloader -from videoarchiver.utils.file_ops import secure_delete_file, cleanup_downloads -from videoarchiver.utils.exceptions import ( - VideoArchiverError, - VideoDownloadError, - VideoProcessingError, - VideoVerificationError, - VideoUploadError, - VideoCleanupError, - ConfigurationError, - PermissionError, - NetworkError, - ResourceError, - QueueError, - ComponentError -) -from videoarchiver.ffmpeg.exceptions import ( - FFmpegError, - CompressionError, - VerificationError, - FFprobeError, - TimeoutError, - handle_ffmpeg_error -) from videoarchiver.enhanced_queue import EnhancedVideoQueueManager +from videoarchiver.utils.exceptions import ( + ProcessingError, + ConfigurationError, + VideoVerificationError, + QueueError, + FileOperationError +) logger = logging.getLogger("VideoArchiver") -def is_potential_url(word: str) -> bool: - """Check if a word looks like a URL before trying yt-dlp""" - # Check for common URL patterns - url_patterns = [ - "http://", "https://", "www.", - "youtu.be", "youtube.com", "vimeo.com", - "twitch.tv", "twitter.com", "tiktok.com", - "instagram.com", "facebook.com" - ] - return any(pattern in word.lower() for pattern in url_patterns) - class VideoProcessor: """Handles video processing operations""" @@ -59,11 +26,13 @@ class VideoProcessor: bot, config_manager, components, - queue_manager=None + queue_manager=None, + ffmpeg_mgr=None # Add FFmpeg manager parameter ): self.bot = bot self.config = config_manager self.components = components + self.ffmpeg_mgr = ffmpeg_mgr # Store shared FFmpeg manager # Use provided queue manager or create new one if queue_manager: @@ -88,364 +57,132 @@ class VideoProcessor: self._failed_downloads = set() self._failed_downloads_lock = asyncio.Lock() - # Force re-download FFmpeg binaries to ensure we have working copies - for guild_id in self.components: - if "ffmpeg_mgr" in self.components[guild_id]: - try: - logger.info(f"Force re-downloading FFmpeg binaries for guild {guild_id}") - self.components[guild_id]["ffmpeg_mgr"].force_download() - except Exception as e: - logger.error(f"Failed to force re-download FFmpeg: {e}") - # Start queue processing logger.info("Starting video processing queue...") self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) logger.info("Video processing queue started successfully") - async def _process_video(self, item: Any) -> Tuple[bool, Optional[str]]: - """Process a video from the queue""" - logger.info(f"Processing video from queue: {item.url}") + async def process_message(self, message): + """Process a message for video content""" try: - # Get the message - channel = self.bot.get_channel(item.channel_id) - if not channel: - raise ConfigurationError("Channel not found") + if not message.guild or not message.guild.id in self.components: + return - try: - message = await channel.fetch_message(item.message_id) - if not message: - raise ConfigurationError("Message not found") - except discord.NotFound: - raise ConfigurationError("Message not found") - except discord.Forbidden: - raise PermissionError("Bot lacks permissions to fetch message") - except Exception as e: - raise NetworkError(f"Error fetching message: {str(e)}") + components = self.components[message.guild.id] + downloader = components.get("downloader") + if not downloader: + logger.error(f"No downloader found for guild {message.guild.id}") + return - guild_id = message.guild.id - file_path = None - start_time = datetime.utcnow() + # Check if message contains a video URL + content = message.content.strip() + if not content or not downloader.is_supported_url(content): + return - try: - settings = await self.config.get_guild_settings(guild_id) - logger.info(f"Got settings for guild {guild_id}: {settings}") - - # Download video with enhanced error handling - try: - if guild_id not in self.components: - raise ComponentError(f"Components not initialized for guild {guild_id}") - - downloader = self.components[guild_id]["downloader"] - if not downloader: - raise ComponentError("Downloader not initialized") - - logger.info(f"Starting download for URL: {item.url}") - success, file_path, error = await downloader.download_video(item.url) - logger.info(f"Download result: success={success}, file_path={file_path}, error={error}") - - if not success: - raise VideoDownloadError(error) - - except (FFmpegError, CompressionError, VerificationError) as e: - raise VideoProcessingError(f"FFmpeg error: {str(e)}") - except Exception as e: - if isinstance(e, (VideoDownloadError, VideoProcessingError)): - raise - raise VideoDownloadError(str(e)) - - # Get channels with enhanced error handling - try: - archive_channel = await self.config.get_channel( - message.guild, "archive" - ) - notification_channel = await self.config.get_channel( - message.guild, "notification" - ) - if not notification_channel: - notification_channel = archive_channel - - if not archive_channel or not notification_channel: - raise ConfigurationError("Required channels not found") - except Exception as e: - if isinstance(e, ConfigurationError): - raise - raise ConfigurationError(f"Channel configuration error: {str(e)}") - - try: - # Upload to archive channel with original message link - logger.info(f"Uploading file to archive channel: {file_path}") - file = discord.File(file_path) - archive_message = await archive_channel.send( - f"Original: {message.jump_url}", file=file - ) - - # Send notification with enhanced error handling for message formatting - try: - notification_content = self.components[guild_id][ - "message_manager" - ].format_archive_message( - username=message.author.name, - channel=message.channel.name, - original_message=message.jump_url, - ) - except Exception as e: - logger.error(f"Message formatting error: {str(e)}") - notification_content = f"Video archived from {message.author.name} in {message.channel.name}\nOriginal: {message.jump_url}" - - notification_message = await notification_channel.send( - notification_content - ) - - # Schedule notification message deletion with error handling - try: - await self.components[guild_id][ - "message_manager" - ].schedule_message_deletion( - notification_message.id, notification_message.delete - ) - except Exception as e: - logger.error( - f"Failed to schedule message deletion: {str(e)}" - ) - - # Update reaction to show completion - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("✅") - - # Log processing time - processing_time = ( - datetime.utcnow() - start_time - ).total_seconds() - await self._log_message( - message.guild, - f"Successfully archived video from {message.author} (took {processing_time:.1f}s)", - ) - - return True, None - - except discord.HTTPException as e: - raise NetworkError(f"Discord API error: {str(e)}") - - finally: - # Always attempt to delete the file if configured - if settings["delete_after_repost"] and file_path: - try: - if secure_delete_file(file_path): - await self._log_message( - message.guild, - f"Successfully deleted file: {file_path}", - ) - else: - raise VideoCleanupError(f"Failed to delete file: {file_path}") - except Exception as e: - if not isinstance(e, VideoCleanupError): - e = VideoCleanupError(f"File deletion error: {str(e)}") - logger.error(str(e)) - # Track for later cleanup - async with self._failed_downloads_lock: - self._failed_downloads.add(file_path) - raise e - - except Exception as e: - logger.error(f"Process error: {traceback.format_exc()}") - if not isinstance(e, VideoArchiverError): - e = VideoProcessingError(f"Error in process: {str(e)}") - raise e - - except Exception as e: - logger.error(f"Error processing video: {traceback.format_exc()}") - error_msg = str(e) - - # Update message reactions based on error type - await message.remove_reaction("⏳", self.bot.user) - if isinstance(e, PermissionError): - await message.add_reaction("🚫") - elif isinstance(e, (NetworkError, ResourceError)): - await message.add_reaction("📡") - else: - await message.add_reaction("❌") - - # Log error with appropriate level - if isinstance(e, (ConfigurationError, ComponentError)): - await self._log_message(message.guild, error_msg, "error") - elif isinstance(e, (VideoDownloadError, VideoProcessingError)): - await self._log_message(message.guild, error_msg, "warning") - else: - await self._log_message(message.guild, error_msg, "error") - - return False, error_msg - - async def process_video_url(self, url: str, message: discord.Message, priority: int = 0) -> bool: - """Process a video URL: download, reupload, and cleanup""" - guild_id = message.guild.id - start_time = datetime.utcnow() - - try: - # Add initial reactions - await message.add_reaction("📹") - await message.add_reaction("⏳") - await self._log_message(message.guild, f"Processing video URL: {url}") - - settings = await self.config.get_guild_settings(guild_id) - - # Check user roles with detailed error message - if not await self.config.check_user_roles(message.author): - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("🚫") - await self._log_message( - message.guild, - f"User {message.author} does not have required roles for video archiving", - "warning", - ) - return False - - # Add to enhanced queue with priority and error handling - try: - await self.queue_manager.add_to_queue( - url=url, - message_id=message.id, - channel_id=message.channel.id, - guild_id=guild_id, - author_id=message.author.id, - callback=None, # No callback needed since _process_video handles everything - priority=priority, - ) - except QueueError as e: - logger.error(f"Queue error: {str(e)}") - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("❌") - await self._log_message( - message.guild, f"Failed to add to queue: {str(e)}", "error" - ) - return False - - # Log queue metrics with enhanced information - queue_status = self.queue_manager.get_queue_status(guild_id) - await self._log_message( - message.guild, - f"Queue Status - Pending: {queue_status['pending']}, " - f"Processing: {queue_status['processing']}, " - f"Success Rate: {queue_status['metrics']['success_rate']:.2%}, " - f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s", + # Add to processing queue + await self.queue_manager.add_to_queue( + url=content, + message_id=message.id, + channel_id=message.channel.id, + guild_id=message.guild.id, + author_id=message.author.id ) - return True + except Exception as e: + logger.error(f"Error processing message: {traceback.format_exc()}") + raise ProcessingError(f"Failed to process message: {str(e)}") + + async def _process_video(self, item): + """Process a video from the queue""" + try: + guild_id = item.guild_id + if guild_id not in self.components: + raise ProcessingError(f"No components found for guild {guild_id}") + + components = self.components[guild_id] + downloader = components.get("downloader") + message_manager = components.get("message_manager") + + if not downloader or not message_manager: + raise ProcessingError(f"Missing required components for guild {guild_id}") + + # Download and process video + success, file_path, error = await downloader.download_video(item.url) + if not success: + raise ProcessingError(f"Failed to download video: {error}") + + # Get archive channel + guild = self.bot.get_guild(guild_id) + if not guild: + raise ProcessingError(f"Guild {guild_id} not found") + + archive_channel = await self.config.get_channel(guild, "archive") + if not archive_channel: + raise ProcessingError("Archive channel not configured") + + # Upload to archive channel + try: + original_message = await self.bot.get_channel(item.channel_id).fetch_message(item.message_id) + author = original_message.author if original_message else None + + message = await message_manager.format_message( + author=author, + channel=self.bot.get_channel(item.channel_id), + original_message=original_message + ) + + await archive_channel.send(content=message, file=discord.File(file_path)) + + # Delete original if configured + settings = await self.config.get_guild_settings(guild_id) + if settings.get("delete_after_repost", False) and original_message: + try: + await original_message.delete() + except Exception as e: + logger.warning(f"Failed to delete original message: {e}") + + return True, None + + except Exception as e: + return False, f"Failed to archive video: {str(e)}" + + finally: + # Clean up downloaded file + try: + if file_path and os.path.exists(file_path): + os.unlink(file_path) + except Exception as e: + logger.error(f"Failed to clean up file {file_path}: {e}") except Exception as e: logger.error(f"Error processing video: {traceback.format_exc()}") - error_msg = str(e) - if not isinstance(e, VideoArchiverError): - error_msg = f"Unexpected error processing video: {error_msg}" - await self._log_message(message.guild, error_msg, "error") - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("❌") - return False - - async def process_message(self, message: discord.Message) -> None: - """Process a message for video URLs""" - if message.author.bot or not message.guild: - return - - try: - settings = await self.config.get_guild_settings(message.guild.id) - - # Check if message is in a monitored channel - monitored_channels = settings.get("monitored_channels", []) - if monitored_channels and message.channel.id not in monitored_channels: - return - - # Find all video URLs in message using yt-dlp simulation - urls = [] - try: - if message.guild.id in self.components: - downloader = self.components[message.guild.id]["downloader"] - if not downloader: - raise ComponentError("Downloader not initialized") - - # Pre-filter words that look like URLs - potential_urls = [ - word for word in message.content.split() - if is_potential_url(word) - ] - - # Only check potential URLs with yt-dlp - for url in potential_urls: - try: - if downloader.is_supported_url(url): - urls.append(url) - except Exception as e: - logger.error(f"Error checking URL {url}: {str(e)}") - continue - - except ComponentError as e: - logger.error(f"Component error: {str(e)}") - await self._log_message( - message.guild, f"Component error: {str(e)}", "error" - ) - return - - if urls: - logger.info(f"Found {len(urls)} video URLs in message {message.id}") - # Process each URL with priority based on position - for i, url in enumerate(urls): - # First URL gets highest priority - priority = len(urls) - i - logger.info(f"Processing URL {url} with priority {priority}") - try: - await self.process_video_url(url, message, priority) - except Exception as e: - logger.error(f"Error processing URL {url}: {str(e)}") - await self._log_message( - message.guild, f"Error processing URL {url}: {str(e)}", "error" - ) - continue - - except Exception as e: - error_msg = str(e) - if not isinstance(e, VideoArchiverError): - error_msg = f"Unexpected error processing message: {error_msg}" - logger.error(f"Error processing message: {traceback.format_exc()}") - await self._log_message(message.guild, error_msg, "error") - - async def _log_message( - self, guild: discord.Guild, message: str, level: str = "info" - ): - """Log a message to the guild's log channel with enhanced formatting""" - log_channel = await self.config.get_channel(guild, "log") - if log_channel: - try: - # Format message with timestamp and level - formatted_message = f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] [{level.upper()}] {message}" - await log_channel.send(formatted_message) - except discord.HTTPException as e: - logger.error( - f"Failed to send log message to channel: {message} ({str(e)})" - ) - logger.log(getattr(logging, level.upper()), message) + return False, str(e) async def cleanup(self): - """Clean up resources with enhanced error handling""" + """Clean up resources""" try: - # Cancel queue processing task - if hasattr(self, "_queue_task"): + # Cancel queue processing + if hasattr(self, '_queue_task') and not self._queue_task.done(): self._queue_task.cancel() try: await self._queue_task except asyncio.CancelledError: pass - # Clean up queue - await self.queue_manager.cleanup() + # Clean up queue manager + if hasattr(self, 'queue_manager'): + await self.queue_manager.cleanup() # Clean up failed downloads async with self._failed_downloads_lock: for file_path in self._failed_downloads: try: if os.path.exists(file_path): - secure_delete_file(file_path) + os.unlink(file_path) except Exception as e: - logger.error(f"Failed to clean up file {file_path}: {str(e)}") + logger.error(f"Failed to clean up file {file_path}: {e}") self._failed_downloads.clear() except Exception as e: - logger.error(f"Error during cleanup: {str(e)}") + logger.error(f"Error during cleanup: {traceback.format_exc()}") + raise ProcessingError(f"Cleanup failed: {str(e)}") diff --git a/videoarchiver/video_archiver.py b/videoarchiver/video_archiver.py index f1facc0..ef57228 100644 --- a/videoarchiver/video_archiver.py +++ b/videoarchiver/video_archiver.py @@ -60,6 +60,10 @@ class VideoArchiver(commands.Cog): # Clean existing downloads cleanup_downloads(str(self.download_path)) + # Initialize shared FFmpeg manager + self.ffmpeg_mgr = FFmpegManager() + logger.info("Initialized shared FFmpeg manager") + # Initialize components dict first self.components: Dict[int, Dict[str, Any]] = {} @@ -87,12 +91,13 @@ class VideoArchiver(commands.Cog): # Initialize update checker self.update_checker = UpdateChecker(self.bot, self.config_manager) - # Initialize processor with queue manager + # Initialize processor with queue manager and shared FFmpeg manager self.processor = VideoProcessor( self.bot, self.config_manager, self.components, queue_manager=self.queue_manager, + ffmpeg_mgr=self.ffmpeg_mgr, # Pass shared FFmpeg manager ) # Start update checker @@ -206,15 +211,9 @@ class VideoArchiver(commands.Cog): await old_components["message_manager"].cancel_all_deletions() if "downloader" in old_components: old_components["downloader"] = None - if "ffmpeg_mgr" in old_components: - old_components["ffmpeg_mgr"] = None - - # Initialize FFmpeg manager first - ffmpeg_mgr = FFmpegManager() # Initialize new components with validated settings self.components[guild_id] = { - "ffmpeg_mgr": ffmpeg_mgr, # Add FFmpeg manager to components "downloader": VideoDownloader( str(self.download_path), settings["video_format"], @@ -222,7 +221,7 @@ class VideoArchiver(commands.Cog): settings["max_file_size"], settings["enabled_sites"] if settings["enabled_sites"] else None, settings["concurrent_downloads"], - ffmpeg_mgr=ffmpeg_mgr, # Pass FFmpeg manager to VideoDownloader + ffmpeg_mgr=self.ffmpeg_mgr, # Use shared FFmpeg manager ), "message_manager": MessageManager( settings["message_duration"], settings["message_template"]