From 32c63deeff2b0ae417898d334e6280ba9cc076d7 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Sat, 16 Nov 2024 00:24:28 +0000 Subject: [PATCH] Removed the problematic singleton pattern from queue manager Added proper activity tracking in the monitoring system Reduced timeouts and deadlock thresholds Implemented more aggressive cleanup procedures Added system-wide FFmpeg process cleanup --- videoarchiver/core/base.py | 49 ++++++------ videoarchiver/core/cleanup.py | 108 +++++++++++++++++++++---- videoarchiver/queue/manager.py | 126 ++++++++++++++---------------- videoarchiver/queue/monitoring.py | 44 +++++++---- 4 files changed, 208 insertions(+), 119 deletions(-) diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py index 0a0c1eb..c0b6e12 100644 --- a/videoarchiver/core/base.py +++ b/videoarchiver/core/base.py @@ -7,11 +7,11 @@ import traceback from redbot.core import Config, data_manager from redbot.core.bot import Red from redbot.core.commands import ( - GroupCog, - Context, - hybrid_command, + GroupCog, + Context, + hybrid_command, hybrid_group, - guild_only + guild_only, ) from redbot.core import checks from discord import app_commands @@ -43,9 +43,10 @@ logger = logging.getLogger("VideoArchiver") # Constants for timeouts - more reasonable timeouts UNLOAD_TIMEOUT = 30 # seconds CLEANUP_TIMEOUT = 15 # seconds -INIT_TIMEOUT = 60 # seconds +INIT_TIMEOUT = 60 # seconds COMPONENT_INIT_TIMEOUT = 30 # seconds + class VideoArchiver(GroupCog): """Archive videos from Discord channels""" @@ -135,7 +136,9 @@ class VideoArchiver(GroupCog): self.processor.db = None self.processor.queue_handler.db = None - await self.config_manager.update_setting(ctx.guild.id, "use_database", False) + await self.config_manager.update_setting( + ctx.guild.id, "use_database", False + ) await ctx.send("Video archive database has been disabled.") except Exception as e: @@ -364,8 +367,7 @@ class VideoArchiver(GroupCog): # Clean existing downloads with timeout try: await asyncio.wait_for( - cleanup_downloads(str(self.download_path)), - timeout=CLEANUP_TIMEOUT + cleanup_downloads(str(self.download_path)), timeout=CLEANUP_TIMEOUT ) logger.info("Downloads cleaned up") except asyncio.TimeoutError: @@ -386,12 +388,11 @@ class VideoArchiver(GroupCog): max_history_age=86400, persistence_path=str(queue_path), ) - + # Initialize queue manager with timeout try: await asyncio.wait_for( - self.queue_manager.initialize(), - timeout=INIT_TIMEOUT + self.queue_manager.initialize(), timeout=INIT_TIMEOUT ) logger.info("Queue manager initialized successfully") except asyncio.TimeoutError: @@ -417,7 +418,7 @@ class VideoArchiver(GroupCog): try: await asyncio.wait_for( initialize_guild_components(self, guild.id), - timeout=COMPONENT_INIT_TIMEOUT + timeout=COMPONENT_INIT_TIMEOUT, ) logger.info(f"Guild {guild.id} components initialized") except asyncio.TimeoutError: @@ -434,8 +435,7 @@ class VideoArchiver(GroupCog): # Start update checker with timeout try: await asyncio.wait_for( - self.update_checker.start(), - timeout=INIT_TIMEOUT + self.update_checker.start(), timeout=INIT_TIMEOUT ) logger.info("Update checker started") except asyncio.TimeoutError: @@ -453,12 +453,13 @@ class VideoArchiver(GroupCog): logger.info("VideoArchiver initialization completed successfully") except Exception as e: - logger.error(f"Critical error during initialization: {str(e)}\n{traceback.format_exc()}") + logger.error( + f"Critical error during initialization: {str(e)}\n{traceback.format_exc()}" + ) # Force cleanup on initialization error try: await asyncio.wait_for( - force_cleanup_resources(self), - timeout=CLEANUP_TIMEOUT + force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT ) except asyncio.TimeoutError: logger.error("Force cleanup during initialization timed out") @@ -491,8 +492,7 @@ class VideoArchiver(GroupCog): # Ensure cleanup on any error try: await asyncio.wait_for( - force_cleanup_resources(self), - timeout=CLEANUP_TIMEOUT + force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT ) except asyncio.TimeoutError: logger.error("Force cleanup during load error timed out") @@ -510,7 +510,11 @@ class VideoArchiver(GroupCog): self._cleanup_task.cancel() # Cancel queue processing task if it exists - if hasattr(self, '_queue_task') and self._queue_task and not self._queue_task.done(): + if ( + hasattr(self, "_queue_task") + and self._queue_task + and not self._queue_task.done() + ): self._queue_task.cancel() try: await self._queue_task @@ -535,8 +539,7 @@ class VideoArchiver(GroupCog): try: # Force cleanup with timeout await asyncio.wait_for( - force_cleanup_resources(self), - timeout=CLEANUP_TIMEOUT + force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT ) logger.info("Force cleanup completed") except asyncio.TimeoutError: @@ -560,7 +563,7 @@ class VideoArchiver(GroupCog): self.db = None self._init_task = None self._cleanup_task = None - if hasattr(self, '_queue_task'): + if hasattr(self, "_queue_task"): self._queue_task = None async def _cleanup(self) -> None: diff --git a/videoarchiver/core/cleanup.py b/videoarchiver/core/cleanup.py index 088e1c0..1aa1404 100644 --- a/videoarchiver/core/cleanup.py +++ b/videoarchiver/core/cleanup.py @@ -2,7 +2,10 @@ import logging import asyncio +import signal +import os from typing import TYPE_CHECKING +from pathlib import Path from ..utils.file_ops import cleanup_downloads @@ -11,48 +14,61 @@ if TYPE_CHECKING: logger = logging.getLogger("VideoArchiver") -CLEANUP_TIMEOUT = 15 # seconds +CLEANUP_TIMEOUT = 5 # Reduced timeout to 5 seconds +FORCE_CLEANUP_TIMEOUT = 3 # Even shorter timeout for force cleanup async def cleanup_resources(cog: "VideoArchiver") -> None: """Clean up all resources with proper handling""" try: + logger.info("Starting resource cleanup...") + # Cancel initialization if still running if cog._init_task and not cog._init_task.done(): + logger.info("Cancelling initialization task") cog._init_task.cancel() try: await asyncio.wait_for(cog._init_task, timeout=CLEANUP_TIMEOUT) except (asyncio.TimeoutError, asyncio.CancelledError): - pass + logger.warning("Initialization task cancellation timed out") # Stop update checker - if hasattr(cog, "update_checker"): + if hasattr(cog, "update_checker") and cog.update_checker: + logger.info("Stopping update checker") try: await asyncio.wait_for( cog.update_checker.stop(), timeout=CLEANUP_TIMEOUT ) except asyncio.TimeoutError: - pass + logger.warning("Update checker stop timed out") + cog.update_checker = None # Clean up processor - if hasattr(cog, "processor"): + if hasattr(cog, "processor") and cog.processor: + logger.info("Cleaning up processor") try: await asyncio.wait_for( cog.processor.cleanup(), timeout=CLEANUP_TIMEOUT ) except asyncio.TimeoutError: + logger.warning("Processor cleanup timed out, forcing cleanup") await cog.processor.force_cleanup() + cog.processor = None # Clean up queue manager - if hasattr(cog, "queue_manager"): + if hasattr(cog, "queue_manager") and cog.queue_manager: + logger.info("Cleaning up queue manager") try: await asyncio.wait_for( cog.queue_manager.cleanup(), timeout=CLEANUP_TIMEOUT ) except asyncio.TimeoutError: + logger.warning("Queue manager cleanup timed out, forcing stop") cog.queue_manager.force_stop() + cog.queue_manager = None # Clean up components for each guild if hasattr(cog, "components"): + logger.info("Cleaning up guild components") for guild_id, components in cog.components.items(): try: if "message_manager" in components: @@ -66,44 +82,106 @@ async def cleanup_resources(cog: "VideoArchiver") -> None: cog.components.clear() + # Kill any FFmpeg processes + if hasattr(cog, "ffmpeg_mgr") and cog.ffmpeg_mgr: + logger.info("Killing FFmpeg processes") + cog.ffmpeg_mgr.kill_all_processes() + cog.ffmpeg_mgr = None + # Clean up download directory if hasattr(cog, "download_path") and cog.download_path.exists(): + logger.info("Cleaning up download directory") try: - await cleanup_downloads(str(cog.download_path)) - cog.download_path.rmdir() + await asyncio.wait_for( + cleanup_downloads(str(cog.download_path)), + timeout=CLEANUP_TIMEOUT + ) + if cog.download_path.exists(): + cog.download_path.rmdir() except Exception as e: logger.error(f"Error cleaning up download directory: {str(e)}") + # Kill any remaining FFmpeg processes system-wide + try: + if os.name != 'nt': # Unix-like systems + os.system("pkill -9 ffmpeg") + else: # Windows + os.system("taskkill /F /IM ffmpeg.exe") + except Exception as e: + logger.error(f"Error killing FFmpeg processes: {str(e)}") + except Exception as e: logger.error(f"Error during cleanup: {str(e)}") raise finally: + logger.info("Clearing ready flag") cog.ready.clear() async def force_cleanup_resources(cog: "VideoArchiver") -> None: """Force cleanup of resources when timeout occurs""" try: - # Cancel all tasks - if hasattr(cog, "processor"): + logger.info("Starting force cleanup...") + + # Cancel all tasks immediately + if hasattr(cog, "processor") and cog.processor: + logger.info("Force cleaning processor") await cog.processor.force_cleanup() + cog.processor = None # Force stop queue manager - if hasattr(cog, "queue_manager"): + if hasattr(cog, "queue_manager") and cog.queue_manager: + logger.info("Force stopping queue manager") cog.queue_manager.force_stop() + cog.queue_manager = None - # Kill any remaining FFmpeg processes - if hasattr(cog, "ffmpeg_mgr"): + # Kill FFmpeg processes + if hasattr(cog, "ffmpeg_mgr") and cog.ffmpeg_mgr: + logger.info("Force killing FFmpeg processes") cog.ffmpeg_mgr.kill_all_processes() + cog.ffmpeg_mgr = None + + # Force kill any remaining FFmpeg processes system-wide + try: + if os.name != 'nt': # Unix-like systems + os.system("pkill -9 ffmpeg") + else: # Windows + os.system("taskkill /F /IM ffmpeg.exe") + except Exception as e: + logger.error(f"Error force killing FFmpeg processes: {str(e)}") # Clean up download directory if hasattr(cog, "download_path") and cog.download_path.exists(): + logger.info("Force cleaning download directory") try: - await cleanup_downloads(str(cog.download_path)) - cog.download_path.rmdir() + await asyncio.wait_for( + cleanup_downloads(str(cog.download_path)), + timeout=FORCE_CLEANUP_TIMEOUT + ) + if cog.download_path.exists(): + cog.download_path.rmdir() except Exception as e: logger.error(f"Error force cleaning download directory: {str(e)}") + # Clear all components + if hasattr(cog, "components"): + logger.info("Force clearing components") + cog.components.clear() + except Exception as e: logger.error(f"Error during force cleanup: {str(e)}") finally: + logger.info("Clearing ready flag") cog.ready.clear() + + # Clear all references + cog.bot = None + cog.processor = None + cog.queue_manager = None + cog.update_checker = None + cog.ffmpeg_mgr = None + cog.components = {} + cog.db = None + cog._init_task = None + cog._cleanup_task = None + if hasattr(cog, '_queue_task'): + cog._queue_task = None diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index 1dcd14c..cd21a6e 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -20,17 +20,6 @@ logger = logging.getLogger("QueueManager") class EnhancedVideoQueueManager: """Enhanced queue manager with improved memory management and performance""" - # Class-level initialization lock to prevent multiple instances - _instance_lock = asyncio.Lock() - _instance = None - _initialized = False - - def __new__(cls, *args, **kwargs): - """Ensure singleton instance""" - if not cls._instance: - cls._instance = super().__new__(cls) - return cls._instance - def __init__( self, max_retries: int = 3, @@ -43,10 +32,7 @@ class EnhancedVideoQueueManager: deadlock_threshold: int = 300, # 5 minutes check_interval: int = 60, # 1 minute ): - """Initialize only once""" - if self._initialized: - return - + """Initialize queue manager""" # Configuration self.max_retries = max_retries self.retry_delay = retry_delay @@ -70,7 +56,8 @@ class EnhancedVideoQueueManager: # State self._shutdown = False - self._init_event = asyncio.Event() # Single event for initialization state + self._initialized = False + self._init_event = asyncio.Event() self.metrics = QueueMetrics() # Components @@ -85,64 +72,59 @@ class EnhancedVideoQueueManager: max_history_age=max_history_age ) - # Mark instance as initialized - self._initialized = True - async def initialize(self) -> None: """Initialize the queue manager components sequentially""" - # Use class-level lock to prevent multiple initializations - async with self._instance_lock: - # Check if already initialized - if self._init_event.is_set(): - logger.info("Queue manager already initialized") - return + if self._initialized: + logger.info("Queue manager already initialized") + return - try: - logger.info("Starting queue manager initialization...") - - # Load persisted state first if available - if self.persistence: - await self._load_persisted_state() - - # Start monitoring task - monitor_task = asyncio.create_task( - self.monitor.start_monitoring( - self._queue, - self._processing, - self.metrics, - self._processing_lock - ) + try: + logger.info("Starting queue manager initialization...") + + # Load persisted state first if available + if self.persistence: + await self._load_persisted_state() + + # Start monitoring task + monitor_task = asyncio.create_task( + self.monitor.start_monitoring( + self._queue, + self._processing, + self.metrics, + self._processing_lock ) - self._active_tasks.add(monitor_task) - logger.info("Queue monitoring started") + ) + self._active_tasks.add(monitor_task) + logger.info("Queue monitoring started") - # Brief pause to allow monitor to initialize - await asyncio.sleep(0.1) - - # Start cleanup task - cleanup_task = asyncio.create_task( - self.cleaner.start_cleanup( - self._queue, - self._completed, - self._failed, - self._guild_queues, - self._channel_queues, - self._processing, - self.metrics, - self._queue_lock - ) + # Brief pause to allow monitor to initialize + await asyncio.sleep(0.1) + + # Start cleanup task + cleanup_task = asyncio.create_task( + self.cleaner.start_cleanup( + self._queue, + self._completed, + self._failed, + self._guild_queues, + self._channel_queues, + self._processing, + self.metrics, + self._queue_lock ) - self._active_tasks.add(cleanup_task) - logger.info("Queue cleanup started") + ) + self._active_tasks.add(cleanup_task) + logger.info("Queue cleanup started") - # Signal initialization complete - self._init_event.set() - logger.info("Queue manager initialization completed") + # Signal initialization complete + self._initialized = True + self._init_event.set() + logger.info("Queue manager initialization completed") - except Exception as e: - logger.error(f"Failed to initialize queue manager: {e}") - self._shutdown = True - raise + except Exception as e: + logger.error(f"Failed to initialize queue manager: {e}") + self._shutdown = True + raise async def _load_persisted_state(self) -> None: """Load persisted queue state""" @@ -199,6 +181,8 @@ class EnhancedVideoQueueManager: async with self._processing_lock: for item in items: self._processing[item.url] = item + # Update activity timestamp + self.monitor.update_activity() if not items: await asyncio.sleep(0.1) @@ -234,6 +218,7 @@ class EnhancedVideoQueueManager: logger.info(f"Processing queue item: {item.url}") item.start_processing() self.metrics.last_activity_time = time.time() + self.monitor.update_activity() # Update activity timestamp success, error = await processor(item) @@ -338,6 +323,7 @@ class EnhancedVideoQueueManager: self._queue.sort(key=lambda x: (-x.priority, x.added_at)) self.metrics.last_activity_time = time.time() + self.monitor.update_activity() # Update activity timestamp if self.persistence: await self._persist_state() @@ -439,6 +425,9 @@ class EnhancedVideoQueueManager: self._channel_queues.clear() self._active_tasks.clear() + # Reset initialization state + self._initialized = False + self._init_event.clear() logger.info("Queue manager cleanup completed") except Exception as e: @@ -470,5 +459,8 @@ class EnhancedVideoQueueManager: self._processing.clear() self._active_tasks.clear() - + + # Reset initialization state + self._initialized = False + self._init_event.clear() logger.info("Queue manager force stopped") diff --git a/videoarchiver/queue/monitoring.py b/videoarchiver/queue/monitoring.py index c67fdbe..2e57474 100644 --- a/videoarchiver/queue/monitoring.py +++ b/videoarchiver/queue/monitoring.py @@ -19,10 +19,10 @@ class QueueMonitor: def __init__( self, - deadlock_threshold: int = 300, # 5 minutes + deadlock_threshold: int = 120, # Reduced to 2 minutes memory_threshold: int = 512, # 512MB max_retries: int = 3, - check_interval: int = 60 # Check every minute + check_interval: int = 30 # Reduced to 30 seconds ): self.deadlock_threshold = deadlock_threshold self.memory_threshold = memory_threshold @@ -30,6 +30,7 @@ class QueueMonitor: self.check_interval = check_interval self._shutdown = False self._last_active_time = time.time() + self._monitoring_task = None async def start_monitoring( self, @@ -46,23 +47,41 @@ class QueueMonitor: metrics: Reference to queue metrics processing_lock: Lock for processing dict """ + if self._monitoring_task is not None: + logger.warning("Monitoring task already running") + return + logger.info("Starting queue monitoring...") + self._monitoring_task = asyncio.create_task( + self._monitor_loop(queue, processing, metrics, processing_lock) + ) + + async def _monitor_loop( + self, + queue: List[QueueItem], + processing: Dict[str, QueueItem], + metrics: QueueMetrics, + processing_lock: asyncio.Lock + ) -> None: + """Main monitoring loop""" while not self._shutdown: try: await self._check_health(queue, processing, metrics, processing_lock) await asyncio.sleep(self.check_interval) - except asyncio.CancelledError: logger.info("Queue monitoring cancelled") break except Exception as e: logger.error(f"Error in health monitor: {str(e)}") - await asyncio.sleep(30) # Shorter sleep on error + await asyncio.sleep(5) # Short sleep on error def stop_monitoring(self) -> None: """Stop the monitoring process""" logger.info("Stopping queue monitoring...") self._shutdown = True + if self._monitoring_task: + self._monitoring_task.cancel() + self._monitoring_task = None def update_activity(self) -> None: """Update the last active time""" @@ -104,7 +123,6 @@ class QueueMonitor: async with processing_lock: for url, item in processing.items(): - # Check if item has started processing if hasattr(item, 'start_time') and item.start_time: processing_time = current_time - item.start_time processing_times.append(processing_time) @@ -131,22 +149,18 @@ class QueueMonitor: ) self._last_active_time = current_time - # Calculate and log metrics - success_rate = metrics.success_rate - error_distribution = metrics.errors_by_type - avg_processing_time = metrics.avg_processing_time - - # Update peak memory usage + # Update metrics + metrics.last_activity_time = self._last_active_time metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage) # Log detailed metrics logger.info( f"Queue Health Metrics:\n" - f"- Success Rate: {success_rate:.2%}\n" - f"- Avg Processing Time: {avg_processing_time:.2f}s\n" + f"- Success Rate: {metrics.success_rate:.2%}\n" + f"- Avg Processing Time: {metrics.avg_processing_time:.2f}s\n" f"- Memory Usage: {memory_usage:.2f}MB\n" f"- Peak Memory: {metrics.peak_memory_usage:.2f}MB\n" - f"- Error Distribution: {error_distribution}\n" + f"- Error Distribution: {metrics.errors_by_type}\n" f"- Queue Size: {len(queue)}\n" f"- Processing Items: {len(processing)}\n" f"- Last Activity: {(current_time - self._last_active_time):.1f}s ago" @@ -202,6 +216,8 @@ class QueueMonitor: except Exception as e: logger.error(f"Error recovering item {url}: {str(e)}") + # Update activity timestamp after recovery + self.update_activity() logger.info(f"Recovery complete - Recovered: {recovered}, Failed: {failed}") except Exception as e: