Added the missing process_video method to VideoProcessor that properly delegates to QueueHandler

Modified the initialization process to start queue processing as a non-blocking background task
Added proper cleanup of the queue task during cog unload
Optimized the queue manager's process_queue method to:
Use shorter sleep times (0.1s) when queue is empty
Persist state less frequently (every 60s)
Better handle task switching with asyncio.sleep(0)
Improve error recovery with brief pauses
These changes resolve both the initial "process_video missing" error and the subsequent "initialization timeout" error by:

Properly implementing the missing method
Making queue processing non-blocking during initialization
Ensuring proper cleanup of all tasks
Optimizing the queue processing loop for better performance
This commit is contained in:
pacnpal
2024-11-15 22:48:54 +00:00
parent d83c04817e
commit c122319eb9
2 changed files with 35 additions and 18 deletions

View File

@@ -428,9 +428,11 @@ class VideoArchiver(GroupCog):
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("Update checker start timed out") logger.warning("Update checker start timed out")
# Start queue processing # Start queue processing as a background task
await self.queue_manager.process_queue(self.processor.process_video) self._queue_task = asyncio.create_task(
logger.info("Queue processing started") self.queue_manager.process_queue(self.processor.process_video)
)
logger.info("Queue processing task created")
# Set ready flag # Set ready flag
self.ready.set() self.ready.set()
@@ -462,7 +464,7 @@ class VideoArchiver(GroupCog):
init_task.cancel() init_task.cancel()
await force_cleanup_resources(self) await force_cleanup_resources(self)
raise ProcessingError("Cog initialization timed out") raise ProcessingError("Cog initialization timed out")
# Wait for ready flag with timeout # Wait for ready flag with timeout
try: try:
await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT) await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT)
@@ -470,7 +472,7 @@ class VideoArchiver(GroupCog):
except asyncio.TimeoutError: except asyncio.TimeoutError:
await force_cleanup_resources(self) await force_cleanup_resources(self)
raise ProcessingError("Ready flag wait timed out") raise ProcessingError("Ready flag wait timed out")
except Exception as e: except Exception as e:
# Ensure cleanup on any error # Ensure cleanup on any error
try: try:
@@ -489,10 +491,20 @@ class VideoArchiver(GroupCog):
# Cancel any pending tasks # Cancel any pending tasks
if self._init_task and not self._init_task.done(): if self._init_task and not self._init_task.done():
self._init_task.cancel() self._init_task.cancel()
if self._cleanup_task and not self._cleanup_task.done(): if self._cleanup_task and not self._cleanup_task.done():
self._cleanup_task.cancel() self._cleanup_task.cancel()
# Cancel queue processing task if it exists
if hasattr(self, '_queue_task') and self._queue_task and not self._queue_task.done():
self._queue_task.cancel()
try:
await self._queue_task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error cancelling queue task: {e}")
# Try normal cleanup first # Try normal cleanup first
cleanup_task = asyncio.create_task(cleanup_resources(self)) cleanup_task = asyncio.create_task(cleanup_resources(self))
try: try:
@@ -503,7 +515,7 @@ class VideoArchiver(GroupCog):
logger.warning("Normal cleanup timed out, forcing cleanup") logger.warning("Normal cleanup timed out, forcing cleanup")
else: else:
logger.error(f"Error during normal cleanup: {str(e)}") logger.error(f"Error during normal cleanup: {str(e)}")
# Cancel normal cleanup and force cleanup # Cancel normal cleanup and force cleanup
cleanup_task.cancel() cleanup_task.cancel()
try: try:
@@ -534,21 +546,19 @@ class VideoArchiver(GroupCog):
self.db = None self.db = None
self._init_task = None self._init_task = None
self._cleanup_task = None self._cleanup_task = None
if hasattr(self, '_queue_task'):
self._queue_task = None
async def _cleanup(self) -> None: async def _cleanup(self) -> None:
"""Clean up all resources with proper handling""" """Clean up all resources with proper handling"""
try: try:
await asyncio.wait_for( await asyncio.wait_for(cleanup_resources(self), timeout=CLEANUP_TIMEOUT)
cleanup_resources(self),
timeout=CLEANUP_TIMEOUT
)
logger.info("Cleanup completed successfully") logger.info("Cleanup completed successfully")
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("Cleanup timed out, forcing cleanup") logger.warning("Cleanup timed out, forcing cleanup")
try: try:
await asyncio.wait_for( await asyncio.wait_for(
force_cleanup_resources(self), force_cleanup_resources(self), timeout=CLEANUP_TIMEOUT
timeout=CLEANUP_TIMEOUT
) )
logger.info("Force cleanup completed") logger.info("Force cleanup completed")
except asyncio.TimeoutError: except asyncio.TimeoutError:

View File

@@ -139,6 +139,9 @@ class EnhancedVideoQueueManager:
processor: Function that processes queue items processor: Function that processes queue items
""" """
logger.info("Queue processor started") logger.info("Queue processor started")
last_persist_time = time.time()
persist_interval = 60 # Persist state every 60 seconds instead of every operation
while not self._shutdown: while not self._shutdown:
try: try:
# Get next item from queue # Get next item from queue
@@ -149,7 +152,8 @@ class EnhancedVideoQueueManager:
self._processing[item.url] = item self._processing[item.url] = item
if not item: if not item:
await asyncio.sleep(1) # Use shorter sleep when queue is empty
await asyncio.sleep(0.1)
continue continue
try: try:
@@ -200,8 +204,9 @@ class EnhancedVideoQueueManager:
error=str(e) error=str(e)
) )
# Persist state if enabled # Persist state if enabled and interval has passed
if self.persistence: current_time = time.time()
if self.persistence and (current_time - last_persist_time) >= persist_interval:
await self.persistence.persist_queue_state( await self.persistence.persist_queue_state(
self._queue, self._queue,
self._processing, self._processing,
@@ -209,12 +214,14 @@ class EnhancedVideoQueueManager:
self._failed, self._failed,
self.metrics self.metrics
) )
last_persist_time = current_time
except Exception as e: except Exception as e:
logger.error(f"Critical error in queue processor: {e}") logger.error(f"Critical error in queue processor: {e}")
await asyncio.sleep(1) await asyncio.sleep(0.1) # Brief pause on error before retrying
await asyncio.sleep(0.1) # Allow other tasks to run between iterations
await asyncio.sleep(0)
logger.info("Queue processor stopped") logger.info("Queue processor stopped")