diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py index 7b43a4c..e392533 100644 --- a/videoarchiver/core/base.py +++ b/videoarchiver/core/base.py @@ -428,9 +428,11 @@ class VideoArchiver(GroupCog): except asyncio.TimeoutError: logger.warning("Update checker start timed out") - # Start queue processing - await self.queue_manager.process_queue(self.processor.process_video) - logger.info("Queue processing started") + # Start queue processing as a background task + self._queue_task = asyncio.create_task( + self.queue_manager.process_queue(self.processor.process_video) + ) + logger.info("Queue processing task created") # Set ready flag self.ready.set() @@ -462,7 +464,7 @@ class VideoArchiver(GroupCog): init_task.cancel() await force_cleanup_resources(self) raise ProcessingError("Cog initialization timed out") - + # Wait for ready flag with timeout try: await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT) @@ -470,7 +472,7 @@ class VideoArchiver(GroupCog): except asyncio.TimeoutError: await force_cleanup_resources(self) raise ProcessingError("Ready flag wait timed out") - + except Exception as e: # Ensure cleanup on any error try: @@ -489,10 +491,20 @@ class VideoArchiver(GroupCog): # Cancel any pending tasks if self._init_task and not self._init_task.done(): self._init_task.cancel() - + if self._cleanup_task and not self._cleanup_task.done(): self._cleanup_task.cancel() + # Cancel queue processing task if it exists + if hasattr(self, '_queue_task') and self._queue_task and not self._queue_task.done(): + self._queue_task.cancel() + try: + await self._queue_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling queue task: {e}") + # Try normal cleanup first cleanup_task = asyncio.create_task(cleanup_resources(self)) try: @@ -503,7 +515,7 @@ class VideoArchiver(GroupCog): logger.warning("Normal cleanup timed out, forcing cleanup") else: logger.error(f"Error during normal cleanup: {str(e)}") - + # Cancel normal cleanup and force cleanup cleanup_task.cancel() try: @@ -534,21 +546,19 @@ class VideoArchiver(GroupCog): self.db = None self._init_task = None self._cleanup_task = None + if hasattr(self, '_queue_task'): + self._queue_task = None async def _cleanup(self) -> None: """Clean up all resources with proper handling""" try: - await asyncio.wait_for( - cleanup_resources(self), - timeout=CLEANUP_TIMEOUT - ) + await asyncio.wait_for(cleanup_resources(self), timeout=CLEANUP_TIMEOUT) logger.info("Cleanup completed successfully") except asyncio.TimeoutError: logger.warning("Cleanup timed out, forcing cleanup") try: await asyncio.wait_for( - force_cleanup_resources(self), - timeout=CLEANUP_TIMEOUT + force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT ) logger.info("Force cleanup completed") except asyncio.TimeoutError: diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index f3bb7ad..18040dc 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -139,6 +139,9 @@ class EnhancedVideoQueueManager: processor: Function that processes queue items """ logger.info("Queue processor started") + last_persist_time = time.time() + persist_interval = 60 # Persist state every 60 seconds instead of every operation + while not self._shutdown: try: # Get next item from queue @@ -149,7 +152,8 @@ class EnhancedVideoQueueManager: self._processing[item.url] = item if not item: - await asyncio.sleep(1) + # Use shorter sleep when queue is empty + await asyncio.sleep(0.1) continue try: @@ -200,8 +204,9 @@ class EnhancedVideoQueueManager: error=str(e) ) - # Persist state if enabled - if self.persistence: + # Persist state if enabled and interval has passed + current_time = time.time() + if self.persistence and (current_time - last_persist_time) >= persist_interval: await self.persistence.persist_queue_state( self._queue, self._processing, @@ -209,12 +214,14 @@ class EnhancedVideoQueueManager: self._failed, self.metrics ) + last_persist_time = current_time except Exception as e: logger.error(f"Critical error in queue processor: {e}") - await asyncio.sleep(1) + await asyncio.sleep(0.1) # Brief pause on error before retrying - await asyncio.sleep(0.1) + # Allow other tasks to run between iterations + await asyncio.sleep(0) logger.info("Queue processor stopped")