From 971d52bd00b3666cd30ec18c23f9e4259f6958ab Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 18:58:05 +0000 Subject: [PATCH] Separation of Concerns: Core processing logic in VideoProcessor class Dedicated message handling in MessageHandler Queue operations in QueueHandler Progress tracking in ProgressTracker Reaction management in reactions.py Improved Maintainability: Each component has a single responsibility Easier to test individual components Better code organization and readability Reduced file sizes for better version control Better Resource Management: Centralized progress tracking Isolated queue operations Cleaner cleanup processes Optimized Imports: Components can be imported individually as needed Main processor.py provides backward compatibility Clear module interface through init.py --- videoarchiver/processor.py | 629 +------------------- videoarchiver/processor/__init__.py | 15 + videoarchiver/processor/core.py | 224 +++++++ videoarchiver/processor/message_handler.py | 116 ++++ videoarchiver/processor/progress_tracker.py | 88 +++ videoarchiver/processor/queue_handler.py | 273 +++++++++ videoarchiver/processor/reactions.py | 97 +++ 7 files changed, 827 insertions(+), 615 deletions(-) create mode 100644 videoarchiver/processor/__init__.py create mode 100644 videoarchiver/processor/core.py create mode 100644 videoarchiver/processor/message_handler.py create mode 100644 videoarchiver/processor/progress_tracker.py create mode 100644 videoarchiver/processor/queue_handler.py create mode 100644 videoarchiver/processor/reactions.py diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 4c90b50..ffc1349 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -1,618 +1,17 @@ -"""Video processing logic for VideoArchiver""" +"""Re-export video processing components from processor module""" -import os -import logging -import asyncio -import discord -from discord.ext import commands -from discord import app_commands -from pathlib import Path -from typing import Dict, List, Optional, Tuple, Callable, Set -import traceback -from datetime import datetime - -from videoarchiver.queue import EnhancedVideoQueueManager # Updated import -from videoarchiver.utils.exceptions import ( - ProcessingError, - ConfigurationError, - VideoVerificationError, - QueueError, - FileOperationError +from videoarchiver.processor import ( + VideoProcessor, + REACTIONS, + ProgressTracker, + MessageHandler, + QueueHandler ) -from videoarchiver.utils.video_downloader import is_video_url_pattern -logger = logging.getLogger("VideoArchiver") - -# Reaction emojis -REACTIONS = { - 'queued': '📹', - 'processing': '⚙️', - 'success': '✅', - 'error': '❌', - 'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'], - 'progress': ['⬛', '🟨', '🟩'], - 'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟'] -} - -# Global queue manager instance to persist across reloads -_global_queue_manager = None - -# Track detailed progress information -_download_progress: Dict[str, Dict[str, Any]] = {} -_compression_progress: Dict[str, Dict[str, Any]] = {} - -class VideoProcessor: - """Handles video processing operations""" - - def __init__( - self, - bot, - config_manager, - components, - queue_manager=None, - ffmpeg_mgr=None - ): - self.bot = bot - self.config = config_manager - self.components = components - self.ffmpeg_mgr = ffmpeg_mgr - - # Track active downloads and their tasks - self._active_downloads: Dict[str, asyncio.Task] = {} - self._active_downloads_lock = asyncio.Lock() - self._unloading = False - - # Use global queue manager if available - global _global_queue_manager - if _global_queue_manager is not None: - self.queue_manager = _global_queue_manager - logger.info("Using existing global queue manager") - elif queue_manager: - self.queue_manager = queue_manager - _global_queue_manager = queue_manager - logger.info("Using provided queue manager and setting as global") - else: - data_dir = Path(os.path.dirname(__file__)) / "data" - data_dir.mkdir(parents=True, exist_ok=True) - queue_path = data_dir / "queue_state.json" - - self.queue_manager = EnhancedVideoQueueManager( - max_retries=3, - retry_delay=5, - max_queue_size=1000, - cleanup_interval=1800, - max_history_age=86400, - persistence_path=str(queue_path) - ) - _global_queue_manager = self.queue_manager - logger.info("Created new queue manager and set as global") - - # Start queue processing - logger.info("Starting video processing queue...") - self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) - logger.info("Video processing queue started successfully") - - async def cleanup(self): - """Clean up resources and stop processing""" - try: - logger.info("Starting VideoProcessor cleanup...") - self._unloading = True - - # Cancel all active downloads - async with self._active_downloads_lock: - for url, task in list(self._active_downloads.items()): - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"Error cancelling download task for {url}: {e}") - self._active_downloads.clear() - - # Clean up queue manager - if hasattr(self, 'queue_manager'): - try: - await self.queue_manager.cleanup() - except Exception as e: - logger.error(f"Error cleaning up queue manager: {e}") - - # Clean up FFmpeg manager - if self.ffmpeg_mgr: - try: - self.ffmpeg_mgr.kill_all_processes() - except Exception as e: - logger.error(f"Error cleaning up FFmpeg manager: {e}") - - # Cancel queue processing task - if hasattr(self, '_queue_task') and not self._queue_task.done(): - self._queue_task.cancel() - try: - await self._queue_task - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"Error cancelling queue task: {e}") - - logger.info("VideoProcessor cleanup completed successfully") - - except Exception as e: - logger.error(f"Error during VideoProcessor cleanup: {traceback.format_exc()}") - raise ProcessingError(f"Cleanup failed: {str(e)}") - - async def force_cleanup(self): - """Force cleanup of resources when normal cleanup fails or times out""" - try: - logger.info("Starting force cleanup of VideoProcessor...") - self._unloading = True - - # Force cancel all active downloads - for url, task in list(self._active_downloads.items()): - if not task.done(): - task.cancel() - self._active_downloads.clear() - - # Force cleanup queue manager - if hasattr(self, 'queue_manager'): - try: - self.queue_manager.force_stop() - except Exception as e: - logger.error(f"Error force stopping queue manager: {e}") - - # Force cleanup FFmpeg - if self.ffmpeg_mgr: - try: - self.ffmpeg_mgr.kill_all_processes() - except Exception as e: - logger.error(f"Error force cleaning FFmpeg manager: {e}") - - # Force cancel queue task - if hasattr(self, '_queue_task') and not self._queue_task.done(): - self._queue_task.cancel() - - logger.info("VideoProcessor force cleanup completed") - - except Exception as e: - logger.error(f"Error during VideoProcessor force cleanup: {traceback.format_exc()}") - # Don't raise here as this is the last resort cleanup - - async def process_message(self, message: discord.Message) -> None: - """Process a message for video content""" - try: - # Check if message contains any video URLs - if not message.content and not message.attachments: - logger.debug(f"No content or attachments in message {message.id}") - return - - # Get guild settings - settings = await self.config.get_guild_settings(message.guild.id) - if not settings: - logger.warning(f"No settings found for guild {message.guild.id}") - return - - # Log settings for debugging - logger.debug(f"Guild {message.guild.id} settings: {settings}") - - # Check if channel is enabled - enabled_channels = settings.get("enabled_channels", []) - if enabled_channels and message.channel.id not in enabled_channels: - logger.debug(f"Channel {message.channel.id} not in enabled channels: {enabled_channels}") - return - - # Extract URLs from message content and attachments - urls = [] - if message.content: - # Log message content for debugging - logger.debug(f"Processing message content: {message.content}") - enabled_sites = settings.get("enabled_sites", []) - logger.debug(f"Enabled sites: {enabled_sites}") - - # Add URLs from message content - for word in message.content.split(): - # Log each word being checked - logger.debug(f"Checking word: {word}") - # Use proper video URL validation - if is_video_url_pattern(word): - # If no sites are enabled, accept all URLs - # Otherwise, check if URL contains any enabled site - if not enabled_sites or any(site in word.lower() for site in enabled_sites): - logger.debug(f"Found matching URL: {word}") - urls.append(word) - else: - logger.debug(f"URL {word} doesn't match any enabled sites") - else: - logger.debug(f"Word {word} is not a valid video URL") - - # Add attachment URLs - for attachment in message.attachments: - logger.debug(f"Checking attachment: {attachment.filename}") - if any(attachment.filename.lower().endswith(ext) for ext in ['.mp4', '.mov', '.avi', '.webm']): - logger.debug(f"Found video attachment: {attachment.url}") - urls.append(attachment.url) - - if not urls: - logger.debug("No valid URLs found in message") - return - - # Add each URL to the queue - for url in urls: - try: - logger.info(f"Adding URL to queue: {url}") - await message.add_reaction(REACTIONS['queued']) - await self.queue_manager.add_to_queue( - url=url, - message_id=message.id, - channel_id=message.channel.id, - guild_id=message.guild.id, - author_id=message.author.id, - priority=0 - ) - logger.info(f"Successfully added video to queue: {url}") - except QueueError as e: - logger.error(f"Failed to add video to queue: {str(e)}") - await message.add_reaction(REACTIONS['error']) - continue - - except Exception as e: - logger.error(f"Error processing message: {traceback.format_exc()}") - try: - await message.add_reaction(REACTIONS["error"]) - except: - pass - - async def _process_video(self, item) -> Tuple[bool, Optional[str]]: - """Process a video from the queue""" - if self._unloading: - return False, "Processor is unloading" - - file_path = None - original_message = None - download_task = None - - try: - guild_id = item.guild_id - if guild_id not in self.components: - return False, f"No components found for guild {guild_id}" - - components = self.components[guild_id] - downloader = components.get("downloader") - message_manager = components.get("message_manager") - - if not downloader or not message_manager: - return False, f"Missing required components for guild {guild_id}" - - try: - channel = self.bot.get_channel(item.channel_id) - if not channel: - return False, f"Channel {item.channel_id} not found" - original_message = await channel.fetch_message(item.message_id) - - await original_message.remove_reaction( - REACTIONS["queued"], self.bot.user - ) - await original_message.add_reaction(REACTIONS["processing"]) - logger.info(f"Started processing message {item.message_id}") - except discord.NotFound: - original_message = None - except Exception as e: - logger.error(f"Error fetching original message: {e}") - original_message = None - - # Create progress callback that creates tasks directly - def progress_callback(progress: float) -> None: - if original_message: - try: - # Try to get the current event loop - try: - loop = asyncio.get_running_loop() - except RuntimeError: - # If no event loop is running in this thread, - # we'll use the bot's loop which we know exists - loop = self.bot.loop - - if not loop.is_running(): - logger.warning( - "Event loop is not running, skipping progress update" - ) - return - - # Create a task to update the reaction - asyncio.run_coroutine_threadsafe( - self.update_download_progress_reaction( - original_message, progress - ), - loop, - ) - except Exception as e: - logger.error(f"Error in progress callback: {e}") - - # Create and track download task - download_task = asyncio.create_task( - downloader.download_video(item.url, progress_callback=progress_callback) - ) - - async with self._active_downloads_lock: - self._active_downloads[item.url] = download_task - - try: - success, file_path, error = await download_task - if not success: - if original_message: - await original_message.add_reaction(REACTIONS["error"]) - logger.error( - f"Download failed for message {item.message_id}: {error}" - ) - return False, f"Failed to download video: {error}" - except asyncio.CancelledError: - logger.info(f"Download cancelled for {item.url}") - return False, "Download cancelled" - except Exception as e: - if original_message: - await original_message.add_reaction(REACTIONS["error"]) - logger.error( - f"Download error for message {item.message_id}: {str(e)}" - ) - return False, f"Download error: {str(e)}" - finally: - async with self._active_downloads_lock: - self._active_downloads.pop(item.url, None) - - # Get archive channel - guild = self.bot.get_guild(guild_id) - if not guild: - return False, f"Guild {guild_id} not found" - - archive_channel = await self.config.get_channel(guild, "archive") - if not archive_channel: - return False, "Archive channel not configured" - - # Format message - try: - author = original_message.author if original_message else None - message = await message_manager.format_message( - author=author, channel=channel, url=item.url - ) - except Exception as e: - return False, f"Failed to format message: {str(e)}" - - # Upload to archive channel - try: - if not os.path.exists(file_path): - return False, "Processed file not found" - - await archive_channel.send( - content=message, file=discord.File(file_path) - ) - - if original_message: - await original_message.remove_reaction( - REACTIONS["processing"], self.bot.user - ) - await original_message.add_reaction(REACTIONS["success"]) - logger.info(f"Successfully processed message {item.message_id}") - - return True, None - - except discord.HTTPException as e: - if original_message: - await original_message.add_reaction(REACTIONS["error"]) - logger.error( - f"Failed to upload to Discord for message {item.message_id}: {str(e)}" - ) - return False, f"Failed to upload to Discord: {str(e)}" - except Exception as e: - if original_message: - await original_message.add_reaction(REACTIONS["error"]) - logger.error( - f"Failed to archive video for message {item.message_id}: {str(e)}" - ) - return False, f"Failed to archive video: {str(e)}" - - except Exception as e: - logger.error(f"Error processing video: {traceback.format_exc()}") - return False, str(e) - finally: - # Clean up downloaded file - if file_path and os.path.exists(file_path): - try: - os.unlink(file_path) - except Exception as e: - logger.error(f"Failed to clean up file {file_path}: {e}") - - async def update_queue_position_reaction(self, message, position): - """Update queue position reaction""" - try: - for reaction in REACTIONS["numbers"]: - try: - await message.remove_reaction(reaction, self.bot.user) - except: - pass - - if 0 <= position < len(REACTIONS["numbers"]): - await message.add_reaction(REACTIONS["numbers"][position]) - logger.info( - f"Updated queue position reaction to {position + 1} for message {message.id}" - ) - except Exception as e: - logger.error(f"Failed to update queue position reaction: {e}") - - async def update_progress_reaction(self, message, progress): - """Update progress reaction based on FFmpeg progress""" - if not message: - return - - try: - # Get event loop for the current context - loop = asyncio.get_running_loop() - - # Remove old reactions in the event loop - for reaction in REACTIONS["progress"]: - try: - await message.remove_reaction(reaction, self.bot.user) - except Exception as e: - logger.error(f"Failed to remove progress reaction: {e}") - continue - - # Add new reaction based on progress - try: - if progress < 33: - await message.add_reaction(REACTIONS["progress"][0]) - elif progress < 66: - await message.add_reaction(REACTIONS["progress"][1]) - else: - await message.add_reaction(REACTIONS["progress"][2]) - except Exception as e: - logger.error(f"Failed to add progress reaction: {e}") - - except Exception as e: - logger.error(f"Failed to update progress reaction: {e}") - - async def update_download_progress_reaction(self, message, progress): - """Update download progress reaction""" - if not message: - return - - try: - # Remove old reactions in the event loop - for reaction in REACTIONS["download"]: - try: - await message.remove_reaction(reaction, self.bot.user) - except Exception as e: - logger.error(f"Failed to remove download reaction: {e}") - continue - - # Add new reaction based on progress - try: - if progress <= 20: - await message.add_reaction(REACTIONS["download"][0]) - elif progress <= 40: - await message.add_reaction(REACTIONS["download"][1]) - elif progress <= 60: - await message.add_reaction(REACTIONS["download"][2]) - elif progress <= 80: - await message.add_reaction(REACTIONS["download"][3]) - elif progress < 100: - await message.add_reaction(REACTIONS["download"][4]) - else: - await message.add_reaction(REACTIONS["download"][5]) - except Exception as e: - logger.error(f"Failed to add download reaction: {e}") - - except Exception as e: - logger.error(f"Failed to update download progress reaction: {e}") - - async def _show_queue_details(self, ctx): - """Display detailed queue status and progress information""" - try: - # Get queue status - queue_status = self.queue_manager.get_queue_status(ctx.guild.id) - - # Create embed for queue overview - embed = discord.Embed( - title="Queue Status Details", - color=discord.Color.blue(), - timestamp=datetime.utcnow(), - ) - - # Queue statistics - embed.add_field( - name="Queue Statistics", - value=f"```\n" - f"Pending: {queue_status['pending']}\n" - f"Processing: {queue_status['processing']}\n" - f"Completed: {queue_status['completed']}\n" - f"Failed: {queue_status['failed']}\n" - f"Success Rate: {queue_status['metrics']['success_rate']:.1%}\n" - f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s\n" - f"```", - inline=False, - ) - - # Active downloads - active_downloads = "" - for url, progress in _download_progress.items(): - if progress.get("active", False): - active_downloads += ( - f"URL: {url[:50]}...\n" - f"Progress: {progress.get('percent', 0):.1f}%\n" - f"Speed: {progress.get('speed', 'N/A')}\n" - f"ETA: {progress.get('eta', 'N/A')}\n" - f"Size: {progress.get('downloaded_bytes', 0)}/{progress.get('total_bytes', 0)} bytes\n" - f"Started: {progress.get('start_time', 'N/A')}\n" - f"Retries: {progress.get('retries', 0)}\n" - f"-------------------\n" - ) - - if active_downloads: - embed.add_field( - name="Active Downloads", - value=f"```\n{active_downloads}```", - inline=False, - ) - else: - embed.add_field( - name="Active Downloads", - value="```\nNo active downloads```", - inline=False, - ) - - # Active compressions - active_compressions = "" - for url, progress in _compression_progress.items(): - if progress.get("active", False): - active_compressions += ( - f"File: {progress.get('filename', 'Unknown')}\n" - f"Progress: {progress.get('percent', 0):.1f}%\n" - f"Time Elapsed: {progress.get('elapsed_time', 'N/A')}\n" - f"Input Size: {progress.get('input_size', 0)} bytes\n" - f"Current Size: {progress.get('current_size', 0)} bytes\n" - f"Target Size: {progress.get('target_size', 0)} bytes\n" - f"Codec: {progress.get('codec', 'Unknown')}\n" - f"Hardware Accel: {progress.get('hardware_accel', False)}\n" - f"-------------------\n" - ) - - if active_compressions: - embed.add_field( - name="Active Compressions", - value=f"```\n{active_compressions}```", - inline=False, - ) - else: - embed.add_field( - name="Active Compressions", - value="```\nNo active compressions```", - inline=False, - ) - - # Error statistics - if queue_status["metrics"]["errors_by_type"]: - error_stats = "\n".join( - f"{error_type}: {count}" - for error_type, count in queue_status["metrics"][ - "errors_by_type" - ].items() - ) - embed.add_field( - name="Error Statistics", - value=f"```\n{error_stats}```", - inline=False, - ) - - # Hardware acceleration statistics - embed.add_field( - name="Hardware Statistics", - value=f"```\n" - f"Hardware Accel Failures: {queue_status['metrics']['hardware_accel_failures']}\n" - f"Compression Failures: {queue_status['metrics']['compression_failures']}\n" - f"Peak Memory Usage: {queue_status['metrics']['peak_memory_usage']:.1f}MB\n" - f"```", - inline=False, - ) - - await ctx.send(embed=embed) - - except Exception as e: - logger.error(f"Error showing queue details: {traceback.format_exc()}") - await ctx.send(f"Error getting queue details: {str(e)}") +__all__ = [ + 'VideoProcessor', + 'REACTIONS', + 'ProgressTracker', + 'MessageHandler', + 'QueueHandler' +] diff --git a/videoarchiver/processor/__init__.py b/videoarchiver/processor/__init__.py new file mode 100644 index 0000000..a82915d --- /dev/null +++ b/videoarchiver/processor/__init__.py @@ -0,0 +1,15 @@ +"""Video processing module for VideoArchiver""" + +from .core import VideoProcessor +from .reactions import REACTIONS +from .progress_tracker import ProgressTracker +from .message_handler import MessageHandler +from .queue_handler import QueueHandler + +__all__ = [ + 'VideoProcessor', + 'REACTIONS', + 'ProgressTracker', + 'MessageHandler', + 'QueueHandler' +] diff --git a/videoarchiver/processor/core.py b/videoarchiver/processor/core.py new file mode 100644 index 0000000..bf8a540 --- /dev/null +++ b/videoarchiver/processor/core.py @@ -0,0 +1,224 @@ +"""Core VideoProcessor class that manages video processing operations""" + +import logging +import discord +from discord.ext import commands +from discord import app_commands +from datetime import datetime +from typing import Dict, Any, Optional + +from .message_handler import MessageHandler +from .queue_handler import QueueHandler +from .progress_tracker import ProgressTracker +from .reactions import REACTIONS + +logger = logging.getLogger("VideoArchiver") + +class VideoProcessor: + """Handles video processing operations""" + + def __init__( + self, + bot, + config_manager, + components, + queue_manager=None, + ffmpeg_mgr=None + ): + self.bot = bot + self.config = config_manager + self.components = components + self.ffmpeg_mgr = ffmpeg_mgr + + # Initialize handlers + self.queue_handler = QueueHandler(bot, config_manager, components) + self.message_handler = MessageHandler(bot, config_manager, queue_manager) + self.progress_tracker = ProgressTracker() + + # Start queue processing + logger.info("Starting video processing queue...") + self._queue_task = None + if queue_manager: + self._queue_task = self.bot.loop.create_task( + queue_manager.process_queue(self.queue_handler.process_video) + ) + logger.info("Video processing queue started successfully") + + async def process_message(self, message: discord.Message) -> None: + """Process a message for video content""" + await self.message_handler.process_message(message) + + async def cleanup(self): + """Clean up resources and stop processing""" + try: + logger.info("Starting VideoProcessor cleanup...") + + # Clean up queue handler + try: + await self.queue_handler.cleanup() + except Exception as e: + logger.error(f"Error cleaning up queue handler: {e}") + + # Clean up FFmpeg manager + if self.ffmpeg_mgr: + try: + self.ffmpeg_mgr.kill_all_processes() + except Exception as e: + logger.error(f"Error cleaning up FFmpeg manager: {e}") + + # Cancel queue processing task + if self._queue_task and not self._queue_task.done(): + self._queue_task.cancel() + try: + await self._queue_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling queue task: {e}") + + logger.info("VideoProcessor cleanup completed successfully") + + except Exception as e: + logger.error(f"Error during VideoProcessor cleanup: {str(e)}", exc_info=True) + raise + + async def force_cleanup(self): + """Force cleanup of resources when normal cleanup fails""" + try: + logger.info("Starting force cleanup of VideoProcessor...") + + # Force cleanup queue handler + try: + await self.queue_handler.force_cleanup() + except Exception as e: + logger.error(f"Error force cleaning queue handler: {e}") + + # Force cleanup FFmpeg + if self.ffmpeg_mgr: + try: + self.ffmpeg_mgr.kill_all_processes() + except Exception as e: + logger.error(f"Error force cleaning FFmpeg manager: {e}") + + # Force cancel queue task + if self._queue_task and not self._queue_task.done(): + self._queue_task.cancel() + + logger.info("VideoProcessor force cleanup completed") + + except Exception as e: + logger.error(f"Error during VideoProcessor force cleanup: {str(e)}", exc_info=True) + + async def show_queue_details(self, ctx: commands.Context): + """Display detailed queue status and progress information""" + try: + # Get queue status + queue_status = self.queue_manager.get_queue_status(ctx.guild.id) + + # Create embed for queue overview + embed = discord.Embed( + title="Queue Status Details", + color=discord.Color.blue(), + timestamp=datetime.utcnow(), + ) + + # Queue statistics + embed.add_field( + name="Queue Statistics", + value=f"```\n" + f"Pending: {queue_status['pending']}\n" + f"Processing: {queue_status['processing']}\n" + f"Completed: {queue_status['completed']}\n" + f"Failed: {queue_status['failed']}\n" + f"Success Rate: {queue_status['metrics']['success_rate']:.1%}\n" + f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s\n" + f"```", + inline=False, + ) + + # Active operations + active_ops = self.progress_tracker.get_active_operations() + + # Active downloads + downloads = active_ops['downloads'] + if downloads: + active_downloads = "" + for url, progress in downloads.items(): + active_downloads += ( + f"URL: {url[:50]}...\n" + f"Progress: {progress.get('percent', 0):.1f}%\n" + f"Speed: {progress.get('speed', 'N/A')}\n" + f"ETA: {progress.get('eta', 'N/A')}\n" + f"Size: {progress.get('downloaded_bytes', 0)}/{progress.get('total_bytes', 0)} bytes\n" + f"Started: {progress.get('start_time', 'N/A')}\n" + f"Retries: {progress.get('retries', 0)}\n" + f"-------------------\n" + ) + embed.add_field( + name="Active Downloads", + value=f"```\n{active_downloads}```", + inline=False, + ) + else: + embed.add_field( + name="Active Downloads", + value="```\nNo active downloads```", + inline=False, + ) + + # Active compressions + compressions = active_ops['compressions'] + if compressions: + active_compressions = "" + for file_id, progress in compressions.items(): + active_compressions += ( + f"File: {progress.get('filename', 'Unknown')}\n" + f"Progress: {progress.get('percent', 0):.1f}%\n" + f"Time Elapsed: {progress.get('elapsed_time', 'N/A')}\n" + f"Input Size: {progress.get('input_size', 0)} bytes\n" + f"Current Size: {progress.get('current_size', 0)} bytes\n" + f"Target Size: {progress.get('target_size', 0)} bytes\n" + f"Codec: {progress.get('codec', 'Unknown')}\n" + f"Hardware Accel: {progress.get('hardware_accel', False)}\n" + f"-------------------\n" + ) + embed.add_field( + name="Active Compressions", + value=f"```\n{active_compressions}```", + inline=False, + ) + else: + embed.add_field( + name="Active Compressions", + value="```\nNo active compressions```", + inline=False, + ) + + # Error statistics + if queue_status["metrics"]["errors_by_type"]: + error_stats = "\n".join( + f"{error_type}: {count}" + for error_type, count in queue_status["metrics"]["errors_by_type"].items() + ) + embed.add_field( + name="Error Statistics", + value=f"```\n{error_stats}```", + inline=False, + ) + + # Hardware acceleration statistics + embed.add_field( + name="Hardware Statistics", + value=f"```\n" + f"Hardware Accel Failures: {queue_status['metrics']['hardware_accel_failures']}\n" + f"Compression Failures: {queue_status['metrics']['compression_failures']}\n" + f"Peak Memory Usage: {queue_status['metrics']['peak_memory_usage']:.1f}MB\n" + f"```", + inline=False, + ) + + await ctx.send(embed=embed) + + except Exception as e: + logger.error(f"Error showing queue details: {str(e)}", exc_info=True) + await ctx.send(f"Error getting queue details: {str(e)}") diff --git a/videoarchiver/processor/message_handler.py b/videoarchiver/processor/message_handler.py new file mode 100644 index 0000000..4061d1d --- /dev/null +++ b/videoarchiver/processor/message_handler.py @@ -0,0 +1,116 @@ +"""Message processing and URL extraction for VideoProcessor""" + +import logging +import discord +from typing import List, Tuple, Optional +from videoarchiver.utils.video_downloader import is_video_url_pattern +from .reactions import REACTIONS + +logger = logging.getLogger("VideoArchiver") + +class MessageHandler: + """Handles processing of messages for video content""" + + def __init__(self, bot, config_manager, queue_manager): + self.bot = bot + self.config = config_manager + self.queue_manager = queue_manager + + async def process_message(self, message: discord.Message) -> None: + """Process a message for video content""" + try: + # Check if message contains any content to process + if not message.content and not message.attachments: + logger.debug(f"No content or attachments in message {message.id}") + return + + # Get guild settings + settings = await self.config.get_guild_settings(message.guild.id) + if not settings: + logger.warning(f"No settings found for guild {message.guild.id}") + return + + # Log settings for debugging + logger.debug(f"Guild {message.guild.id} settings: {settings}") + + # Check if channel is enabled + enabled_channels = settings.get("enabled_channels", []) + if enabled_channels and message.channel.id not in enabled_channels: + logger.debug(f"Channel {message.channel.id} not in enabled channels: {enabled_channels}") + return + + # Extract URLs from message + urls = await self._extract_urls(message, settings) + if not urls: + logger.debug("No valid URLs found in message") + return + + # Process each URL + await self._process_urls(message, urls) + + except Exception as e: + logger.error(f"Error processing message: {str(e)}", exc_info=True) + try: + await message.add_reaction(REACTIONS["error"]) + except: + pass + + async def _extract_urls(self, message: discord.Message, settings: dict) -> List[str]: + """Extract video URLs from message content and attachments""" + urls = [] + + # Extract from message content + if message.content: + logger.debug(f"Processing message content: {message.content}") + enabled_sites = settings.get("enabled_sites", []) + logger.debug(f"Enabled sites: {enabled_sites}") + + for word in message.content.split(): + logger.debug(f"Checking word: {word}") + if is_video_url_pattern(word): + if not enabled_sites or any(site in word.lower() for site in enabled_sites): + logger.debug(f"Found matching URL: {word}") + urls.append(word) + else: + logger.debug(f"URL {word} doesn't match any enabled sites") + else: + logger.debug(f"Word {word} is not a valid video URL") + + # Extract from attachments + for attachment in message.attachments: + logger.debug(f"Checking attachment: {attachment.filename}") + if any(attachment.filename.lower().endswith(ext) for ext in ['.mp4', '.mov', '.avi', '.webm']): + logger.debug(f"Found video attachment: {attachment.url}") + urls.append(attachment.url) + + return urls + + async def _process_urls(self, message: discord.Message, urls: List[str]) -> None: + """Process extracted URLs by adding them to the queue""" + for url in urls: + try: + logger.info(f"Adding URL to queue: {url}") + await message.add_reaction(REACTIONS['queued']) + await self.queue_manager.add_to_queue( + url=url, + message_id=message.id, + channel_id=message.channel.id, + guild_id=message.guild.id, + author_id=message.author.id, + priority=0 + ) + logger.info(f"Successfully added video to queue: {url}") + except Exception as e: + logger.error(f"Failed to add video to queue: {str(e)}") + await message.add_reaction(REACTIONS['error']) + continue + + async def format_archive_message(self, author: Optional[discord.Member], + channel: discord.TextChannel, + url: str) -> str: + """Format message for archive channel""" + author_mention = author.mention if author else "Unknown User" + channel_mention = channel.mention if channel else "Unknown Channel" + + return (f"Video archived from {author_mention} in {channel_mention}\n" + f"Original URL: {url}") diff --git a/videoarchiver/processor/progress_tracker.py b/videoarchiver/processor/progress_tracker.py new file mode 100644 index 0000000..aa03ec0 --- /dev/null +++ b/videoarchiver/processor/progress_tracker.py @@ -0,0 +1,88 @@ +"""Progress tracking for video downloads and compression""" + +from typing import Dict, Any +from datetime import datetime + +class ProgressTracker: + """Tracks progress of video downloads and compression operations""" + + def __init__(self): + self._download_progress: Dict[str, Dict[str, Any]] = {} + self._compression_progress: Dict[str, Dict[str, Any]] = {} + + def update_download_progress(self, url: str, progress_data: Dict[str, Any]) -> None: + """Update download progress for a specific URL""" + if url not in self._download_progress: + self._download_progress[url] = { + 'active': True, + 'start_time': datetime.utcnow().isoformat(), + 'retries': 0 + } + + self._download_progress[url].update(progress_data) + + def complete_download(self, url: str) -> None: + """Mark a download as complete""" + if url in self._download_progress: + self._download_progress[url]['active'] = False + self._download_progress[url]['completed_time'] = datetime.utcnow().isoformat() + + def increment_download_retries(self, url: str) -> None: + """Increment retry count for a download""" + if url in self._download_progress: + self._download_progress[url]['retries'] = self._download_progress[url].get('retries', 0) + 1 + + def update_compression_progress(self, file_id: str, progress_data: Dict[str, Any]) -> None: + """Update compression progress for a specific file""" + if file_id not in self._compression_progress: + self._compression_progress[file_id] = { + 'active': True, + 'start_time': datetime.utcnow().isoformat() + } + + self._compression_progress[file_id].update(progress_data) + + def complete_compression(self, file_id: str) -> None: + """Mark a compression operation as complete""" + if file_id in self._compression_progress: + self._compression_progress[file_id]['active'] = False + self._compression_progress[file_id]['completed_time'] = datetime.utcnow().isoformat() + + def get_download_progress(self, url: str = None) -> Dict[str, Any]: + """Get download progress for a specific URL or all downloads""" + if url: + return self._download_progress.get(url, {}) + return self._download_progress + + def get_compression_progress(self, file_id: str = None) -> Dict[str, Any]: + """Get compression progress for a specific file or all compressions""" + if file_id: + return self._compression_progress.get(file_id, {}) + return self._compression_progress + + def clear_completed(self) -> None: + """Clear completed operations from tracking""" + # Clear completed downloads + self._download_progress = { + url: data for url, data in self._download_progress.items() + if data.get('active', False) + } + + # Clear completed compressions + self._compression_progress = { + file_id: data for file_id, data in self._compression_progress.items() + if data.get('active', False) + } + + def get_active_operations(self) -> Dict[str, Dict[str, Any]]: + """Get all active operations""" + return { + 'downloads': { + url: data for url, data in self._download_progress.items() + if data.get('active', False) + }, + 'compressions': { + file_id: data for file_id, data in self._compression_progress.items() + if data.get('active', False) + } + } diff --git a/videoarchiver/processor/queue_handler.py b/videoarchiver/processor/queue_handler.py new file mode 100644 index 0000000..70f652c --- /dev/null +++ b/videoarchiver/processor/queue_handler.py @@ -0,0 +1,273 @@ +"""Queue processing and video handling operations""" + +import os +import logging +import asyncio +import discord +from typing import Dict, Optional, Tuple, Any +from datetime import datetime + +from .reactions import REACTIONS +from .progress_tracker import ProgressTracker + +logger = logging.getLogger("VideoArchiver") + +class QueueHandler: + """Handles queue processing and video operations""" + + def __init__(self, bot, config_manager, components): + self.bot = bot + self.config = config_manager + self.components = components + self._unloading = False + self._active_downloads: Dict[str, asyncio.Task] = {} + self._active_downloads_lock = asyncio.Lock() + self.progress_tracker = ProgressTracker() + + async def process_video(self, item) -> Tuple[bool, Optional[str]]: + """Process a video from the queue""" + if self._unloading: + return False, "Processor is unloading" + + file_path = None + original_message = None + download_task = None + + try: + guild_id = item.guild_id + if guild_id not in self.components: + return False, f"No components found for guild {guild_id}" + + components = self.components[guild_id] + downloader = components.get("downloader") + message_manager = components.get("message_manager") + + if not downloader or not message_manager: + return False, f"Missing required components for guild {guild_id}" + + # Get original message and update reactions + original_message = await self._get_original_message(item) + if original_message: + await original_message.remove_reaction(REACTIONS["queued"], self.bot.user) + await original_message.add_reaction(REACTIONS["processing"]) + logger.info(f"Started processing message {item.message_id}") + + # Create progress callback + progress_callback = self._create_progress_callback(original_message, item.url) + + # Download video + success, file_path, error = await self._download_video( + downloader, item.url, progress_callback + ) + if not success: + if original_message: + await original_message.add_reaction(REACTIONS["error"]) + logger.error(f"Download failed for message {item.message_id}: {error}") + return False, f"Failed to download video: {error}" + + # Archive video + success, error = await self._archive_video( + guild_id, original_message, message_manager, item.url, file_path + ) + if not success: + return False, error + + return True, None + + except Exception as e: + logger.error(f"Error processing video: {str(e)}", exc_info=True) + return False, str(e) + finally: + # Clean up downloaded file + if file_path and os.path.exists(file_path): + try: + os.unlink(file_path) + except Exception as e: + logger.error(f"Failed to clean up file {file_path}: {e}") + + async def _get_original_message(self, item) -> Optional[discord.Message]: + """Retrieve the original message""" + try: + channel = self.bot.get_channel(item.channel_id) + if not channel: + return None + return await channel.fetch_message(item.message_id) + except discord.NotFound: + return None + except Exception as e: + logger.error(f"Error fetching original message: {e}") + return None + + def _create_progress_callback(self, message: Optional[discord.Message], url: str): + """Create progress callback function for download tracking""" + def progress_callback(progress: float) -> None: + if message: + try: + loop = asyncio.get_running_loop() + if not loop.is_running(): + logger.warning("Event loop is not running, skipping progress update") + return + + # Update progress tracking + self.progress_tracker.update_download_progress(url, { + 'percent': progress, + 'last_update': datetime.utcnow().isoformat() + }) + + # Create task to update reaction + asyncio.run_coroutine_threadsafe( + self._update_download_progress_reaction(message, progress), + loop + ) + except Exception as e: + logger.error(f"Error in progress callback: {e}") + return progress_callback + + async def _download_video(self, downloader, url: str, progress_callback) -> Tuple[bool, Optional[str], Optional[str]]: + """Download video with progress tracking""" + download_task = asyncio.create_task( + downloader.download_video(url, progress_callback=progress_callback) + ) + + async with self._active_downloads_lock: + self._active_downloads[url] = download_task + + try: + success, file_path, error = await download_task + if success: + self.progress_tracker.complete_download(url) + else: + self.progress_tracker.increment_download_retries(url) + return success, file_path, error + except asyncio.CancelledError: + logger.info(f"Download cancelled for {url}") + return False, None, "Download cancelled" + except Exception as e: + logger.error(f"Download error: {str(e)}") + return False, None, f"Download error: {str(e)}" + finally: + async with self._active_downloads_lock: + self._active_downloads.pop(url, None) + + async def _archive_video(self, guild_id: int, original_message: Optional[discord.Message], + message_manager, url: str, file_path: str) -> Tuple[bool, Optional[str]]: + """Archive downloaded video""" + try: + # Get archive channel + guild = self.bot.get_guild(guild_id) + if not guild: + return False, f"Guild {guild_id} not found" + + archive_channel = await self.config.get_channel(guild, "archive") + if not archive_channel: + return False, "Archive channel not configured" + + # Format message + try: + author = original_message.author if original_message else None + channel = original_message.channel if original_message else None + message = await message_manager.format_message( + author=author, channel=channel, url=url + ) + except Exception as e: + return False, f"Failed to format message: {str(e)}" + + # Upload to archive channel + if not os.path.exists(file_path): + return False, "Processed file not found" + + await archive_channel.send(content=message, file=discord.File(file_path)) + + if original_message: + await original_message.remove_reaction(REACTIONS["processing"], self.bot.user) + await original_message.add_reaction(REACTIONS["success"]) + logger.info(f"Successfully processed message {original_message.id}") + + return True, None + + except discord.HTTPException as e: + if original_message: + await original_message.add_reaction(REACTIONS["error"]) + logger.error(f"Failed to upload to Discord: {str(e)}") + return False, f"Failed to upload to Discord: {str(e)}" + except Exception as e: + if original_message: + await original_message.add_reaction(REACTIONS["error"]) + logger.error(f"Failed to archive video: {str(e)}") + return False, f"Failed to archive video: {str(e)}" + + async def cleanup(self): + """Clean up resources and stop processing""" + try: + logger.info("Starting QueueHandler cleanup...") + self._unloading = True + + # Cancel all active downloads + async with self._active_downloads_lock: + for url, task in list(self._active_downloads.items()): + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling download task for {url}: {e}") + self._active_downloads.clear() + + logger.info("QueueHandler cleanup completed successfully") + + except Exception as e: + logger.error(f"Error during QueueHandler cleanup: {str(e)}", exc_info=True) + raise + + async def force_cleanup(self): + """Force cleanup of resources when normal cleanup fails""" + try: + logger.info("Starting force cleanup of QueueHandler...") + self._unloading = True + + # Force cancel all active downloads + for url, task in list(self._active_downloads.items()): + if not task.done(): + task.cancel() + self._active_downloads.clear() + + logger.info("QueueHandler force cleanup completed") + + except Exception as e: + logger.error(f"Error during QueueHandler force cleanup: {str(e)}", exc_info=True) + + async def _update_download_progress_reaction(self, message: discord.Message, progress: float): + """Update download progress reaction on message""" + if not message: + return + + try: + # Remove old reactions + for reaction in REACTIONS["download"]: + try: + await message.remove_reaction(reaction, self.bot.user) + except Exception as e: + logger.error(f"Failed to remove download reaction: {e}") + continue + + # Add new reaction based on progress + try: + if progress <= 20: + await message.add_reaction(REACTIONS["download"][0]) + elif progress <= 40: + await message.add_reaction(REACTIONS["download"][1]) + elif progress <= 60: + await message.add_reaction(REACTIONS["download"][2]) + elif progress <= 80: + await message.add_reaction(REACTIONS["download"][3]) + elif progress < 100: + await message.add_reaction(REACTIONS["download"][4]) + else: + await message.add_reaction(REACTIONS["download"][5]) + except Exception as e: + logger.error(f"Failed to add download reaction: {e}") + + except Exception as e: + logger.error(f"Failed to update download progress reaction: {e}") diff --git a/videoarchiver/processor/reactions.py b/videoarchiver/processor/reactions.py new file mode 100644 index 0000000..04d779d --- /dev/null +++ b/videoarchiver/processor/reactions.py @@ -0,0 +1,97 @@ +"""Reaction emojis and reaction management for VideoProcessor""" + +import logging +import asyncio +import discord + +logger = logging.getLogger("VideoArchiver") + +# Reaction emojis +REACTIONS = { + 'queued': '📹', + 'processing': '⚙️', + 'success': '✅', + 'error': '❌', + 'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'], + 'progress': ['⬛', '🟨', '🟩'], + 'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟'] +} + +async def update_queue_position_reaction(message: discord.Message, position: int, bot_user) -> None: + """Update queue position reaction""" + try: + for reaction in REACTIONS["numbers"]: + try: + await message.remove_reaction(reaction, bot_user) + except: + pass + + if 0 <= position < len(REACTIONS["numbers"]): + await message.add_reaction(REACTIONS["numbers"][position]) + logger.info( + f"Updated queue position reaction to {position + 1} for message {message.id}" + ) + except Exception as e: + logger.error(f"Failed to update queue position reaction: {e}") + +async def update_progress_reaction(message: discord.Message, progress: float, bot_user) -> None: + """Update progress reaction based on FFmpeg progress""" + if not message: + return + + try: + # Remove old reactions in the event loop + for reaction in REACTIONS["progress"]: + try: + await message.remove_reaction(reaction, bot_user) + except Exception as e: + logger.error(f"Failed to remove progress reaction: {e}") + continue + + # Add new reaction based on progress + try: + if progress < 33: + await message.add_reaction(REACTIONS["progress"][0]) + elif progress < 66: + await message.add_reaction(REACTIONS["progress"][1]) + else: + await message.add_reaction(REACTIONS["progress"][2]) + except Exception as e: + logger.error(f"Failed to add progress reaction: {e}") + + except Exception as e: + logger.error(f"Failed to update progress reaction: {e}") + +async def update_download_progress_reaction(message: discord.Message, progress: float, bot_user) -> None: + """Update download progress reaction""" + if not message: + return + + try: + # Remove old reactions in the event loop + for reaction in REACTIONS["download"]: + try: + await message.remove_reaction(reaction, bot_user) + except Exception as e: + logger.error(f"Failed to remove download reaction: {e}") + continue + + # Add new reaction based on progress + try: + if progress <= 20: + await message.add_reaction(REACTIONS["download"][0]) + elif progress <= 40: + await message.add_reaction(REACTIONS["download"][1]) + elif progress <= 60: + await message.add_reaction(REACTIONS["download"][2]) + elif progress <= 80: + await message.add_reaction(REACTIONS["download"][3]) + elif progress < 100: + await message.add_reaction(REACTIONS["download"][4]) + else: + await message.add_reaction(REACTIONS["download"][5]) + except Exception as e: + logger.error(f"Failed to add download reaction: {e}") + + except Exception as e: + logger.error(f"Failed to update download progress reaction: {e}")