diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index b270766..ab14eb6 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -130,7 +130,7 @@ class EnhancedVideoQueueManager: max_history_age: int = 86400, # 24 hours persistence_path: Optional[str] = None, backup_interval: int = 300, # 5 minutes - deadlock_threshold: int = 900, # 15 minutes (reduced from 1 hour) + deadlock_threshold: int = 900, # 15 minutes ): self.max_retries = max_retries self.retry_delay = retry_delay @@ -151,6 +151,7 @@ class EnhancedVideoQueueManager: # Track active tasks self._active_tasks: Set[asyncio.Task] = set() self._processing_lock = asyncio.Lock() + self._shutdown = False # Status tracking self._guild_queues: Dict[int, Set[str]] = {} @@ -184,73 +185,80 @@ class EnhancedVideoQueueManager: # Load persisted queue self._load_persisted_queue() - async def add_to_queue( - self, - url: str, - message_id: int, - channel_id: int, - guild_id: int, - author_id: int, - callback: Optional[ - Callable[[str, bool, str], Any] - ] = None, # Make callback optional - priority: int = 0, - ) -> bool: - """Add a video to the processing queue with priority support""" - try: - async with self._queue_lock: - if len(self._queue) >= self.max_queue_size: - raise QueueError("Queue is full") + def force_stop(self): + """Force stop all queue operations immediately""" + self._shutdown = True + + # Cancel all active tasks + for task in self._active_tasks: + if not task.done(): + task.cancel() - # Check system resources - if psutil.virtual_memory().percent > 90: - raise ResourceExhaustedError("System memory is critically low") - - # Create queue item - item = QueueItem( - url=url, - message_id=message_id, - channel_id=channel_id, - guild_id=guild_id, - author_id=author_id, - added_at=datetime.utcnow(), - priority=priority, - ) - - # Add to tracking collections - if guild_id not in self._guild_queues: - self._guild_queues[guild_id] = set() - self._guild_queues[guild_id].add(url) - - if channel_id not in self._channel_queues: - self._channel_queues[channel_id] = set() - self._channel_queues[channel_id].add(url) - - # Add to queue with priority + # Move processing items back to queue + for url, item in self._processing.items(): + if item.retry_count < self.max_retries: + item.status = "pending" + item.retry_count += 1 self._queue.append(item) - self._queue.sort(key=lambda x: (-x.priority, x.added_at)) + else: + self._failed[url] = item - # Persist queue state + self._processing.clear() + + # Clear task tracking + self._active_tasks.clear() + + logger.info("Queue manager force stopped") + + async def cleanup(self): + """Clean up resources and stop queue processing""" + try: + # Set shutdown flag + self._shutdown = True + + # Cancel all monitoring tasks + for task in self._active_tasks: + if not task.done(): + task.cancel() + + await asyncio.gather(*self._active_tasks, return_exceptions=True) + + # Move processing items back to queue + async with self._queue_lock: + for url, item in self._processing.items(): + if item.retry_count < self.max_retries: + item.status = "pending" + item.retry_count += 1 + self._queue.append(item) + else: + self._failed[url] = item + + self._processing.clear() + + # Persist final state if self.persistence_path: await self._persist_queue() - logger.info(f"Added video to queue: {url} with priority {priority}") - return True + # Clear all collections + self._queue.clear() + self._completed.clear() + self._failed.clear() + self._guild_queues.clear() + self._channel_queues.clear() + self._active_tasks.clear() + + logger.info("Queue manager cleanup completed") except Exception as e: - logger.error(f"Error adding video to queue: {traceback.format_exc()}") - raise QueueError(f"Failed to add to queue: {str(e)}") + logger.error(f"Error during cleanup: {str(e)}") + raise CleanupError(f"Failed to clean up queue manager: {str(e)}") async def process_queue( self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]] ): - """Process items in the queue with the provided processor function - - Args: - processor: A callable that takes a QueueItem and returns a tuple of (success: bool, error: Optional[str]) - """ + """Process items in the queue with the provided processor function""" logger.info("Queue processor started and waiting for items...") - while True: + while not self._shutdown: try: # Get next item from queue item = None @@ -340,7 +348,6 @@ class EnhancedVideoQueueManager: await self._persist_queue() except Exception as e: logger.error(f"Failed to persist queue state: {e}") - # Continue processing even if persistence fails except Exception as e: logger.error( @@ -353,9 +360,68 @@ class EnhancedVideoQueueManager: # Small delay to prevent CPU overload await asyncio.sleep(0.1) + logger.info("Queue processor stopped due to shutdown") + + async def add_to_queue( + self, + url: str, + message_id: int, + channel_id: int, + guild_id: int, + author_id: int, + priority: int = 0, + ) -> bool: + """Add a video to the processing queue with priority support""" + if self._shutdown: + raise QueueError("Queue manager is shutting down") + + try: + async with self._queue_lock: + if len(self._queue) >= self.max_queue_size: + raise QueueError("Queue is full") + + # Check system resources + if psutil.virtual_memory().percent > 90: + raise ResourceExhaustedError("System memory is critically low") + + # Create queue item + item = QueueItem( + url=url, + message_id=message_id, + channel_id=channel_id, + guild_id=guild_id, + author_id=author_id, + added_at=datetime.utcnow(), + priority=priority, + ) + + # Add to tracking collections + if guild_id not in self._guild_queues: + self._guild_queues[guild_id] = set() + self._guild_queues[guild_id].add(url) + + if channel_id not in self._channel_queues: + self._channel_queues[channel_id] = set() + self._channel_queues[channel_id].add(url) + + # Add to queue with priority + self._queue.append(item) + self._queue.sort(key=lambda x: (-x.priority, x.added_at)) + + # Persist queue state + if self.persistence_path: + await self._persist_queue() + + logger.info(f"Added video to queue: {url} with priority {priority}") + return True + + except Exception as e: + logger.error(f"Error adding video to queue: {traceback.format_exc()}") + raise QueueError(f"Failed to add to queue: {str(e)}") + async def _periodic_backup(self): """Periodically backup queue state""" - while True: + while not self._shutdown: try: if self.persistence_path and ( not self._last_backup @@ -365,6 +431,8 @@ class EnhancedVideoQueueManager: await self._persist_queue() self._last_backup = datetime.utcnow() await asyncio.sleep(self.backup_interval) + except asyncio.CancelledError: + break except Exception as e: logger.error(f"Error in periodic backup: {str(e)}") await asyncio.sleep(60) @@ -482,7 +550,7 @@ class EnhancedVideoQueueManager: async def _monitor_health(self): """Monitor queue health and performance with improved metrics""" - while True: + while not self._shutdown: try: # Check memory usage process = psutil.Process() @@ -492,10 +560,9 @@ class EnhancedVideoQueueManager: logger.warning(f"High memory usage detected: {memory_usage:.2f}MB") # Force garbage collection import gc - gc.collect() - # Check for potential deadlocks with reduced threshold + # Check for potential deadlocks processing_times = [ time.time() - item.processing_time for item in self._processing.values() @@ -504,7 +571,7 @@ class EnhancedVideoQueueManager: if processing_times: max_time = max(processing_times) - if max_time > self.deadlock_threshold: # Reduced from 3600s to 900s + if max_time > self.deadlock_threshold: logger.warning( f"Potential deadlock detected: Item processing for {max_time:.2f}s" ) @@ -528,6 +595,8 @@ class EnhancedVideoQueueManager: await asyncio.sleep(300) # Check every 5 minutes + except asyncio.CancelledError: + break except Exception as e: logger.error(f"Error in health monitor: {traceback.format_exc()}") await asyncio.sleep(60) @@ -563,43 +632,54 @@ class EnhancedVideoQueueManager: except Exception as e: logger.error(f"Error recovering stuck items: {str(e)}") - async def cleanup(self): - """Clean up resources and stop queue processing""" - try: - # Cancel all monitoring tasks - for task in self._active_tasks: - if not task.done(): - task.cancel() + async def _periodic_cleanup(self): + """Periodically clean up old completed/failed items""" + while not self._shutdown: + try: + current_time = datetime.utcnow() + cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age) - await asyncio.gather(*self._active_tasks, return_exceptions=True) + async with self._queue_lock: + # Clean up completed items + for url in list(self._completed.keys()): + item = self._completed[url] + if item.added_at < cleanup_cutoff: + self._completed.pop(url) - # Persist final state - if self.persistence_path: - await self._persist_queue() + # Clean up failed items + for url in list(self._failed.keys()): + item = self._failed[url] + if item.added_at < cleanup_cutoff: + self._failed.pop(url) - # Clear all collections - self._queue.clear() - self._processing.clear() - self._completed.clear() - self._failed.clear() - self._guild_queues.clear() - self._channel_queues.clear() + # Clean up guild and channel tracking + for guild_id in list(self._guild_queues.keys()): + self._guild_queues[guild_id] = { + url + for url in self._guild_queues[guild_id] + if url in self._queue or url in self._processing + } - logger.info("Queue manager cleanup completed") + for channel_id in list(self._channel_queues.keys()): + self._channel_queues[channel_id] = { + url + for url in self._channel_queues[channel_id] + if url in self._queue or url in self._processing + } - except Exception as e: - logger.error(f"Error during cleanup: {str(e)}") - raise CleanupError(f"Failed to clean up queue manager: {str(e)}") + self.metrics.last_cleanup = current_time + logger.info("Completed periodic queue cleanup") + + await asyncio.sleep(self.cleanup_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in periodic cleanup: {traceback.format_exc()}") + await asyncio.sleep(60) def get_queue_status(self, guild_id: int) -> dict: - """Get current queue status and metrics for a guild - - Args: - guild_id: The ID of the guild to get status for - - Returns: - dict: Queue status including counts and metrics - """ + """Get current queue status and metrics for a guild""" try: # Count items for this guild pending = len([item for item in self._queue if item.guild_id == guild_id]) @@ -660,14 +740,10 @@ class EnhancedVideoQueueManager: } async def clear_guild_queue(self, guild_id: int) -> int: - """Clear all queue items for a specific guild + """Clear all queue items for a specific guild""" + if self._shutdown: + raise QueueError("Queue manager is shutting down") - Args: - guild_id: The ID of the guild to clear items for - - Returns: - int: Number of items cleared - """ try: cleared_count = 0 async with self._queue_lock: @@ -720,47 +796,3 @@ class EnhancedVideoQueueManager: except Exception as e: logger.error(f"Error clearing guild queue: {traceback.format_exc()}") raise QueueError(f"Failed to clear guild queue: {str(e)}") - - async def _periodic_cleanup(self): - """Periodically clean up old completed/failed items""" - while True: - try: - current_time = datetime.utcnow() - cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age) - - async with self._queue_lock: - # Clean up completed items - for url in list(self._completed.keys()): - item = self._completed[url] - if item.added_at < cleanup_cutoff: - self._completed.pop(url) - - # Clean up failed items - for url in list(self._failed.keys()): - item = self._failed[url] - if item.added_at < cleanup_cutoff: - self._failed.pop(url) - - # Clean up guild and channel tracking - for guild_id in list(self._guild_queues.keys()): - self._guild_queues[guild_id] = { - url - for url in self._guild_queues[guild_id] - if url in self._queue or url in self._processing - } - - for channel_id in list(self._channel_queues.keys()): - self._channel_queues[channel_id] = { - url - for url in self._channel_queues[channel_id] - if url in self._queue or url in self._processing - } - - self.metrics.last_cleanup = current_time - logger.info("Completed periodic queue cleanup") - - await asyncio.sleep(self.cleanup_interval) - - except Exception as e: - logger.error(f"Error in periodic cleanup: {traceback.format_exc()}") - await asyncio.sleep(60) diff --git a/videoarchiver/ffmpeg/ffmpeg_manager.py b/videoarchiver/ffmpeg/ffmpeg_manager.py index d1069d4..2dd2106 100644 --- a/videoarchiver/ffmpeg/ffmpeg_manager.py +++ b/videoarchiver/ffmpeg/ffmpeg_manager.py @@ -6,8 +6,10 @@ import multiprocessing import logging import subprocess import traceback +import signal +import psutil from pathlib import Path -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Set from videoarchiver.ffmpeg.exceptions import ( FFmpegError, @@ -66,11 +68,53 @@ class FFmpegManager: # Initialize encoder params self.encoder_params = EncoderParams(self._cpu_cores, self._gpu_info) + + # Track active FFmpeg processes + self._active_processes: Set[subprocess.Popen] = set() # Verify FFmpeg functionality self._verify_ffmpeg() logger.info("FFmpeg manager initialized successfully") + def kill_all_processes(self) -> None: + """Kill all active FFmpeg processes""" + try: + # First try graceful termination + for process in self._active_processes: + try: + if process.poll() is None: # Process is still running + process.terminate() + except Exception as e: + logger.error(f"Error terminating FFmpeg process: {e}") + + # Give processes a moment to terminate + import time + time.sleep(0.5) + + # Force kill any remaining processes + for process in self._active_processes: + try: + if process.poll() is None: # Process is still running + process.kill() + except Exception as e: + logger.error(f"Error killing FFmpeg process: {e}") + + # Find and kill any orphaned FFmpeg processes + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + try: + if 'ffmpeg' in proc.info['name'].lower(): + proc.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + except Exception as e: + logger.error(f"Error killing orphaned FFmpeg process: {e}") + + self._active_processes.clear() + logger.info("All FFmpeg processes terminated") + + except Exception as e: + logger.error(f"Error killing FFmpeg processes: {e}") + def _initialize_binaries(self) -> Dict[str, Path]: """Initialize FFmpeg and FFprobe binaries with proper error handling""" try: diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index d0d29e2..f353fa5 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -7,7 +7,7 @@ import discord from discord.ext import commands from discord import app_commands from pathlib import Path -from typing import Dict, Any, Optional, Tuple +from typing import Dict, Any, Optional, Tuple, Set import traceback from datetime import datetime @@ -28,9 +28,9 @@ REACTIONS = { 'processing': '⚙️', 'success': '✅', 'error': '❌', - 'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'], # Queue position indicators - 'progress': ['⬛', '🟨', '🟩'], # Progress indicators (0%, 50%, 100%) - 'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟'] # Download progress (0%, 20%, 40%, 60%, 80%, 100%) + 'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'], + 'progress': ['⬛', '🟨', '🟩'], + 'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟'] } # Global queue manager instance to persist across reloads @@ -56,18 +56,21 @@ class VideoProcessor: self.components = components self.ffmpeg_mgr = ffmpeg_mgr + # Track active downloads and their tasks + self._active_downloads: Dict[str, asyncio.Task] = {} + self._active_downloads_lock = asyncio.Lock() + self._unloading = False + # Use global queue manager if available global _global_queue_manager if _global_queue_manager is not None: self.queue_manager = _global_queue_manager logger.info("Using existing global queue manager") - # Use provided queue manager if available elif queue_manager: self.queue_manager = queue_manager _global_queue_manager = queue_manager logger.info("Using provided queue manager and setting as global") else: - # Initialize enhanced queue manager with persistence and error recovery data_dir = Path(os.path.dirname(__file__)) / "data" data_dir.mkdir(parents=True, exist_ok=True) queue_path = data_dir / "queue_state.json" @@ -76,30 +79,338 @@ class VideoProcessor: max_retries=3, retry_delay=5, max_queue_size=1000, - cleanup_interval=1800, # 30 minutes - max_history_age=86400, # 24 hours + cleanup_interval=1800, + max_history_age=86400, persistence_path=str(queue_path) ) _global_queue_manager = self.queue_manager logger.info("Created new queue manager and set as global") - # Track failed downloads for cleanup - self._failed_downloads = set() - self._failed_downloads_lock = asyncio.Lock() - # Start queue processing logger.info("Starting video processing queue...") self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) logger.info("Video processing queue started successfully") - # Register commands - @commands.hybrid_command(name='queuedetails') - @commands.is_owner() - async def queue_details(ctx): - """Show detailed queue status and progress information""" - await self._show_queue_details(ctx) + async def _cancel_active_downloads(self) -> None: + """Cancel all active downloads and requeue them""" + async with self._active_downloads_lock: + for url, task in self._active_downloads.items(): + if not task.done(): + # Cancel the task + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling download task for {url}: {e}") - self.bot.add_command(queue_details) + # Requeue the download if we're unloading + if self._unloading and url in _download_progress: + try: + # Get the original message details from progress tracking + progress = _download_progress[url] + if progress.get('message_id') and progress.get('channel_id') and progress.get('guild_id'): + await self.queue_manager.add_to_queue( + url=url, + message_id=progress['message_id'], + channel_id=progress['channel_id'], + guild_id=progress['guild_id'], + author_id=progress.get('author_id') + ) + logger.info(f"Requeued download for {url}") + except Exception as e: + logger.error(f"Failed to requeue download for {url}: {e}") + + self._active_downloads.clear() + + async def cleanup(self) -> None: + """Clean up resources with proper handling""" + try: + self._unloading = True + + # Cancel queue processing + if hasattr(self, '_queue_task') and not self._queue_task.done(): + self._queue_task.cancel() + try: + await self._queue_task + except asyncio.CancelledError: + pass + + # Cancel and requeue active downloads + await self._cancel_active_downloads() + + # Clean up progress tracking + _download_progress.clear() + _compression_progress.clear() + + except Exception as e: + logger.error(f"Error during processor cleanup: {e}") + raise + finally: + self._unloading = False + + async def force_cleanup(self) -> None: + """Force cleanup of resources when timeout occurs""" + try: + # Cancel all tasks immediately without requeuing + async with self._active_downloads_lock: + for task in self._active_downloads.values(): + if not task.done(): + task.cancel() + + # Cancel queue task + if hasattr(self, '_queue_task') and not self._queue_task.done(): + self._queue_task.cancel() + + # Clear all tracking + self._active_downloads.clear() + _download_progress.clear() + _compression_progress.clear() + + except Exception as e: + logger.error(f"Error during force cleanup: {e}") + + async def process_message(self, message: discord.Message) -> None: + """Process a message for video content""" + try: + if not message.guild or not message.guild.id in self.components: + return + + components = self.components[message.guild.id] + downloader = components.get("downloader") + if not downloader: + logger.error(f"No downloader found for guild {message.guild.id}") + return + + content = message.content.strip() + if not content or not downloader.is_supported_url(content): + return + + try: + await message.add_reaction(REACTIONS['queued']) + logger.info(f"Added queued reaction to message {message.id}") + except Exception as e: + logger.error(f"Failed to add queued reaction: {e}") + + # Track message details in progress tracking + _download_progress[content] = { + 'active': False, + 'message_id': message.id, + 'channel_id': message.channel.id, + 'guild_id': message.guild.id, + 'author_id': message.author.id, + 'start_time': None, + 'percent': 0, + 'speed': 'N/A', + 'eta': 'N/A', + 'downloaded_bytes': 0, + 'total_bytes': 0, + 'retries': 0 + } + + await self.queue_manager.add_to_queue( + url=content, + message_id=message.id, + channel_id=message.channel.id, + guild_id=message.guild.id, + author_id=message.author.id + ) + logger.info(f"Added message {message.id} to processing queue") + + queue_status = self.queue_manager.get_queue_status(message.guild.id) + queue_position = queue_status['pending'] - 1 + await self.update_queue_position_reaction(message, queue_position) + logger.info(f"Message {message.id} is at position {queue_position + 1} in queue") + + except Exception as e: + logger.error(f"Error processing message: {traceback.format_exc()}") + raise ProcessingError(f"Failed to process message: {str(e)}") + + async def _process_video(self, item) -> Tuple[bool, Optional[str]]: + """Process a video from the queue""" + if self._unloading: + return False, "Processor is unloading" + + file_path = None + original_message = None + download_task = None + + try: + guild_id = item.guild_id + if guild_id not in self.components: + return False, f"No components found for guild {guild_id}" + + components = self.components[guild_id] + downloader = components.get("downloader") + message_manager = components.get("message_manager") + + if not downloader or not message_manager: + return False, f"Missing required components for guild {guild_id}" + + try: + channel = self.bot.get_channel(item.channel_id) + if not channel: + return False, f"Channel {item.channel_id} not found" + original_message = await channel.fetch_message(item.message_id) + + await original_message.remove_reaction(REACTIONS['queued'], self.bot.user) + await original_message.add_reaction(REACTIONS['processing']) + logger.info(f"Started processing message {item.message_id}") + except discord.NotFound: + original_message = None + except Exception as e: + logger.error(f"Error fetching original message: {e}") + original_message = None + + # Create and track download task + download_task = asyncio.create_task( + downloader.download_video( + item.url, + progress_callback=lambda progress: self.update_download_progress_reaction(original_message, progress) if original_message else None + ) + ) + + async with self._active_downloads_lock: + self._active_downloads[item.url] = download_task + + try: + success, file_path, error = await download_task + if not success: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Download failed for message {item.message_id}: {error}") + return False, f"Failed to download video: {error}" + except asyncio.CancelledError: + logger.info(f"Download cancelled for {item.url}") + return False, "Download cancelled" + except Exception as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Download error for message {item.message_id}: {str(e)}") + return False, f"Download error: {str(e)}" + finally: + async with self._active_downloads_lock: + self._active_downloads.pop(item.url, None) + + # Get archive channel + guild = self.bot.get_guild(guild_id) + if not guild: + return False, f"Guild {guild_id} not found" + + archive_channel = await self.config.get_channel(guild, "archive") + if not archive_channel: + return False, "Archive channel not configured" + + # Format message + try: + author = original_message.author if original_message else None + message = await message_manager.format_message( + author=author, + channel=channel, + url=item.url + ) + except Exception as e: + return False, f"Failed to format message: {str(e)}" + + # Upload to archive channel + try: + if not os.path.exists(file_path): + return False, "Processed file not found" + + await archive_channel.send( + content=message, + file=discord.File(file_path) + ) + + if original_message: + await original_message.remove_reaction(REACTIONS['processing'], self.bot.user) + await original_message.add_reaction(REACTIONS['success']) + logger.info(f"Successfully processed message {item.message_id}") + + return True, None + + except discord.HTTPException as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Failed to upload to Discord for message {item.message_id}: {str(e)}") + return False, f"Failed to upload to Discord: {str(e)}" + except Exception as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Failed to archive video for message {item.message_id}: {str(e)}") + return False, f"Failed to archive video: {str(e)}" + + except Exception as e: + logger.error(f"Error processing video: {traceback.format_exc()}") + return False, str(e) + finally: + # Clean up downloaded file + if file_path and os.path.exists(file_path): + try: + os.unlink(file_path) + except Exception as e: + logger.error(f"Failed to clean up file {file_path}: {e}") + + async def update_queue_position_reaction(self, message, position): + """Update queue position reaction""" + try: + for reaction in REACTIONS['numbers']: + try: + await message.remove_reaction(reaction, self.bot.user) + except: + pass + + if 0 <= position < len(REACTIONS['numbers']): + await message.add_reaction(REACTIONS['numbers'][position]) + logger.info(f"Updated queue position reaction to {position + 1} for message {message.id}") + except Exception as e: + logger.error(f"Failed to update queue position reaction: {e}") + + async def update_progress_reaction(self, message, progress): + """Update progress reaction based on FFmpeg progress""" + try: + for reaction in REACTIONS['progress']: + try: + await message.remove_reaction(reaction, self.bot.user) + except: + pass + + if progress < 33: + await message.add_reaction(REACTIONS['progress'][0]) + elif progress < 66: + await message.add_reaction(REACTIONS['progress'][1]) + else: + await message.add_reaction(REACTIONS['progress'][2]) + except Exception as e: + logger.error(f"Failed to update progress reaction: {e}") + + async def update_download_progress_reaction(self, message, progress): + """Update download progress reaction""" + if not message: + return + + try: + for reaction in REACTIONS['download']: + try: + await message.remove_reaction(reaction, self.bot.user) + except: + pass + + if progress <= 20: + await message.add_reaction(REACTIONS['download'][0]) + elif progress <= 40: + await message.add_reaction(REACTIONS['download'][1]) + elif progress <= 60: + await message.add_reaction(REACTIONS['download'][2]) + elif progress <= 80: + await message.add_reaction(REACTIONS['download'][3]) + elif progress < 100: + await message.add_reaction(REACTIONS['download'][4]) + else: + await message.add_reaction(REACTIONS['download'][5]) + except Exception as e: + logger.error(f"Failed to update download progress reaction: {e}") async def _show_queue_details(self, ctx): """Display detailed queue status and progress information""" @@ -213,286 +524,3 @@ class VideoProcessor: except Exception as e: logger.error(f"Error showing queue details: {traceback.format_exc()}") await ctx.send(f"Error getting queue details: {str(e)}") - - async def update_queue_position_reaction(self, message, position): - """Update queue position reaction""" - try: - # Remove any existing number reactions - for reaction in REACTIONS['numbers']: - try: - await message.remove_reaction(reaction, self.bot.user) - except: - pass - - # Add new position reaction if within range - if 0 <= position < len(REACTIONS['numbers']): - await message.add_reaction(REACTIONS['numbers'][position]) - logger.info(f"Updated queue position reaction to {position + 1} for message {message.id}") - except Exception as e: - logger.error(f"Failed to update queue position reaction: {e}") - - async def update_progress_reaction(self, message, progress): - """Update progress reaction based on FFmpeg progress""" - try: - # Remove existing progress reactions - for reaction in REACTIONS['progress']: - try: - await message.remove_reaction(reaction, self.bot.user) - except: - pass - - # Add appropriate progress reaction - if progress < 33: - await message.add_reaction(REACTIONS['progress'][0]) - logger.info(f"FFmpeg progress 0-33% for message {message.id}") - elif progress < 66: - await message.add_reaction(REACTIONS['progress'][1]) - logger.info(f"FFmpeg progress 33-66% for message {message.id}") - else: - await message.add_reaction(REACTIONS['progress'][2]) - logger.info(f"FFmpeg progress 66-100% for message {message.id}") - except Exception as e: - logger.error(f"Failed to update progress reaction: {e}") - - async def update_download_progress_reaction(self, message, progress): - """Update download progress reaction""" - try: - # Remove existing download progress reactions - for reaction in REACTIONS['download']: - try: - await message.remove_reaction(reaction, self.bot.user) - except: - pass - - # Add appropriate download progress reaction - if progress <= 20: - await message.add_reaction(REACTIONS['download'][0]) - logger.info(f"Download progress 0-20% for message {message.id}") - elif progress <= 40: - await message.add_reaction(REACTIONS['download'][1]) - logger.info(f"Download progress 20-40% for message {message.id}") - elif progress <= 60: - await message.add_reaction(REACTIONS['download'][2]) - logger.info(f"Download progress 40-60% for message {message.id}") - elif progress <= 80: - await message.add_reaction(REACTIONS['download'][3]) - logger.info(f"Download progress 60-80% for message {message.id}") - elif progress < 100: - await message.add_reaction(REACTIONS['download'][4]) - logger.info(f"Download progress 80-100% for message {message.id}") - else: - await message.add_reaction(REACTIONS['download'][5]) - logger.info(f"Download completed (100%) for message {message.id}") - except Exception as e: - logger.error(f"Failed to update download progress reaction: {e}") - - async def process_message(self, message): - """Process a message for video content""" - try: - if not message.guild or not message.guild.id in self.components: - return - - components = self.components[message.guild.id] - downloader = components.get("downloader") - if not downloader: - logger.error(f"No downloader found for guild {message.guild.id}") - return - - # Check if message contains a video URL - content = message.content.strip() - if not content or not downloader.is_supported_url(content): - return - - # Add initial queued reaction - try: - await message.add_reaction(REACTIONS['queued']) - logger.info(f"Added queued reaction to message {message.id}") - except Exception as e: - logger.error(f"Failed to add queued reaction: {e}") - - # Add to processing queue - await self.queue_manager.add_to_queue( - url=content, - message_id=message.id, - channel_id=message.channel.id, - guild_id=message.guild.id, - author_id=message.author.id - ) - logger.info(f"Added message {message.id} to processing queue") - - # Update queue position - queue_status = self.queue_manager.get_queue_status(message.guild.id) - queue_position = queue_status['pending'] - 1 # -1 because this item was just added - await self.update_queue_position_reaction(message, queue_position) - logger.info(f"Message {message.id} is at position {queue_position + 1} in queue") - - except Exception as e: - logger.error(f"Error processing message: {traceback.format_exc()}") - raise ProcessingError(f"Failed to process message: {str(e)}") - - async def _process_video(self, item) -> Tuple[bool, Optional[str]]: - """Process a video from the queue""" - file_path = None - original_message = None - try: - guild_id = item.guild_id - if guild_id not in self.components: - return False, f"No components found for guild {guild_id}" - - components = self.components[guild_id] - downloader = components.get("downloader") - message_manager = components.get("message_manager") - - if not downloader or not message_manager: - return False, f"Missing required components for guild {guild_id}" - - # Get original message - try: - channel = self.bot.get_channel(item.channel_id) - if not channel: - return False, f"Channel {item.channel_id} not found" - original_message = await channel.fetch_message(item.message_id) - - # Update reactions to show processing - await original_message.remove_reaction(REACTIONS['queued'], self.bot.user) - await original_message.add_reaction(REACTIONS['processing']) - logger.info(f"Started processing message {item.message_id}") - except discord.NotFound: - original_message = None - except Exception as e: - logger.error(f"Error fetching original message: {e}") - original_message = None - - # Initialize progress tracking - _download_progress[item.url] = { - 'active': True, - 'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), - 'percent': 0, - 'speed': 'N/A', - 'eta': 'N/A', - 'downloaded_bytes': 0, - 'total_bytes': 0, - 'retries': 0 - } - - # Download and process video - try: - success, file_path, error = await downloader.download_video( - item.url, - progress_callback=lambda progress: self.update_download_progress_reaction(original_message, progress) if original_message else None - ) - if not success: - if original_message: - await original_message.add_reaction(REACTIONS['error']) - logger.error(f"Download failed for message {item.message_id}: {error}") - return False, f"Failed to download video: {error}" - except Exception as e: - if original_message: - await original_message.add_reaction(REACTIONS['error']) - logger.error(f"Download error for message {item.message_id}: {str(e)}") - return False, f"Download error: {str(e)}" - finally: - # Clean up progress tracking - if item.url in _download_progress: - _download_progress[item.url]['active'] = False - - # Get archive channel - guild = self.bot.get_guild(guild_id) - if not guild: - return False, f"Guild {guild_id} not found" - - archive_channel = await self.config.get_channel(guild, "archive") - if not archive_channel: - return False, "Archive channel not configured" - - # Format message - try: - author = original_message.author if original_message else None - message = await message_manager.format_message( - author=author, - channel=channel, - url=item.url - ) - except Exception as e: - return False, f"Failed to format message: {str(e)}" - - # Upload to archive channel - try: - if not os.path.exists(file_path): - return False, "Processed file not found" - - await archive_channel.send( - content=message, - file=discord.File(file_path) - ) - - # Update reactions for success - if original_message: - await original_message.remove_reaction(REACTIONS['processing'], self.bot.user) - await original_message.add_reaction(REACTIONS['success']) - logger.info(f"Successfully processed message {item.message_id}") - - return True, None - - except discord.HTTPException as e: - if original_message: - await original_message.add_reaction(REACTIONS['error']) - logger.error(f"Failed to upload to Discord for message {item.message_id}: {str(e)}") - return False, f"Failed to upload to Discord: {str(e)}" - except Exception as e: - if original_message: - await original_message.add_reaction(REACTIONS['error']) - logger.error(f"Failed to archive video for message {item.message_id}: {str(e)}") - return False, f"Failed to archive video: {str(e)}" - - except Exception as e: - if original_message: - await original_message.add_reaction(REACTIONS['error']) - logger.error(f"Error processing video: {traceback.format_exc()}") - return False, str(e) - - finally: - # Clean up downloaded file - if file_path and os.path.exists(file_path): - try: - os.unlink(file_path) - except Exception as e: - logger.error(f"Failed to clean up file {file_path}: {e}") - - async def cleanup(self): - """Clean up resources""" - try: - # Cancel queue processing - if hasattr(self, '_queue_task') and not self._queue_task.done(): - self._queue_task.cancel() - try: - await self._queue_task - except asyncio.CancelledError: - pass - - # Clean up queue manager - if hasattr(self, 'queue_manager'): - await self.queue_manager.cleanup() - - # Clean up failed downloads - async with self._failed_downloads_lock: - for file_path in self._failed_downloads: - try: - if os.path.exists(file_path): - os.unlink(file_path) - except Exception as e: - logger.error(f"Failed to clean up file {file_path}: {e}") - self._failed_downloads.clear() - - # Don't clear global queue manager during cleanup - # This ensures it persists through reloads - - except Exception as e: - logger.error(f"Error during cleanup: {traceback.format_exc()}") - raise ProcessingError(f"Cleanup failed: {str(e)}") - - @classmethod - def get_queue_manager(cls) -> Optional[EnhancedVideoQueueManager]: - """Get the global queue manager instance""" - global _global_queue_manager - return _global_queue_manager diff --git a/videoarchiver/utils/video_downloader.py b/videoarchiver/utils/video_downloader.py index ede8c69..d6f0ff3 100644 --- a/videoarchiver/utils/video_downloader.py +++ b/videoarchiver/utils/video_downloader.py @@ -9,8 +9,9 @@ import yt_dlp import shutil import subprocess import json +import signal from concurrent.futures import ThreadPoolExecutor -from typing import Dict, List, Optional, Tuple, Callable +from typing import Dict, List, Optional, Tuple, Callable, Set from pathlib import Path from datetime import datetime @@ -29,6 +30,25 @@ from videoarchiver.utils.path_manager import temp_path_context logger = logging.getLogger("VideoArchiver") +# Add a custom yt-dlp logger to handle cancellation +class CancellableYTDLLogger: + def __init__(self): + self.cancelled = False + + def debug(self, msg): + if self.cancelled: + raise Exception("Download cancelled") + logger.debug(msg) + + def warning(self, msg): + if self.cancelled: + raise Exception("Download cancelled") + logger.warning(msg) + + def error(self, msg): + if self.cancelled: + raise Exception("Download cancelled") + logger.error(msg) def is_video_url_pattern(url: str) -> bool: """Check if URL matches common video platform patterns""" @@ -53,12 +73,12 @@ def is_video_url_pattern(url: str) -> bool: ] return any(re.search(pattern, url, re.IGNORECASE) for pattern in video_patterns) - class VideoDownloader: - MAX_RETRIES = 5 # Increased from 3 - RETRY_DELAY = 10 # Increased from 5 + MAX_RETRIES = 5 + RETRY_DELAY = 10 FILE_OP_RETRIES = 3 - FILE_OP_RETRY_DELAY = 1 # seconds + FILE_OP_RETRY_DELAY = 1 + SHUTDOWN_TIMEOUT = 15 # seconds def __init__( self, @@ -67,35 +87,36 @@ class VideoDownloader: max_quality: int, max_file_size: int, enabled_sites: Optional[List[str]] = None, - concurrent_downloads: int = 2, # Reduced from 3 + concurrent_downloads: int = 2, ffmpeg_mgr: Optional[FFmpegManager] = None, ): - # Ensure download path exists with proper permissions self.download_path = Path(download_path) self.download_path.mkdir(parents=True, exist_ok=True) os.chmod(str(self.download_path), 0o755) - logger.info(f"Initialized download directory: {self.download_path}") self.video_format = video_format self.max_quality = max_quality self.max_file_size = max_file_size self.enabled_sites = enabled_sites - - # Initialize FFmpeg manager self.ffmpeg_mgr = ffmpeg_mgr or FFmpegManager() - logger.info(f"Using FFmpeg from: {self.ffmpeg_mgr.get_ffmpeg_path()}") - # Create thread pool for this instance + # Create thread pool with proper naming self.download_pool = ThreadPoolExecutor( max_workers=max(1, min(3, concurrent_downloads)), thread_name_prefix="videoarchiver_download", ) - # Track active downloads for cleanup - self.active_downloads: Dict[str, str] = {} + # Track active downloads and processes + self.active_downloads: Dict[str, Dict[str, Any]] = {} self._downloads_lock = asyncio.Lock() + self._active_processes: Set[subprocess.Popen] = set() + self._processes_lock = asyncio.Lock() + self._shutting_down = False - # Configure yt-dlp options with improved settings + # Create cancellable logger + self.ytdl_logger = CancellableYTDLLogger() + + # Configure yt-dlp options self.ydl_opts = { "format": f"bv*[height<={max_quality}][ext=mp4]+ba[ext=m4a]/b[height<={max_quality}]/best", "outtmpl": "%(title)s.%(ext)s", @@ -103,30 +124,87 @@ class VideoDownloader: "quiet": True, "no_warnings": True, "extract_flat": True, - "concurrent_fragment_downloads": 1, # Reduced from default + "concurrent_fragment_downloads": 1, "retries": self.MAX_RETRIES, "fragment_retries": self.MAX_RETRIES, "file_access_retries": self.FILE_OP_RETRIES, "extractor_retries": self.MAX_RETRIES, "postprocessor_hooks": [self._check_file_size], - "progress_hooks": [self._progress_hook, self._detailed_progress_hook], # Add detailed hook + "progress_hooks": [self._progress_hook, self._detailed_progress_hook], "ffmpeg_location": str(self.ffmpeg_mgr.get_ffmpeg_path()), "ffprobe_location": str(self.ffmpeg_mgr.get_ffprobe_path()), "paths": {"home": str(self.download_path)}, - "logger": logger, + "logger": self.ytdl_logger, "ignoreerrors": True, "no_color": True, "geo_bypass": True, - "socket_timeout": 60, # Increased from 30 - "http_chunk_size": 1048576, # Reduced to 1MB chunks for better stability + "socket_timeout": 60, + "http_chunk_size": 1048576, "external_downloader_args": { - "ffmpeg": ["-timeout", "60000000"] # Increased to 60 seconds + "ffmpeg": ["-timeout", "60000000"] }, - "max_sleep_interval": 5, # Maximum time to sleep between retries - "sleep_interval": 1, # Initial sleep interval - "max_filesize": max_file_size * 1024 * 1024, # Set max file size limit + "max_sleep_interval": 5, + "sleep_interval": 1, + "max_filesize": max_file_size * 1024 * 1024, } + async def cleanup(self) -> None: + """Clean up resources with proper shutdown""" + self._shutting_down = True + + try: + # Cancel active downloads + self.ytdl_logger.cancelled = True + + # Kill any active FFmpeg processes + async with self._processes_lock: + for process in self._active_processes: + try: + process.terminate() + await asyncio.sleep(0.1) # Give process time to terminate + if process.poll() is None: + process.kill() # Force kill if still running + except Exception as e: + logger.error(f"Error killing process: {e}") + self._active_processes.clear() + + # Clean up thread pool + self.download_pool.shutdown(wait=False, cancel_futures=True) + + # Clean up active downloads + async with self._downloads_lock: + self.active_downloads.clear() + + except Exception as e: + logger.error(f"Error during downloader cleanup: {e}") + finally: + self._shutting_down = False + + async def force_cleanup(self) -> None: + """Force cleanup of all resources""" + try: + # Force cancel all downloads + self.ytdl_logger.cancelled = True + + # Kill all processes immediately + async with self._processes_lock: + for process in self._active_processes: + try: + process.kill() + except Exception as e: + logger.error(f"Error force killing process: {e}") + self._active_processes.clear() + + # Force shutdown thread pool + self.download_pool.shutdown(wait=False, cancel_futures=True) + + # Clear all tracking + async with self._downloads_lock: + self.active_downloads.clear() + + except Exception as e: + logger.error(f"Error during force cleanup: {e}") + def _detailed_progress_hook(self, d): """Handle detailed download progress tracking""" try: @@ -233,6 +311,9 @@ class VideoDownloader: self, url: str, progress_callback: Optional[Callable[[float], None]] = None ) -> Tuple[bool, str, str]: """Download and process a video with improved error handling""" + if self._shutting_down: + return False, "", "Downloader is shutting down" + # Initialize progress tracking for this URL from videoarchiver.processor import _download_progress _download_progress[url] = { @@ -272,7 +353,10 @@ class VideoDownloader: original_file = file_path async with self._downloads_lock: - self.active_downloads[url] = original_file + self.active_downloads[url] = { + 'file_path': original_file, + 'start_time': datetime.utcnow() + } # Check file size and compress if needed file_size = os.path.getsize(original_file) @@ -386,6 +470,9 @@ class VideoDownloader: use_hardware: bool = True, ) -> bool: """Attempt video compression with given parameters""" + if self._shutting_down: + return False + try: # Build FFmpeg command ffmpeg_path = str(self.ffmpeg_mgr.get_ffmpeg_path()) @@ -448,50 +535,64 @@ class VideoDownloader: *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) + # Track the process + async with self._processes_lock: + self._active_processes.add(process) + start_time = datetime.utcnow() - while True: - line = await process.stdout.readline() - if not line: - break + try: + while True: + if self._shutting_down: + process.terminate() + return False - try: - line = line.decode().strip() - if line.startswith("out_time_ms="): - current_time = ( - int(line.split("=")[1]) / 1000000 - ) # Convert microseconds to seconds - if duration > 0: - progress = min(100, (current_time / duration) * 100) - - # Update compression progress - elapsed = datetime.utcnow() - start_time - _compression_progress[input_file].update({ - 'percent': progress, - 'elapsed_time': str(elapsed).split('.')[0], # Remove microseconds - 'current_size': os.path.getsize(output_file) if os.path.exists(output_file) else 0, - 'current_time': current_time, - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - }) - - if progress_callback: - await progress_callback(progress) - except Exception as e: - logger.error(f"Error parsing FFmpeg progress: {e}") + line = await process.stdout.readline() + if not line: + break - await process.wait() - success = os.path.exists(output_file) - - # Update final status - if success and input_file in _compression_progress: - _compression_progress[input_file].update({ - 'active': False, - 'percent': 100, - 'current_size': os.path.getsize(output_file), - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - }) - - return success + try: + line = line.decode().strip() + if line.startswith("out_time_ms="): + current_time = ( + int(line.split("=")[1]) / 1000000 + ) # Convert microseconds to seconds + if duration > 0: + progress = min(100, (current_time / duration) * 100) + + # Update compression progress + elapsed = datetime.utcnow() - start_time + _compression_progress[input_file].update({ + 'percent': progress, + 'elapsed_time': str(elapsed).split('.')[0], # Remove microseconds + 'current_size': os.path.getsize(output_file) if os.path.exists(output_file) else 0, + 'current_time': current_time, + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + }) + + if progress_callback: + await progress_callback(progress) + except Exception as e: + logger.error(f"Error parsing FFmpeg progress: {e}") + + await process.wait() + success = os.path.exists(output_file) + + # Update final status + if success and input_file in _compression_progress: + _compression_progress[input_file].update({ + 'active': False, + 'percent': 100, + 'current_size': os.path.getsize(output_file), + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + }) + + return success + + finally: + # Remove process from tracking + async with self._processes_lock: + self._active_processes.discard(process) except subprocess.CalledProcessError as e: logger.error(f"FFmpeg compression failed: {e.stderr.decode()}") @@ -593,6 +694,9 @@ class VideoDownloader: progress_callback: Optional[Callable[[float], None]] = None, ) -> Tuple[bool, str, str]: """Safely download video with retries""" + if self._shutting_down: + return False, "", "Downloader is shutting down" + last_error = None for attempt in range(self.MAX_RETRIES): try: diff --git a/videoarchiver/video_archiver.py b/videoarchiver/video_archiver.py index 37a1788..62b009d 100644 --- a/videoarchiver/video_archiver.py +++ b/videoarchiver/video_archiver.py @@ -27,9 +27,11 @@ from videoarchiver.utils.exceptions import ( FileCleanupError as FileOperationError ) - logger = logging.getLogger("VideoArchiver") +# Constants for timeouts +UNLOAD_TIMEOUT = 30 # seconds +CLEANUP_TIMEOUT = 15 # seconds class VideoArchiver(commands.Cog): """Archive videos from Discord channels""" @@ -40,6 +42,7 @@ class VideoArchiver(commands.Cog): self.ready = asyncio.Event() self._init_task: Optional[asyncio.Task] = None self._cleanup_task: Optional[asyncio.Task] = None + self._unloading = False # Start initialization self._init_task = asyncio.create_task(self._initialize()) @@ -73,7 +76,6 @@ class VideoArchiver(commands.Cog): await self.initialize_guild_components(guild.id) except Exception as e: logger.error(f"Failed to initialize guild {guild.id}: {str(e)}") - # Continue initialization even if one guild fails continue # Initialize queue manager after components are ready @@ -97,7 +99,7 @@ class VideoArchiver(commands.Cog): self.config_manager, self.components, queue_manager=self.queue_manager, - ffmpeg_mgr=self.ffmpeg_mgr, # Pass shared FFmpeg manager + ffmpeg_mgr=self.ffmpeg_mgr, ) # Start update checker @@ -109,10 +111,7 @@ class VideoArchiver(commands.Cog): logger.info("VideoArchiver initialization completed successfully") except Exception as e: - logger.error( - f"Critical error during initialization: {traceback.format_exc()}" - ) - # Clean up any partially initialized components + logger.error(f"Critical error during initialization: {traceback.format_exc()}") await self._cleanup() raise @@ -129,9 +128,7 @@ class VideoArchiver(commands.Cog): async def cog_load(self) -> None: """Handle cog loading""" try: - # Wait for initialization to complete await asyncio.wait_for(self.ready.wait(), timeout=30) - except asyncio.TimeoutError: await self._cleanup() raise ProcessingError("Cog initialization timed out") @@ -140,31 +137,82 @@ class VideoArchiver(commands.Cog): raise ProcessingError(f"Error during cog load: {str(e)}") async def cog_unload(self) -> None: - """Clean up when cog is unloaded""" - await self._cleanup() + """Clean up when cog is unloaded with timeout""" + self._unloading = True + try: + # Create cleanup task with timeout + cleanup_task = asyncio.create_task(self._cleanup()) + try: + await asyncio.wait_for(cleanup_task, timeout=UNLOAD_TIMEOUT) + except asyncio.TimeoutError: + logger.error("Cog unload timed out, forcing cleanup") + # Force cleanup of any remaining resources + await self._force_cleanup() + except Exception as e: + logger.error(f"Error during cog unload: {str(e)}") + await self._force_cleanup() + finally: + self._unloading = False + + async def _force_cleanup(self) -> None: + """Force cleanup of resources when timeout occurs""" + try: + # Cancel all tasks + if hasattr(self, "processor"): + await self.processor.force_cleanup() + + # Force stop queue manager + if hasattr(self, "queue_manager"): + self.queue_manager.force_stop() + + # Kill any remaining FFmpeg processes + if hasattr(self, "ffmpeg_mgr"): + self.ffmpeg_mgr.kill_all_processes() + + # Clean up download directory + if hasattr(self, "download_path") and self.download_path.exists(): + try: + await cleanup_downloads(str(self.download_path)) + self.download_path.rmdir() + except Exception as e: + logger.error(f"Error force cleaning download directory: {str(e)}") + + except Exception as e: + logger.error(f"Error during force cleanup: {str(e)}") + finally: + self.ready.clear() async def _cleanup(self) -> None: - """Clean up all resources""" + """Clean up all resources with proper handling""" try: # Cancel initialization if still running if self._init_task and not self._init_task.done(): self._init_task.cancel() try: - await self._init_task - except asyncio.CancelledError: + await asyncio.wait_for(self._init_task, timeout=CLEANUP_TIMEOUT) + except (asyncio.TimeoutError, asyncio.CancelledError): pass # Stop update checker if hasattr(self, "update_checker"): - await self.update_checker.stop() + try: + await asyncio.wait_for(self.update_checker.stop(), timeout=CLEANUP_TIMEOUT) + except asyncio.TimeoutError: + pass # Clean up processor if hasattr(self, "processor"): - await self.processor.cleanup() + try: + await asyncio.wait_for(self.processor.cleanup(), timeout=CLEANUP_TIMEOUT) + except asyncio.TimeoutError: + await self.processor.force_cleanup() # Clean up queue manager if hasattr(self, "queue_manager"): - await self.queue_manager.cleanup() + try: + await asyncio.wait_for(self.queue_manager.cleanup(), timeout=CLEANUP_TIMEOUT) + except asyncio.TimeoutError: + self.queue_manager.force_stop() # Clean up components for each guild if hasattr(self, "components"): @@ -191,8 +239,8 @@ class VideoArchiver(commands.Cog): except Exception as e: logger.error(f"Error during cleanup: {traceback.format_exc()}") + raise ProcessingError(f"Cleanup failed: {str(e)}") finally: - # Clear ready flag self.ready.clear() async def initialize_guild_components(self, guild_id: int) -> None: