From 01a9067368985fc5dbc67b1e1270c3191befe9d7 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Sat, 16 Nov 2024 01:04:08 +0000 Subject: [PATCH] Added proper command validation in the on_message event handler to prevent interference with command processing Added missing asyncio import in events.py Ensured all long-running operations are handled in background tasks Fixed task initialization and management in base.py --- videoarchiver/__init__.py | 49 ++------------- videoarchiver/core/base.py | 115 +++++++++-------------------------- videoarchiver/core/events.py | 72 +++++++++++----------- 3 files changed, 70 insertions(+), 166 deletions(-) diff --git a/videoarchiver/__init__.py b/videoarchiver/__init__.py index e991a9f..810d8be 100644 --- a/videoarchiver/__init__.py +++ b/videoarchiver/__init__.py @@ -7,51 +7,14 @@ from .exceptions import ProcessingError logger = logging.getLogger("VideoArchiver") -# Global lock to prevent multiple concurrent setup attempts -_setup_lock = asyncio.Lock() -_setup_in_progress = False - async def setup(bot: Red) -> None: """Load VideoArchiver.""" - global _setup_in_progress - - # Use lock to prevent multiple concurrent setup attempts - async with _setup_lock: - try: - # Check if setup is already in progress - if _setup_in_progress: - logger.warning("VideoArchiver setup already in progress, skipping") - return - - # Check if cog is already loaded - if "VideoArchiver" in bot.cogs: - logger.warning("VideoArchiver already loaded, skipping") - return - - _setup_in_progress = True - - # Load main cog - cog = VideoArchiver(bot) - await bot.add_cog(cog) - - # Wait for initialization to complete with timeout - try: - await asyncio.wait_for(cog.ready.wait(), timeout=30) - except asyncio.TimeoutError: - logger.error("VideoArchiver initialization timed out") - await bot.remove_cog(cog.__class__.__name__) - raise ProcessingError("Initialization timed out") - - if not cog.ready.is_set(): - logger.error("VideoArchiver failed to initialize") - await bot.remove_cog(cog.__class__.__name__) - raise ProcessingError("Initialization failed") - - except Exception as e: - logger.error(f"Failed to load VideoArchiver: {str(e)}") - raise - finally: - _setup_in_progress = False + try: + cog = VideoArchiver(bot) + await bot.add_cog(cog) + except Exception as e: + logger.error(f"Failed to load VideoArchiver: {str(e)}") + raise async def teardown(bot: Red) -> None: """Clean up when unloading.""" diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py index c0b6e12..201b0f3 100644 --- a/videoarchiver/core/base.py +++ b/videoarchiver/core/base.py @@ -66,23 +66,25 @@ class VideoArchiver(GroupCog): } def __init__(self, bot: Red) -> None: - """Initialize the cog with proper error handling""" + """Initialize the cog with minimal setup""" super().__init__() self.bot = bot self.ready = asyncio.Event() - self._init_task: Optional[asyncio.Task] = None - self._cleanup_task: Optional[asyncio.Task] = None + self._init_task = None + self._cleanup_task = None + self._queue_task = None self._unloading = False self.db = None self.queue_manager = None self.processor = None self.components = {} + self.config_manager = None + self.update_checker = None + self.ffmpeg_mgr = None + self.data_path = None + self.download_path = None - # Start initialization - self._init_task = asyncio.create_task(self._initialize()) - self._init_task.add_done_callback(self._init_callback) - - # Set up events + # Set up events - non-blocking setup_events(self) @hybrid_group(name="archivedb", fallback="help") @@ -350,7 +352,7 @@ class VideoArchiver(GroupCog): asyncio.create_task(self._cleanup()) async def _initialize(self) -> None: - """Initialize all components with proper error handling and timeouts""" + """Initialize all components with proper error handling""" try: # Initialize config first as other components depend on it config = Config.get_conf(self, identifier=855847, force_registration=True) @@ -364,20 +366,16 @@ class VideoArchiver(GroupCog): self.download_path.mkdir(parents=True, exist_ok=True) logger.info("Paths initialized") - # Clean existing downloads with timeout + # Clean existing downloads try: - await asyncio.wait_for( - cleanup_downloads(str(self.download_path)), timeout=CLEANUP_TIMEOUT - ) - logger.info("Downloads cleaned up") - except asyncio.TimeoutError: - logger.warning("Download cleanup timed out, continuing initialization") + await cleanup_downloads(str(self.download_path)) + except Exception as e: + logger.warning(f"Download cleanup error: {e}") # Initialize shared FFmpeg manager self.ffmpeg_mgr = FFmpegManager() - logger.info("FFmpeg manager initialized") - # Initialize queue manager before components + # Initialize queue manager queue_path = self.data_path / "queue_state.json" queue_path.parent.mkdir(parents=True, exist_ok=True) self.queue_manager = EnhancedVideoQueueManager( @@ -388,106 +386,51 @@ class VideoArchiver(GroupCog): max_history_age=86400, persistence_path=str(queue_path), ) + await self.queue_manager.initialize() - # Initialize queue manager with timeout - try: - await asyncio.wait_for( - self.queue_manager.initialize(), timeout=INIT_TIMEOUT - ) - logger.info("Queue manager initialized successfully") - except asyncio.TimeoutError: - logger.error("Queue manager initialization timed out") - raise ProcessingError("Queue manager initialization timed out") - except Exception as e: - logger.error(f"Queue manager initialization failed: {e}") - raise - - # Initialize processor with queue manager and shared FFmpeg manager + # Initialize processor self.processor = VideoProcessor( self.bot, self.config_manager, self.components, queue_manager=self.queue_manager, ffmpeg_mgr=self.ffmpeg_mgr, - db=self.db, # Pass database to processor (None by default) + db=self.db, ) - logger.info("Video processor initialized") - # Initialize components for existing guilds with timeout + # Initialize components for existing guilds for guild in self.bot.guilds: try: - await asyncio.wait_for( - initialize_guild_components(self, guild.id), - timeout=COMPONENT_INIT_TIMEOUT, - ) - logger.info(f"Guild {guild.id} components initialized") - except asyncio.TimeoutError: - logger.error(f"Guild {guild.id} initialization timed out") - continue + await initialize_guild_components(self, guild.id) except Exception as e: logger.error(f"Failed to initialize guild {guild.id}: {str(e)}") continue # Initialize update checker self.update_checker = UpdateChecker(self.bot, self.config_manager) - logger.info("Update checker initialized") - - # Start update checker with timeout - try: - await asyncio.wait_for( - self.update_checker.start(), timeout=INIT_TIMEOUT - ) - logger.info("Update checker started") - except asyncio.TimeoutError: - logger.warning("Update checker start timed out") + await self.update_checker.start() # Start queue processing as a background task - # Only start after queue manager is fully initialized self._queue_task = asyncio.create_task( self.queue_manager.process_queue(self.processor.process_video) ) - logger.info("Queue processing task created") # Set ready flag self.ready.set() logger.info("VideoArchiver initialization completed successfully") except Exception as e: - logger.error( - f"Critical error during initialization: {str(e)}\n{traceback.format_exc()}" - ) - # Force cleanup on initialization error - try: - await asyncio.wait_for( - force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT - ) - except asyncio.TimeoutError: - logger.error("Force cleanup during initialization timed out") + logger.error(f"Error during initialization: {str(e)}") + await self._cleanup() raise async def cog_load(self) -> None: - """Handle cog loading with proper timeout""" + """Handle cog loading without blocking""" try: - # Create initialization task - init_task = asyncio.create_task(self._initialize()) - try: - # Wait for initialization with timeout - await asyncio.wait_for(init_task, timeout=INIT_TIMEOUT) - logger.info("Initialization completed within timeout") - except asyncio.TimeoutError: - logger.error("Initialization timed out, forcing cleanup") - init_task.cancel() - await force_cleanup_resources(self) - raise ProcessingError("Cog initialization timed out") - - # Wait for ready flag with timeout - try: - await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT) - logger.info("Ready flag set within timeout") - except asyncio.TimeoutError: - await force_cleanup_resources(self) - raise ProcessingError("Ready flag wait timed out") - + # Start initialization as background task without waiting + self._init_task = asyncio.create_task(self._initialize()) + self._init_task.add_done_callback(self._init_callback) + logger.info("Initialization started in background") except Exception as e: # Ensure cleanup on any error try: diff --git a/videoarchiver/core/events.py b/videoarchiver/core/events.py index a0397a4..f3dd4a0 100644 --- a/videoarchiver/core/events.py +++ b/videoarchiver/core/events.py @@ -2,6 +2,7 @@ import logging import discord +import asyncio import traceback from typing import TYPE_CHECKING @@ -16,7 +17,7 @@ logger = logging.getLogger("VideoArchiver") def setup_events(cog: "VideoArchiver") -> None: """Set up event handlers for the cog""" - @cog.bot.event + @cog.listener() async def on_guild_join(guild: discord.Guild) -> None: """Handle bot joining a new guild""" if not cog.ready.is_set(): @@ -28,7 +29,7 @@ def setup_events(cog: "VideoArchiver") -> None: except Exception as e: logger.error(f"Failed to initialize new guild {guild.id}: {str(e)}") - @cog.bot.event + @cog.listener() async def on_guild_remove(guild: discord.Guild) -> None: """Handle bot leaving a guild""" try: @@ -36,47 +37,22 @@ def setup_events(cog: "VideoArchiver") -> None: except Exception as e: logger.error(f"Error cleaning up removed guild {guild.id}: {str(e)}") - @cog.bot.event + @cog.listener() async def on_message(message: discord.Message) -> None: """Handle new messages for video processing""" - # Only block video processing features, not command processing - if message.guild is None or message.author.bot: + # Skip if not ready or if message is from DM/bot + if not cog.ready.is_set() or message.guild is None or message.author.bot: return - # Check if this is a command by checking against all possible prefixes - prefixes = await cog.bot.get_prefix(message) - if isinstance(prefixes, str): - prefixes = [prefixes] - - # Check if message starts with any of the prefixes - if any(message.content.startswith(prefix) for prefix in prefixes): + # Skip if message is a command + ctx = await cog.bot.get_context(message) + if ctx.valid: return - # Only process videos if initialization is complete - if not cog.ready.is_set(): - logger.debug("Skipping video processing - initialization not complete") - return + # Process message in background task to avoid blocking + asyncio.create_task(process_message_background(cog, message)) - try: - await cog.processor.process_message(message) - except Exception as e: - logger.error( - f"Error processing message {message.id}: {traceback.format_exc()}" - ) - try: - log_channel = await cog.config_manager.get_channel( - message.guild, "log" - ) - if log_channel: - await log_channel.send( - f"Error processing message: {str(e)}\n" - f"Message ID: {message.id}\n" - f"Channel: {message.channel.mention}" - ) - except Exception as log_error: - logger.error(f"Failed to log error to guild: {str(log_error)}") - - @cog.bot.event + @cog.listener() async def on_raw_reaction_add(payload: discord.RawReactionActionEvent) -> None: """Handle reactions to messages""" if payload.user_id == cog.bot.user.id: @@ -96,7 +72,29 @@ def setup_events(cog: "VideoArchiver") -> None: # Only process if database is enabled if cog.db: user = cog.bot.get_user(payload.user_id) - await handle_archived_reaction(message, user, cog.db) + # Process reaction in background task + asyncio.create_task(handle_archived_reaction(message, user, cog.db)) except Exception as e: logger.error(f"Error handling reaction: {e}") + +async def process_message_background(cog: "VideoArchiver", message: discord.Message) -> None: + """Process message in background to avoid blocking""" + try: + await cog.processor.process_message(message) + except Exception as e: + logger.error( + f"Error processing message {message.id}: {traceback.format_exc()}" + ) + try: + log_channel = await cog.config_manager.get_channel( + message.guild, "log" + ) + if log_channel: + await log_channel.send( + f"Error processing message: {str(e)}\n" + f"Message ID: {message.id}\n" + f"Channel: {message.channel.mention}" + ) + except Exception as log_error: + logger.error(f"Failed to log error to guild: {str(log_error)}")