diff --git a/videoarchiver/__init__.py b/videoarchiver/__init__.py index 9c230c1..e991a9f 100644 --- a/videoarchiver/__init__.py +++ b/videoarchiver/__init__.py @@ -7,29 +7,51 @@ from .exceptions import ProcessingError logger = logging.getLogger("VideoArchiver") +# Global lock to prevent multiple concurrent setup attempts +_setup_lock = asyncio.Lock() +_setup_in_progress = False + async def setup(bot: Red) -> None: """Load VideoArchiver.""" - try: - # Load main cog - cog = VideoArchiver(bot) - await bot.add_cog(cog) - - # Wait for initialization to complete with timeout + global _setup_in_progress + + # Use lock to prevent multiple concurrent setup attempts + async with _setup_lock: try: - await asyncio.wait_for(cog.ready.wait(), timeout=30) - except asyncio.TimeoutError: - logger.error("VideoArchiver initialization timed out") - await bot.remove_cog(cog.__class__.__name__) - raise ProcessingError("Initialization timed out") - - if not cog.ready.is_set(): - logger.error("VideoArchiver failed to initialize") - await bot.remove_cog(cog.__class__.__name__) - raise ProcessingError("Initialization failed") + # Check if setup is already in progress + if _setup_in_progress: + logger.warning("VideoArchiver setup already in progress, skipping") + return + + # Check if cog is already loaded + if "VideoArchiver" in bot.cogs: + logger.warning("VideoArchiver already loaded, skipping") + return + + _setup_in_progress = True - except Exception as e: - logger.error(f"Failed to load VideoArchiver: {str(e)}") - raise + # Load main cog + cog = VideoArchiver(bot) + await bot.add_cog(cog) + + # Wait for initialization to complete with timeout + try: + await asyncio.wait_for(cog.ready.wait(), timeout=30) + except asyncio.TimeoutError: + logger.error("VideoArchiver initialization timed out") + await bot.remove_cog(cog.__class__.__name__) + raise ProcessingError("Initialization timed out") + + if not cog.ready.is_set(): + logger.error("VideoArchiver failed to initialize") + await bot.remove_cog(cog.__class__.__name__) + raise ProcessingError("Initialization failed") + + except Exception as e: + logger.error(f"Failed to load VideoArchiver: {str(e)}") + raise + finally: + _setup_in_progress = False async def teardown(bot: Red) -> None: """Clean up when unloading.""" diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index 7b9eb5b..1dcd14c 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -20,6 +20,17 @@ logger = logging.getLogger("QueueManager") class EnhancedVideoQueueManager: """Enhanced queue manager with improved memory management and performance""" + # Class-level initialization lock to prevent multiple instances + _instance_lock = asyncio.Lock() + _instance = None + _initialized = False + + def __new__(cls, *args, **kwargs): + """Ensure singleton instance""" + if not cls._instance: + cls._instance = super().__new__(cls) + return cls._instance + def __init__( self, max_retries: int = 3, @@ -32,6 +43,10 @@ class EnhancedVideoQueueManager: deadlock_threshold: int = 300, # 5 minutes check_interval: int = 60, # 1 minute ): + """Initialize only once""" + if self._initialized: + return + # Configuration self.max_retries = max_retries self.retry_delay = retry_delay @@ -48,14 +63,14 @@ class EnhancedVideoQueueManager: self._channel_queues: Dict[int, Set[str]] = {} self._active_tasks: Set[asyncio.Task] = set() - # Locks - Establish consistent ordering - self._global_lock = asyncio.Lock() # Primary lock for coordinating all operations - self._queue_lock = asyncio.Lock() # Secondary lock for queue operations - self._processing_lock = asyncio.Lock() # Tertiary lock for processing operations + # Locks + self._global_lock = asyncio.Lock() + self._queue_lock = asyncio.Lock() + self._processing_lock = asyncio.Lock() # State self._shutdown = False - self._initialized = asyncio.Event() + self._init_event = asyncio.Event() # Single event for initialization state self.metrics = QueueMetrics() # Components @@ -70,10 +85,19 @@ class EnhancedVideoQueueManager: max_history_age=max_history_age ) + # Mark instance as initialized + self._initialized = True + async def initialize(self) -> None: """Initialize the queue manager components sequentially""" - try: - async with self._global_lock: + # Use class-level lock to prevent multiple initializations + async with self._instance_lock: + # Check if already initialized + if self._init_event.is_set(): + logger.info("Queue manager already initialized") + return + + try: logger.info("Starting queue manager initialization...") # Load persisted state first if available @@ -112,13 +136,13 @@ class EnhancedVideoQueueManager: logger.info("Queue cleanup started") # Signal initialization complete - self._initialized.set() + self._init_event.set() logger.info("Queue manager initialization completed") - except Exception as e: - logger.error(f"Failed to initialize queue manager: {e}") - self._shutdown = True - raise + except Exception as e: + logger.error(f"Failed to initialize queue manager: {e}") + self._shutdown = True + raise async def _load_persisted_state(self) -> None: """Load persisted queue state""" @@ -153,7 +177,7 @@ class EnhancedVideoQueueManager: ) -> None: """Process items in the queue""" # Wait for initialization to complete - await self._initialized.wait() + await self._init_event.wait() logger.info("Queue processor started") last_persist_time = time.time() @@ -283,7 +307,8 @@ class EnhancedVideoQueueManager: if self._shutdown: raise QueueError("Queue manager is shutting down") - await self._initialized.wait() + # Wait for initialization using the correct event + await self._init_event.wait() try: async with self._global_lock: