diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index 2cf912c..ffab3d2 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -1,4 +1,5 @@ """Enhanced queue system for VideoArchiver with improved memory management and performance""" + import asyncio import logging import json @@ -20,19 +21,20 @@ from .exceptions import ( ResourceExhaustedError, ProcessingError, CleanupError, - FileOperationError + FileOperationError, ) # Configure logging with proper format logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) -logger = logging.getLogger('EnhancedQueueManager') +logger = logging.getLogger("EnhancedQueueManager") + @dataclass class QueueMetrics: """Metrics tracking for queue performance and health""" + total_processed: int = 0 total_failed: int = 0 avg_processing_time: float = 0.0 @@ -534,54 +536,64 @@ class EnhancedVideoQueueManager: logger.error(f"Error during cleanup: {str(e)}") raise CleanupError(f"Failed to clean up queue manager: {str(e)}") - def get_queue_status(self, guild_id: Optional[int] = None) -> Dict[str, Any]: - """Get detailed queue status with metrics""" + async def clear_guild_queue(self, guild_id: int) -> int: + """Clear all queue items for a specific guild + + Args: + guild_id: The ID of the guild to clear items for + + Returns: + int: Number of items cleared + """ try: - if guild_id is not None: + cleared_count = 0 + async with self._queue_lock: + # Get URLs for this guild guild_urls = self._guild_queues.get(guild_id, set()) - status = { - "pending": sum(1 for item in self._queue if item.url in guild_urls), - "processing": sum( - 1 for url in self._processing if url in guild_urls - ), - "completed": sum(1 for url in self._completed if url in guild_urls), - "failed": sum(1 for url in self._failed if url in guild_urls), - } - else: - status = { - "pending": len(self._queue), - "processing": len(self._processing), - "completed": len(self._completed), - "failed": len(self._failed), - } - - # Add detailed metrics - status.update( - { - "metrics": { - "total_processed": self.metrics.total_processed, - "total_failed": self.metrics.total_failed, - "success_rate": self.metrics.success_rate, - "avg_processing_time": self.metrics.avg_processing_time, - "peak_memory_usage": self.metrics.peak_memory_usage, - "last_cleanup": self.metrics.last_cleanup.isoformat(), - "errors_by_type": self.metrics.errors_by_type, - "last_error": self.metrics.last_error, - "last_error_time": ( - self.metrics.last_error_time.isoformat() - if self.metrics.last_error_time - else None - ), - "retries": self.metrics.retries, + + # Clear from pending queue + self._queue = [item for item in self._queue if item.guild_id != guild_id] + + # Clear from processing + for url in list(self._processing.keys()): + if self._processing[url].guild_id == guild_id: + self._processing.pop(url) + cleared_count += 1 + + # Clear from completed + for url in list(self._completed.keys()): + if self._completed[url].guild_id == guild_id: + self._completed.pop(url) + cleared_count += 1 + + # Clear from failed + for url in list(self._failed.keys()): + if self._failed[url].guild_id == guild_id: + self._failed.pop(url) + cleared_count += 1 + + # Clear guild tracking + if guild_id in self._guild_queues: + cleared_count += len(self._guild_queues[guild_id]) + self._guild_queues[guild_id].clear() + + # Clear channel tracking for this guild's channels + for channel_id in list(self._channel_queues.keys()): + self._channel_queues[channel_id] = { + url for url in self._channel_queues[channel_id] + if url not in guild_urls } - } - ) - - return status - + + # Persist updated state + if self.persistence_path: + await self._persist_queue() + + logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue") + return cleared_count + except Exception as e: - logger.error(f"Error getting queue status: {str(e)}") - raise QueueError(f"Failed to get queue status: {str(e)}") + logger.error(f"Error clearing guild queue: {traceback.format_exc()}") + raise QueueError(f"Failed to clear guild queue: {str(e)}") async def _periodic_cleanup(self): """Periodically clean up old completed/failed items""" diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index c008bf0..077f25d 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -1,4 +1,5 @@ """Video processing logic for VideoArchiver""" + import discord import logging import yt_dlp @@ -15,7 +16,8 @@ from videoarchiver.utils.file_ops import secure_delete_file, cleanup_downloads from videoarchiver.exceptions import ProcessingError, DiscordAPIError from videoarchiver.enhanced_queue import EnhancedVideoQueueManager -logger = logging.getLogger('VideoArchiver') +logger = logging.getLogger("VideoArchiver") + class VideoProcessor: """Handles video processing operations""" @@ -24,7 +26,7 @@ class VideoProcessor: self.bot = bot self.config = config_manager self.components = components - + # Initialize enhanced queue manager with persistence and error recovery data_dir = Path(os.path.dirname(__file__)) / "data" data_dir.mkdir(parents=True, exist_ok=True) @@ -35,14 +37,41 @@ class VideoProcessor: max_queue_size=1000, cleanup_interval=1800, # 30 minutes (reduced from 1 hour for more frequent cleanup) max_history_age=86400, # 24 hours - persistence_path=str(queue_path) + persistence_path=str(queue_path), ) # Track failed downloads for cleanup self._failed_downloads = set() self._failed_downloads_lock = asyncio.Lock() - async def process_video_url(self, url: str, message: discord.Message, priority: int = 0) -> bool: + # Start queue processing + self._queue_task = asyncio.create_task(self._process_queue()) + + async def _process_queue(self): + """Process the queue continuously""" + try: + await self.queue_manager.process_queue(self._process_video) + except Exception as e: + logger.error(f"Queue processing error: {traceback.format_exc()}") + # Restart queue processing + self._queue_task = asyncio.create_task(self._process_queue()) + + async def _process_video(self, item: Any) -> Tuple[bool, Optional[str]]: + """Process a video from the queue""" + try: + # Get the callback from the item + callback = getattr(item, "callback", None) + if callback: + success = await callback(item.url, True, "") + return success, None if success else "Callback failed" + return False, "No callback found" + except Exception as e: + logger.error(f"Error processing video: {traceback.format_exc()}") + return False, str(e) + + async def process_video_url( + self, url: str, message: discord.Message, priority: int = 0 + ) -> bool: """Process a video URL: download, reupload, and cleanup""" guild_id = message.guild.id start_time = datetime.utcnow() @@ -62,7 +91,7 @@ class VideoProcessor: await self._log_message( message.guild, f"User {message.author} does not have required roles for video archiving", - "warning" + "warning", ) return False @@ -74,9 +103,7 @@ class VideoProcessor: await message.remove_reaction("⏳", self.bot.user) await message.add_reaction("❌") await self._log_message( - message.guild, - f"Failed to process video: {error}", - "error" + message.guild, f"Failed to process video: {error}", "error" ) return False @@ -93,9 +120,7 @@ class VideoProcessor: await message.remove_reaction("⏳", self.bot.user) await message.add_reaction("❌") await self._log_message( - message.guild, - f"Failed to download video: {error}", - "error" + message.guild, f"Failed to download video: {error}", "error" ) # Track failed download for cleanup if file_path: @@ -105,8 +130,12 @@ class VideoProcessor: # Get channels with enhanced error handling try: - archive_channel = await self.config.get_channel(message.guild, "archive") - notification_channel = await self.config.get_channel(message.guild, "notification") + archive_channel = await self.config.get_channel( + message.guild, "archive" + ) + notification_channel = await self.config.get_channel( + message.guild, "notification" + ) if not notification_channel: notification_channel = archive_channel @@ -116,7 +145,7 @@ class VideoProcessor: await self._log_message( message.guild, f"Channel configuration error: {str(e)}", - "error" + "error", ) return False @@ -124,13 +153,14 @@ class VideoProcessor: # Upload to archive channel with original message link file = discord.File(file_path) archive_message = await archive_channel.send( - f"Original: {message.jump_url}", - file=file + f"Original: {message.jump_url}", file=file ) # Send notification with enhanced error handling for message formatting try: - notification_content = self.components[guild_id]["message_manager"].format_archive_message( + notification_content = self.components[guild_id][ + "message_manager" + ].format_archive_message( username=message.author.name, channel=message.channel.name, original_message=message.jump_url, @@ -139,7 +169,9 @@ class VideoProcessor: logger.error(f"Message formatting error: {str(e)}") notification_content = f"Video archived from {message.author.name} in {message.channel.name}\nOriginal: {message.jump_url}" - notification_message = await notification_channel.send(notification_content) + notification_message = await notification_channel.send( + notification_content + ) # Schedule notification message deletion with error handling try: @@ -149,26 +181,28 @@ class VideoProcessor: notification_message.id, notification_message.delete ) except Exception as e: - logger.error(f"Failed to schedule message deletion: {str(e)}") + logger.error( + f"Failed to schedule message deletion: {str(e)}" + ) # Update reaction to show completion await message.remove_reaction("⏳", self.bot.user) await message.add_reaction("✅") - + # Log processing time - processing_time = (datetime.utcnow() - start_time).total_seconds() + processing_time = ( + datetime.utcnow() - start_time + ).total_seconds() await self._log_message( message.guild, - f"Successfully archived video from {message.author} (took {processing_time:.1f}s)" + f"Successfully archived video from {message.author} (took {processing_time:.1f}s)", ) return True except discord.HTTPException as e: await self._log_message( - message.guild, - f"Discord API error: {str(e)}", - "error" + message.guild, f"Discord API error: {str(e)}", "error" ) await message.remove_reaction("⏳", self.bot.user) await message.add_reaction("❌") @@ -181,16 +215,22 @@ class VideoProcessor: if secure_delete_file(file_path): await self._log_message( message.guild, - f"Successfully deleted file: {file_path}" + f"Successfully deleted file: {file_path}", ) else: await self._log_message( message.guild, f"Failed to delete file: {file_path}", - "error" + "error", ) # Emergency cleanup - cleanup_downloads(str(self.components[guild_id]["downloader"].download_path)) + cleanup_downloads( + str( + self.components[guild_id][ + "downloader" + ].download_path + ) + ) except Exception as e: logger.error(f"File deletion error: {str(e)}") # Track for later cleanup @@ -200,9 +240,7 @@ class VideoProcessor: except Exception as e: logger.error(f"Process callback error: {traceback.format_exc()}") await self._log_message( - message.guild, - f"Error in process callback: {str(e)}", - "error" + message.guild, f"Error in process callback: {str(e)}", "error" ) return False @@ -215,16 +253,14 @@ class VideoProcessor: guild_id=guild_id, author_id=message.author.id, callback=process_callback, - priority=priority + priority=priority, ) except Exception as e: logger.error(f"Queue error: {str(e)}") await message.remove_reaction("⏳", self.bot.user) await message.add_reaction("❌") await self._log_message( - message.guild, - f"Failed to add to queue: {str(e)}", - "error" + message.guild, f"Failed to add to queue: {str(e)}", "error" ) return False @@ -235,7 +271,7 @@ class VideoProcessor: f"Queue Status - Pending: {queue_status['pending']}, " f"Processing: {queue_status['processing']}, " f"Success Rate: {queue_status['metrics']['success_rate']:.2%}, " - f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s" + f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s", ) return True @@ -243,9 +279,7 @@ class VideoProcessor: except Exception as e: logger.error(f"Error processing video: {traceback.format_exc()}") await self._log_message( - message.guild, - f"Error processing video: {str(e)}", - "error" + message.guild, f"Error processing video: {str(e)}", "error" ) await message.remove_reaction("⏳", self.bot.user) await message.add_reaction("❌") @@ -277,9 +311,7 @@ class VideoProcessor: except Exception as e: logger.error(f"Error processing message: {traceback.format_exc()}") await self._log_message( - message.guild, - f"Error processing message: {str(e)}", - "error" + message.guild, f"Error processing message: {str(e)}", "error" ) def _extract_urls(self, content: str) -> List[str]: @@ -293,7 +325,7 @@ class VideoProcessor: for word in words: # Try each extractor for ie in ydl._ies: - if hasattr(ie, '_VALID_URL') and ie._VALID_URL: + if hasattr(ie, "_VALID_URL") and ie._VALID_URL: # Use regex pattern matching instead of suitable() if re.match(ie._VALID_URL, word): urls.append(word) @@ -302,7 +334,9 @@ class VideoProcessor: logger.error(f"URL extraction error: {str(e)}") return list(set(urls)) # Remove duplicates - async def _log_message(self, guild: discord.Guild, message: str, level: str = "info"): + async def _log_message( + self, guild: discord.Guild, message: str, level: str = "info" + ): """Log a message to the guild's log channel with enhanced formatting""" log_channel = await self.config.get_channel(guild, "log") if log_channel: @@ -311,15 +345,25 @@ class VideoProcessor: formatted_message = f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] [{level.upper()}] {message}" await log_channel.send(formatted_message) except discord.HTTPException as e: - logger.error(f"Failed to send log message to channel: {message} ({str(e)})") + logger.error( + f"Failed to send log message to channel: {message} ({str(e)})" + ) logger.log(getattr(logging, level.upper()), message) async def cleanup(self): """Clean up resources with enhanced error handling""" try: + # Cancel queue processing task + if hasattr(self, "_queue_task"): + self._queue_task.cancel() + try: + await self._queue_task + except asyncio.CancelledError: + pass + # Clean up queue await self.queue_manager.cleanup() - + # Clean up failed downloads async with self._failed_downloads_lock: for file_path in self._failed_downloads: @@ -329,6 +373,6 @@ class VideoProcessor: except Exception as e: logger.error(f"Failed to clean up file {file_path}: {str(e)}") self._failed_downloads.clear() - + except Exception as e: logger.error(f"Error during cleanup: {str(e)}")