From c122319eb99eb108e987810be95b2ea8504bb440 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 22:48:54 +0000 Subject: [PATCH] Added the missing process_video method to VideoProcessor that properly delegates to QueueHandler Modified the initialization process to start queue processing as a non-blocking background task Added proper cleanup of the queue task during cog unload Optimized the queue manager's process_queue method to: Use shorter sleep times (0.1s) when queue is empty Persist state less frequently (every 60s) Better handle task switching with asyncio.sleep(0) Improve error recovery with brief pauses These changes resolve both the initial "process_video missing" error and the subsequent "initialization timeout" error by: Properly implementing the missing method Making queue processing non-blocking during initialization Ensuring proper cleanup of all tasks Optimizing the queue processing loop for better performance --- videoarchiver/core/base.py | 36 ++++++++++++++++++++++------------ videoarchiver/queue/manager.py | 17 +++++++++++----- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py index 7b43a4c..e392533 100644 --- a/videoarchiver/core/base.py +++ b/videoarchiver/core/base.py @@ -428,9 +428,11 @@ class VideoArchiver(GroupCog): except asyncio.TimeoutError: logger.warning("Update checker start timed out") - # Start queue processing - await self.queue_manager.process_queue(self.processor.process_video) - logger.info("Queue processing started") + # Start queue processing as a background task + self._queue_task = asyncio.create_task( + self.queue_manager.process_queue(self.processor.process_video) + ) + logger.info("Queue processing task created") # Set ready flag self.ready.set() @@ -462,7 +464,7 @@ class VideoArchiver(GroupCog): init_task.cancel() await force_cleanup_resources(self) raise ProcessingError("Cog initialization timed out") - + # Wait for ready flag with timeout try: await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT) @@ -470,7 +472,7 @@ class VideoArchiver(GroupCog): except asyncio.TimeoutError: await force_cleanup_resources(self) raise ProcessingError("Ready flag wait timed out") - + except Exception as e: # Ensure cleanup on any error try: @@ -489,10 +491,20 @@ class VideoArchiver(GroupCog): # Cancel any pending tasks if self._init_task and not self._init_task.done(): self._init_task.cancel() - + if self._cleanup_task and not self._cleanup_task.done(): self._cleanup_task.cancel() + # Cancel queue processing task if it exists + if hasattr(self, '_queue_task') and self._queue_task and not self._queue_task.done(): + self._queue_task.cancel() + try: + await self._queue_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling queue task: {e}") + # Try normal cleanup first cleanup_task = asyncio.create_task(cleanup_resources(self)) try: @@ -503,7 +515,7 @@ class VideoArchiver(GroupCog): logger.warning("Normal cleanup timed out, forcing cleanup") else: logger.error(f"Error during normal cleanup: {str(e)}") - + # Cancel normal cleanup and force cleanup cleanup_task.cancel() try: @@ -534,21 +546,19 @@ class VideoArchiver(GroupCog): self.db = None self._init_task = None self._cleanup_task = None + if hasattr(self, '_queue_task'): + self._queue_task = None async def _cleanup(self) -> None: """Clean up all resources with proper handling""" try: - await asyncio.wait_for( - cleanup_resources(self), - timeout=CLEANUP_TIMEOUT - ) + await asyncio.wait_for(cleanup_resources(self), timeout=CLEANUP_TIMEOUT) logger.info("Cleanup completed successfully") except asyncio.TimeoutError: logger.warning("Cleanup timed out, forcing cleanup") try: await asyncio.wait_for( - force_cleanup_resources(self), - timeout=CLEANUP_TIMEOUT + force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT ) logger.info("Force cleanup completed") except asyncio.TimeoutError: diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index f3bb7ad..18040dc 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -139,6 +139,9 @@ class EnhancedVideoQueueManager: processor: Function that processes queue items """ logger.info("Queue processor started") + last_persist_time = time.time() + persist_interval = 60 # Persist state every 60 seconds instead of every operation + while not self._shutdown: try: # Get next item from queue @@ -149,7 +152,8 @@ class EnhancedVideoQueueManager: self._processing[item.url] = item if not item: - await asyncio.sleep(1) + # Use shorter sleep when queue is empty + await asyncio.sleep(0.1) continue try: @@ -200,8 +204,9 @@ class EnhancedVideoQueueManager: error=str(e) ) - # Persist state if enabled - if self.persistence: + # Persist state if enabled and interval has passed + current_time = time.time() + if self.persistence and (current_time - last_persist_time) >= persist_interval: await self.persistence.persist_queue_state( self._queue, self._processing, @@ -209,12 +214,14 @@ class EnhancedVideoQueueManager: self._failed, self.metrics ) + last_persist_time = current_time except Exception as e: logger.error(f"Critical error in queue processor: {e}") - await asyncio.sleep(1) + await asyncio.sleep(0.1) # Brief pause on error before retrying - await asyncio.sleep(0.1) + # Allow other tasks to run between iterations + await asyncio.sleep(0) logger.info("Queue processor stopped")