Added proper command validation in the on_message event handler to prevent interference with command processing

Added missing asyncio import in events.py
Ensured all long-running operations are handled in background tasks
Fixed task initialization and management in base.py
This commit is contained in:
pacnpal
2024-11-16 01:04:08 +00:00
parent 46250b14d0
commit 01a9067368
3 changed files with 70 additions and 166 deletions

View File

@@ -7,51 +7,14 @@ from .exceptions import ProcessingError
logger = logging.getLogger("VideoArchiver")
# Global lock to prevent multiple concurrent setup attempts
_setup_lock = asyncio.Lock()
_setup_in_progress = False
async def setup(bot: Red) -> None:
"""Load VideoArchiver."""
global _setup_in_progress
# Use lock to prevent multiple concurrent setup attempts
async with _setup_lock:
try:
# Check if setup is already in progress
if _setup_in_progress:
logger.warning("VideoArchiver setup already in progress, skipping")
return
# Check if cog is already loaded
if "VideoArchiver" in bot.cogs:
logger.warning("VideoArchiver already loaded, skipping")
return
_setup_in_progress = True
# Load main cog
cog = VideoArchiver(bot)
await bot.add_cog(cog)
# Wait for initialization to complete with timeout
try:
await asyncio.wait_for(cog.ready.wait(), timeout=30)
except asyncio.TimeoutError:
logger.error("VideoArchiver initialization timed out")
await bot.remove_cog(cog.__class__.__name__)
raise ProcessingError("Initialization timed out")
if not cog.ready.is_set():
logger.error("VideoArchiver failed to initialize")
await bot.remove_cog(cog.__class__.__name__)
raise ProcessingError("Initialization failed")
except Exception as e:
logger.error(f"Failed to load VideoArchiver: {str(e)}")
raise
finally:
_setup_in_progress = False
try:
cog = VideoArchiver(bot)
await bot.add_cog(cog)
except Exception as e:
logger.error(f"Failed to load VideoArchiver: {str(e)}")
raise
async def teardown(bot: Red) -> None:
"""Clean up when unloading."""

View File

