From 53e7769811a60d09ed3f28d637b25f6bf694f9aa Mon Sep 17 00:00:00 2001
From: pacnpal <183241239+pacnpal@users.noreply.github.com>
Date: Fri, 15 Nov 2024 23:04:13 +0000
Subject: [PATCH] Added proper initialization sequence in the queue manager

Added proper initialization sequence in the queue manager:
- Sequential component initialization
- Coordination through an _initialized event
- Consistent lock hierarchy with _global_lock

Updated the base cog to properly initialize components:
- Added timeout handling for queue manager initialization
- Ensures the queue manager is fully initialized before queue processing starts
- Added proper error handling and cleanup

Established consistent lock ordering:
- Global Lock -> Queue Lock -> Processing Lock
- Prevents circular wait conditions
- Each component respects the lock hierarchy

Improved task coordination:
- Sequential initialization of components
- Proper waiting for initialization before starting operations
- Timeout handling for all async operations
---
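Note on the lock ordering (a patch note, not part of the commit message): the
invariant the diff below relies on is that any coroutine taking more than one
lock takes them in the order Global -> Queue -> Processing, and never requests
an outer lock while holding an inner one. A minimal, runnable sketch of the
pattern, using hypothetical names (LockHierarchyDemo, move_one) rather than
the real manager's API:

    import asyncio

    class LockHierarchyDemo:
        """Illustrates the Global -> Queue -> Processing acquisition order."""

        def __init__(self) -> None:
            self._global_lock = asyncio.Lock()      # primary
            self._queue_lock = asyncio.Lock()       # secondary
            self._processing_lock = asyncio.Lock()  # tertiary
            self._queue = ["https://example.com/video"]
            self._processing = {}

        async def move_one(self) -> None:
            # Locks are taken strictly top-down; an outer lock is never
            # requested while an inner one is held, so no two coroutines
            # can ever wait on each other in a cycle.
            async with self._global_lock:
                async with self._queue_lock:
                    item = self._queue.pop(0) if self._queue else None
                if item is not None:
                    async with self._processing_lock:
                        self._processing[item] = "processing"

    async def main() -> None:
        demo = LockHierarchyDemo()
        await demo.move_one()
        print(demo._processing)

    asyncio.run(main())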
 videoarchiver/core/base.py     |  16 +-
 videoarchiver/queue/manager.py | 385 ++++++++++++++++-----------------
 2 files changed, 200 insertions(+), 201 deletions(-)

diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py
index e392533..0a0c1eb 100644
--- a/videoarchiver/core/base.py
+++ b/videoarchiver/core/base.py
@@ -386,7 +386,20 @@ class VideoArchiver(GroupCog):
                 max_history_age=86400,
                 persistence_path=str(queue_path),
             )
-            logger.info("Queue manager initialized")
+
+            # Initialize queue manager with timeout
+            try:
+                await asyncio.wait_for(
+                    self.queue_manager.initialize(),
+                    timeout=INIT_TIMEOUT
+                )
+                logger.info("Queue manager initialized successfully")
+            except asyncio.TimeoutError:
+                logger.error("Queue manager initialization timed out")
+                raise ProcessingError("Queue manager initialization timed out")
+            except Exception as e:
+                logger.error(f"Queue manager initialization failed: {e}")
+                raise
 
             # Initialize processor with queue manager and shared FFmpeg manager
             self.processor = VideoProcessor(
@@ -429,6 +442,7 @@ class VideoArchiver(GroupCog):
                 logger.warning("Update checker start timed out")
 
             # Start queue processing as a background task
+            # Only start after queue manager is fully initialized
             self._queue_task = asyncio.create_task(
                 self.queue_manager.process_queue(self.processor.process_video)
             )
diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py
index 358016b..7b9eb5b 100644
--- a/videoarchiver/queue/manager.py
+++ b/videoarchiver/queue/manager.py
@@ -48,12 +48,14 @@ class EnhancedVideoQueueManager:
         self._channel_queues: Dict[int, Set[str]] = {}
         self._active_tasks: Set[asyncio.Task] = set()
 
-        # Locks
-        self._queue_lock = asyncio.Lock()
-        self._processing_lock = asyncio.Lock()
+        # Locks - Establish consistent ordering
+        self._global_lock = asyncio.Lock()  # Primary lock for coordinating all operations
+        self._queue_lock = asyncio.Lock()  # Secondary lock for queue operations
+        self._processing_lock = asyncio.Lock()  # Tertiary lock for processing operations
 
         # State
         self._shutdown = False
+        self._initialized = asyncio.Event()
         self.metrics = QueueMetrics()
 
         # Components
@@ -68,52 +70,68 @@ class EnhancedVideoQueueManager:
             max_history_age=max_history_age
         )
 
-        # Initialize tasks
-        self._init_tasks()
+    async def initialize(self) -> None:
+        """Initialize the queue manager components sequentially"""
+        try:
+            async with self._global_lock:
+                logger.info("Starting queue manager initialization...")
+
+                # Load persisted state first if available
+                if self.persistence:
+                    await self._load_persisted_state()
+
+                # Start monitoring task
+                monitor_task = asyncio.create_task(
+                    self.monitor.start_monitoring(
+                        self._queue,
+                        self._processing,
+                        self.metrics,
+                        self._processing_lock
+                    )
+                )
+                self._active_tasks.add(monitor_task)
+                logger.info("Queue monitoring started")
 
-    def _init_tasks(self) -> None:
-        """Initialize background tasks"""
-        # Start monitoring
-        monitor_task = asyncio.create_task(
-            self.monitor.start_monitoring(
-                self._queue,
-                self._processing,
-                self.metrics,
-                self._processing_lock
-            )
-        )
-        self._active_tasks.add(monitor_task)
-        logger.info("Queue monitoring started")
+                # Brief pause to allow monitor to initialize
+                await asyncio.sleep(0.1)
+
+                # Start cleanup task
+                cleanup_task = asyncio.create_task(
+                    self.cleaner.start_cleanup(
+                        self._queue,
+                        self._completed,
+                        self._failed,
+                        self._guild_queues,
+                        self._channel_queues,
+                        self._processing,
+                        self.metrics,
+                        self._queue_lock
+                    )
+                )
+                self._active_tasks.add(cleanup_task)
+                logger.info("Queue cleanup started")
 
-        # Start cleanup
-        cleanup_task = asyncio.create_task(
-            self.cleaner.start_cleanup(
-                self._queue,
-                self._completed,
-                self._failed,
-                self._guild_queues,
-                self._channel_queues,
-                self._processing,
-                self.metrics,
-                self._queue_lock
-            )
-        )
-        self._active_tasks.add(cleanup_task)
-        logger.info("Queue cleanup started")
+            # Signal initialization complete
+            self._initialized.set()
+            logger.info("Queue manager initialization completed")
 
-        # Load persisted state if available
-        if self.persistence:
-            self._load_persisted_state()
+        except Exception as e:
+            logger.error(f"Failed to initialize queue manager: {e}")
+            self._shutdown = True
+            raise
 
-    def _load_persisted_state(self) -> None:
+    async def _load_persisted_state(self) -> None:
         """Load persisted queue state"""
         try:
             state = self.persistence.load_queue_state()
             if state:
-                self._queue = state["queue"]
-                self._processing = state["processing"]
-                self._completed = state["completed"]
-                self._failed = state["failed"]
+                async with self._queue_lock:
+                    self._queue = state["queue"]
+                    self._completed = state["completed"]
+                    self._failed = state["failed"]
+
+                async with self._processing_lock:
+                    self._processing = state["processing"]
 
                 # Update metrics
                 metrics_data = state.get("metrics", {})
@@ -133,28 +151,32 @@ class EnhancedVideoQueueManager:
         self,
         processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
     ) -> None:
-        """Process items in the queue
+        """Process items in the queue"""
+        # Wait for initialization to complete
+        await self._initialized.wait()
 
-        Args:
-            processor: Function that processes queue items
-        """
         logger.info("Queue processor started")
         last_persist_time = time.time()
-        persist_interval = 60  # Persist state every 60 seconds instead of every operation
+        persist_interval = 60  # Persist state every 60 seconds
 
         while not self._shutdown:
             try:
-                # Process items in batches to avoid blocking
-                async with self._queue_lock:
-                    # Get up to 5 items from queue
-                    items = []
-                    while len(items) < 5 and self._queue:
-                        item = self._queue.pop(0)
-                        self._processing[item.url] = item
-                        items.append(item)
+                items = []
+                # Use global lock for coordination
+                async with self._global_lock:
+                    # Then acquire specific locks in order
+                    async with self._queue_lock:
+                        # Get up to 5 items from queue
+                        while len(items) < 5 and self._queue:
+                            item = self._queue.pop(0)
+                            items.append(item)
+
+                    if items:
+                        async with self._processing_lock:
+                            for item in items:
+                                self._processing[item.url] = item
 
                 if not items:
-                    # Use shorter sleep when queue is empty and yield control
                     await asyncio.sleep(0.1)
                     continue
 
@@ -164,30 +186,20 @@ class EnhancedVideoQueueManager:
                     task = asyncio.create_task(self._process_item(processor, item))
                     tasks.append(task)
 
-                # Wait for all tasks to complete
                 await asyncio.gather(*tasks, return_exceptions=True)
 
                 # Persist state if interval has passed
                 current_time = time.time()
                 if self.persistence and (current_time - last_persist_time) >= persist_interval:
-                    await self.persistence.persist_queue_state(
-                        self._queue,
-                        self._processing,
-                        self._completed,
-                        self._failed,
-                        self.metrics
-                    )
+                    await self._persist_state()
                     last_persist_time = current_time
 
             except Exception as e:
                 logger.error(f"Critical error in queue processor: {e}")
-                await asyncio.sleep(0.1)  # Brief pause on error before retrying
+                await asyncio.sleep(0.1)
 
-            # Yield control after each batch
             await asyncio.sleep(0)
 
-        logger.info("Queue processor stopped")
-
     async def _process_item(
         self,
         processor: Callable[[QueueItem], Tuple[bool, Optional[str]]],
@@ -195,52 +207,68 @@ class EnhancedVideoQueueManager:
     ) -> None:
         """Process a single queue item"""
         try:
-            # Process the item
             logger.info(f"Processing queue item: {item.url}")
-            item.start_processing()  # Start processing tracking
-            self.metrics.last_activity_time = time.time()  # Update activity time
+            item.start_processing()
+            self.metrics.last_activity_time = time.time()
 
             success, error = await processor(item)
 
-            # Update metrics and status
-            async with self._processing_lock:
-                item.finish_processing(success, error)  # Update item status
-
-                if success:
-                    self._completed[item.url] = item
-                    logger.info(f"Successfully processed: {item.url}")
-                else:
-                    if item.retry_count < self.max_retries:
-                        item.retry_count += 1
-                        item.status = "pending"
-                        item.last_retry = datetime.utcnow()
-                        item.priority = max(0, item.priority - 1)
-                        self._queue.append(item)
-                        logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
-                    else:
-                        self._failed[item.url] = item
-                        logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
-
-                self._processing.pop(item.url, None)
-
-                # Update metrics
-                self.metrics.update(
-                    processing_time=item.processing_time,
-                    success=success,
-                    error=error
-                )
+            # Take _queue_lock before _processing_lock so this path also
+            # follows the documented Global -> Queue -> Processing order
+            async with self._global_lock:
+                async with self._queue_lock:
+                    async with self._processing_lock:
+                        item.finish_processing(success, error)
+                        self._processing.pop(item.url, None)
+
+                        if success:
+                            self._completed[item.url] = item
+                            logger.info(f"Successfully processed: {item.url}")
+                        else:
+                            if item.retry_count < self.max_retries:
+                                item.retry_count += 1
+                                item.status = "pending"
+                                item.last_retry = datetime.utcnow()
+                                item.priority = max(0, item.priority - 1)
+                                self._queue.append(item)
+                                logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
+                            else:
+                                self._failed[item.url] = item
+                                logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
+
+                        self.metrics.update(
+                            processing_time=item.processing_time,
+                            success=success,
+                            error=error
+                        )
 
         except Exception as e:
             logger.error(f"Error processing {item.url}: {e}")
-            async with self._processing_lock:
-                item.finish_processing(False, str(e))
-                self._failed[item.url] = item
-                self._processing.pop(item.url, None)
-                self.metrics.update(
-                    processing_time=item.processing_time,
-                    success=False,
-                    error=str(e)
-                )
+            async with self._global_lock:
+                async with self._processing_lock:
+                    item.finish_processing(False, str(e))
+                    self._processing.pop(item.url, None)
+                    self._failed[item.url] = item
+                    self.metrics.update(
+                        processing_time=item.processing_time,
+                        success=False,
+                        error=str(e)
+                    )
 
+    async def _persist_state(self) -> None:
+        """Persist current state to storage"""
+        if not self.persistence:
+            return
+
+        try:
+            async with self._global_lock:
+                await self.persistence.persist_queue_state(
+                    self._queue,
+                    self._processing,
+                    self._completed,
+                    self._failed,
+                    self.metrics
+                )
+        except Exception as e:
+            logger.error(f"Failed to persist state: {e}")
 
     async def add_to_queue(
         self,
@@ -255,48 +283,42 @@ class EnhancedVideoQueueManager:
         if self._shutdown:
             raise QueueError("Queue manager is shutting down")
 
+        await self._initialized.wait()
+
         try:
-            async with self._queue_lock:
-                if len(self._queue) >= self.max_queue_size:
-                    raise QueueError("Queue is full")
+            async with self._global_lock:
+                async with self._queue_lock:
+                    if len(self._queue) >= self.max_queue_size:
+                        raise QueueError("Queue is full")
 
-                item = QueueItem(
-                    url=url,
-                    message_id=message_id,
-                    channel_id=channel_id,
-                    guild_id=guild_id,
-                    author_id=author_id,
-                    added_at=datetime.utcnow(),
-                    priority=priority,
-                )
-
-                # Add to tracking
-                if guild_id not in self._guild_queues:
-                    self._guild_queues[guild_id] = set()
-                self._guild_queues[guild_id].add(url)
-
-                if channel_id not in self._channel_queues:
-                    self._channel_queues[channel_id] = set()
-                self._channel_queues[channel_id].add(url)
-
-                # Add to queue with priority
-                self._queue.append(item)
-                self._queue.sort(key=lambda x: (-x.priority, x.added_at))
-
-                # Update activity time
-                self.metrics.last_activity_time = time.time()
-
-                if self.persistence:
-                    await self.persistence.persist_queue_state(
-                        self._queue,
-                        self._processing,
-                        self._completed,
-                        self._failed,
-                        self.metrics
-                    )
-
-                logger.info(f"Added to queue: {url} (priority: {priority})")
-                return True
+                    item = QueueItem(
+                        url=url,
+                        message_id=message_id,
+                        channel_id=channel_id,
+                        guild_id=guild_id,
+                        author_id=author_id,
+                        added_at=datetime.utcnow(),
+                        priority=priority,
+                    )
+
+                    if guild_id not in self._guild_queues:
+                        self._guild_queues[guild_id] = set()
+                    self._guild_queues[guild_id].add(url)
+
+                    if channel_id not in self._channel_queues:
+                        self._channel_queues[channel_id] = set()
+                    self._channel_queues[channel_id].add(url)
+
+                    self._queue.append(item)
+                    self._queue.sort(key=lambda x: (-x.priority, x.added_at))
+
+                    self.metrics.last_activity_time = time.time()
+
+            # Persist after releasing the locks: _persist_state() re-acquires
+            # _global_lock, and asyncio.Lock is not re-entrant, so calling it
+            # while the lock is still held would deadlock
+            if self.persistence:
+                await self._persist_state()
+
+            logger.info(f"Added to queue: {url} (priority: {priority})")
+            return True
 
         except Exception as e:
             logger.error(f"Error adding to queue: {e}")
@@ -350,38 +372,6 @@ class EnhancedVideoQueueManager:
             },
         }
 
-    async def clear_guild_queue(self, guild_id: int) -> int:
-        """Clear all queue items for a guild"""
-        if self._shutdown:
-            raise QueueError("Queue manager is shutting down")
-
-        try:
-            cleared = await self.cleaner.clear_guild_queue(
-                guild_id,
-                self._queue,
-                self._processing,
-                self._completed,
-                self._failed,
-                self._guild_queues,
-                self._channel_queues,
-                self._queue_lock
-            )
-
-            if self.persistence:
-                await self.persistence.persist_queue_state(
-                    self._queue,
-                    self._processing,
-                    self._completed,
-                    self._failed,
-                    self.metrics
-                )
-
-            return cleared
-
-        except Exception as e:
-            logger.error(f"Error clearing guild queue: {e}")
-            raise QueueError(f"Failed to clear guild queue: {str(e)}")
-
     async def cleanup(self) -> None:
         """Clean up resources and stop queue processing"""
         try:
@@ -399,35 +389,30 @@ class EnhancedVideoQueueManager:
 
             await asyncio.gather(*self._active_tasks, return_exceptions=True)
 
-            # Move processing items back to queue
-            async with self._queue_lock:
-                for url, item in self._processing.items():
-                    if item.retry_count < self.max_retries:
-                        item.status = "pending"
-                        item.retry_count += 1
-                        self._queue.append(item)
-                    else:
-                        self._failed[url] = item
+            async with self._global_lock:
+                # Move processing items back to queue
+                async with self._processing_lock:
+                    for url, item in self._processing.items():
+                        if item.retry_count < self.max_retries:
+                            item.status = "pending"
+                            item.retry_count += 1
+                            self._queue.append(item)
+                        else:
+                            self._failed[url] = item
 
-                self._processing.clear()
+                    self._processing.clear()
 
-            # Final state persistence
-            if self.persistence:
-                await self.persistence.persist_queue_state(
-                    self._queue,
-                    self._processing,
-                    self._completed,
-                    self._failed,
-                    self.metrics
-                )
+                # Final state persistence. Call the persistence layer directly
+                # here: _persist_state() would try to re-acquire _global_lock,
+                # which this coroutine already holds, and asyncio.Lock is not
+                # re-entrant
+                if self.persistence:
+                    await self.persistence.persist_queue_state(
+                        self._queue,
+                        self._processing,
+                        self._completed,
+                        self._failed,
+                        self.metrics
+                    )
 
-            # Clear collections
-            self._queue.clear()
-            self._completed.clear()
-            self._failed.clear()
-            self._guild_queues.clear()
-            self._channel_queues.clear()
-            self._active_tasks.clear()
+                # Clear collections
+                self._queue.clear()
+                self._completed.clear()
+                self._failed.clear()
+                self._guild_queues.clear()
+                self._channel_queues.clear()
+                self._active_tasks.clear()
 
             logger.info("Queue manager cleanup completed")
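
Note on the startup handshake (illustrative, outside the patch): the base.py
hunk waits for initialize() with a timeout and only then starts
process_queue(), which itself blocks on the _initialized event. A minimal,
runnable sketch of that handshake, using hypothetical names (MiniQueueManager,
startup) and an assumed INIT_TIMEOUT value standing in for the cog's real
constant:

    import asyncio

    INIT_TIMEOUT = 30  # assumed value; the real constant lives in the cog

    class MiniQueueManager:
        """Toy stand-in for the initialize()/process_queue() handshake."""

        def __init__(self) -> None:
            self._initialized = asyncio.Event()

        async def initialize(self) -> None:
            await asyncio.sleep(0.05)  # stands in for real component setup
            self._initialized.set()    # signal that processing may begin

        async def process_queue(self) -> None:
            # Block until initialize() has finished, as the patch does
            await self._initialized.wait()
            print("processing started only after initialization completed")

    async def startup() -> None:
        manager = MiniQueueManager()
        try:
            # Bound initialization so a hung component cannot stall startup
            await asyncio.wait_for(manager.initialize(), timeout=INIT_TIMEOUT)
        except asyncio.TimeoutError:
            raise RuntimeError("Queue manager initialization timed out")
        # Safe to start the background processor only now
        await asyncio.create_task(manager.process_queue())

    asyncio.run(startup())

The same ordering is why add_to_queue() and process_queue() each begin with
await self._initialized.wait(): work submitted before initialization completes
simply parks until the event is set.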