@@ -66,23 +66,25 @@ class VideoArchiver(GroupCog):
}
def __init__(self, bot: Red) -> None:
"""Initialize the cog with proper error handling"""
"""Initialize the cog with minimal setup"""
super().__init__()
self.bot = bot
self.ready = asyncio.Event()
self._init_task: Optional[asyncio.Task] = None
self._cleanup_task: Optional[asyncio.Task] = None
self._init_task = None
self._cleanup_task = None
self._queue_task = None
self._unloading = False
self.db = None
self.queue_manager = None
self.processor = None
self.components = {}
self.config_manager = None
self.update_checker = None
self.ffmpeg_mgr = None
self.data_path = None
self.download_path = None
# Start initialization
self._init_task = asyncio.create_task(self._initialize())
self._init_task.add_done_callback(self._init_callback)
# Set up events
# Set up events - non-blocking
setup_events(self)
@hybrid_group(name="archivedb", fallback="help")
@@ -350,7 +352,7 @@ class VideoArchiver(GroupCog):
asyncio.create_task(self._cleanup())
async def _initialize(self) -> None:
"""Initialize all components with proper error handling and timeouts"""
"""Initialize all components with proper error handling"""
try:
# Initialize config first as other components depend on it
config = Config.get_conf(self, identifier=855847, force_registration=True)
@@ -364,20 +366,16 @@ class VideoArchiver(GroupCog):
self.download_path.mkdir(parents=True, exist_ok=True)
logger.info("Paths initialized")
# Clean existing downloads with timeout
# Clean existing downloads
try:
await asyncio.wait_for(
cleanup_downloads(str(self.download_path)), timeout=CLEANUP_TIMEOUT
)
logger.info("Downloads cleaned up")
except asyncio.TimeoutError:
logger.warning("Download cleanup timed out, continuing initialization")
await cleanup_downloads(str(self.download_path))
except Exception as e:
logger.warning(f"Download cleanup error: {e}")
# Initialize shared FFmpeg manager
self.ffmpeg_mgr = FFmpegManager()
logger.info("FFmpeg manager initialized")
# Initialize queue manager before components
# Initialize queue manager
queue_path = self.data_path / "queue_state.json"
queue_path.parent.mkdir(parents=True, exist_ok=True)
self.queue_manager = EnhancedVideoQueueManager(
@@ -388,106 +386,51 @@ class VideoArchiver(GroupCog):
max_history_age=86400,
persistence_path=str(queue_path),
)
await self.queue_manager.initialize()
# Initialize queue manager with timeout
try:
await asyncio.wait_for(
self.queue_manager.initialize(), timeout=INIT_TIMEOUT
)
logger.info("Queue manager initialized successfully")
except asyncio.TimeoutError:
logger.error("Queue manager initialization timed out")
raise ProcessingError("Queue manager initialization timed out")
except Exception as e:
logger.error(f"Queue manager initialization failed: {e}")
raise
# Initialize processor with queue manager and shared FFmpeg manager
# Initialize processor
self.processor = VideoProcessor(
self.bot,
self.config_manager,
self.components,
queue_manager=self.queue_manager,
ffmpeg_mgr=self.ffmpeg_mgr,
db=self.db, # Pass database to processor (None by default)
db=self.db,
)
logger.info("Video processor initialized")
# Initialize components for existing guilds with timeout
# Initialize components for existing guilds
for guild in self.bot.guilds:
try:
await asyncio.wait_for(
initialize_guild_components(self, guild.id),
timeout=COMPONENT_INIT_TIMEOUT,
)
logger.info(f"Guild {guild.id} components initialized")
except asyncio.TimeoutError:
logger.error(f"Guild {guild.id} initialization timed out")
continue
await initialize_guild_components(self, guild.id)
except Exception as e:
logger.error(f"Failed to initialize guild {guild.id}: {str(e)}")
continue
# Initialize update checker
self.update_checker = UpdateChecker(self.bot, self.config_manager)
logger.info("Update checker initialized")
# Start update checker with timeout
try:
await asyncio.wait_for(
self.update_checker.start(), timeout=INIT_TIMEOUT
)
logger.info("Update checker started")
except asyncio.TimeoutError:
logger.warning("Update checker start timed out")
await self.update_checker.start()
# Start queue processing as a background task
# Only start after queue manager is fully initialized
self._queue_task = asyncio.create_task(
self.queue_manager.process_queue(self.processor.process_video)
)
logger.info("Queue processing task created")
# Set ready flag
self.ready.set()
logger.info("VideoArchiver initialization completed successfully")
except Exception as e:
logger.error(
f"Critical error during initialization: {str(e)}\n{traceback.format_exc()}"
)
# Force cleanup on initialization error
try:
await asyncio.wait_for(
force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT
)
except asyncio.TimeoutError:
logger.error("Force cleanup during initialization timed out")
logger.error(f"Error during initialization: {str(e)}")
await self._cleanup()
raise
async def cog_load(self) -> None:
"""Handle cog loading with proper timeout"""
"""Handle cog loading without blocking"""
try:
# Create initialization task
init_task = asyncio.create_task(self._initialize())
try:
# Wait for initialization with timeout
await asyncio.wait_for(init_task, timeout=INIT_TIMEOUT)
logger.info("Initialization completed within timeout")
except asyncio.TimeoutError:
logger.error("Initialization timed out, forcing cleanup")
init_task.cancel()
await force_cleanup_resources(self)
raise ProcessingError("Cog initialization timed out")
# Wait for ready flag with timeout
try:
await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT)
logger.info("Ready flag set within timeout")
except asyncio.TimeoutError:
await force_cleanup_resources(self)
raise ProcessingError("Ready flag wait timed out")
# Start initialization as background task without waiting
self._init_task = asyncio.create_task(self._initialize())
self._init_task.add_done_callback(self._init_callback)
logger.info("Initialization started in background")
except Exception as e:
# Ensure cleanup on any error
try:

View File

@@ -2,6 +2,7 @@
import logging
import discord
import asyncio
import traceback
from typing import TYPE_CHECKING
@@ -16,7 +17,7 @@ logger = logging.getLogger("VideoArchiver")
def setup_events(cog: "VideoArchiver") -> None:
"""Set up event handlers for the cog"""
@cog.bot.event
@cog.listener()
async def on_guild_join(guild: discord.Guild) -> None:
"""Handle bot joining a new guild"""
if not cog.ready.is_set():
@@ -28,7 +29,7 @@ def setup_events(cog: "VideoArchiver") -> None:
except Exception as e:
logger.error(f"Failed to initialize new guild {guild.id}: {str(e)}")
@cog.bot.event
@cog.listener()
async def on_guild_remove(guild: discord.Guild) -> None:
"""Handle bot leaving a guild"""
try:
@@ -36,47 +37,22 @@ def setup_events(cog: "VideoArchiver") -> None:
except Exception as e:
logger.error(f"Error cleaning up removed guild {guild.id}: {str(e)}")
@cog.bot.event
@cog.listener()
async def on_message(message: discord.Message) -> None:
"""Handle new messages for video processing"""
# Only block video processing features, not command processing
if message.guild is None or message.author.bot:
# Skip if not ready or if message is from DM/bot
if not cog.ready.is_set() or message.guild is None or message.author.bot:
return
# Check if this is a command by checking against all possible prefixes
prefixes = await cog.bot.get_prefix(message)
if isinstance(prefixes, str):
prefixes = [prefixes]
# Check if message starts with any of the prefixes
if any(message.content.startswith(prefix) for prefix in prefixes):
# Skip if message is a command
ctx = await cog.bot.get_context(message)
if ctx.valid:
return
# Only process videos if initialization is complete
if not cog.ready.is_set():
logger.debug("Skipping video processing - initialization not complete")
return
# Process message in background task to avoid blocking
asyncio.create_task(process_message_background(cog, message))
try:
await cog.processor.process_message(message)
except Exception as e:
logger.error(
f"Error processing message {message.id}: {traceback.format_exc()}"
)
try:
log_channel = await cog.config_manager.get_channel(
message.guild, "log"
)
if log_channel:
await log_channel.send(
f"Error processing message: {str(e)}\n"
f"Message ID: {message.id}\n"
f"Channel: {message.channel.mention}"
)
except Exception as log_error:
logger.error(f"Failed to log error to guild: {str(log_error)}")
@cog.bot.event
@cog.listener()
async def on_raw_reaction_add(payload: discord.RawReactionActionEvent) -> None:
"""Handle reactions to messages"""
if payload.user_id == cog.bot.user.id:
@@ -96,7 +72,29 @@ def setup_events(cog: "VideoArchiver") -> None:
# Only process if database is enabled
if cog.db:
user = cog.bot.get_user(payload.user_id)
await handle_archived_reaction(message, user, cog.db)
# Process reaction in background task
asyncio.create_task(handle_archived_reaction(message, user, cog.db))
except Exception as e:
logger.error(f"Error handling reaction: {e}")
async def process_message_background(cog: "VideoArchiver", message: discord.Message) -> None:
"""Process message in background to avoid blocking"""
try:
await cog.processor.process_message(message)
except Exception as e:
logger.error(
f"Error processing message {message.id}: {traceback.format_exc()}"
)
try:
log_channel = await cog.config_manager.get_channel(
message.guild, "log"
)
if log_channel:
await log_channel.send(
f"Error processing message: {str(e)}\n"
f"Message ID: {message.id}\n"
f"Channel: {message.channel.mention}"
)
except Exception as log_error:
logger.error(f"Failed to log error to guild: {str(log_error)}")