diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py
index 18040dc..358016b 100644
--- a/videoarchiver/queue/manager.py
+++ b/videoarchiver/queue/manager.py
@@ -144,67 +144,30 @@ class EnhancedVideoQueueManager:
 
         while not self._shutdown:
             try:
-                # Get next item from queue
-                item = None
+                # Process items in batches to avoid blocking
                 async with self._queue_lock:
-                    if self._queue:
+                    # Get up to 5 items from queue
+                    items = []
+                    while len(items) < 5 and self._queue:
                         item = self._queue.pop(0)
                         self._processing[item.url] = item
+                        items.append(item)
 
-                if not item:
-                    # Use shorter sleep when queue is empty
+                if not items:
+                    # Use shorter sleep when queue is empty and yield control
                     await asyncio.sleep(0.1)
                     continue
 
-                try:
-                    # Process the item
-                    logger.info(f"Processing queue item: {item.url}")
-                    item.start_processing()  # Start processing tracking
-                    self.metrics.last_activity_time = time.time()  # Update activity time
-
-                    success, error = await processor(item)
-
-                    # Update metrics and status
-                    async with self._processing_lock:
-                        item.finish_processing(success, error)  # Update item status
-
-                        if success:
-                            self._completed[item.url] = item
-                            logger.info(f"Successfully processed: {item.url}")
-                        else:
-                            if item.retry_count < self.max_retries:
-                                item.retry_count += 1
-                                item.status = "pending"
-                                item.last_retry = datetime.utcnow()
-                                item.priority = max(0, item.priority - 1)
-                                self._queue.append(item)
-                                logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
-                            else:
-                                self._failed[item.url] = item
-                                logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
+                # Process items concurrently
+                tasks = []
+                for item in items:
+                    task = asyncio.create_task(self._process_item(processor, item))
+                    tasks.append(task)
+
+                # Wait for all tasks to complete
+                await asyncio.gather(*tasks, return_exceptions=True)
 
-                        self._processing.pop(item.url, None)
-
-                    # Update metrics
-                    self.metrics.update(
-                        processing_time=item.processing_time,
-                        success=success,
-                        error=error
-                    )
-
-                except Exception as e:
-                    logger.error(f"Error processing {item.url}: {e}")
-                    async with self._processing_lock:
-                        item.finish_processing(False, str(e))
-                        self._failed[item.url] = item
-                        self._processing.pop(item.url, None)
-                    self.metrics.update(
-                        processing_time=item.processing_time,
-                        success=False,
-                        error=str(e)
-                    )
-
-                # Persist state if enabled and interval has passed
+                # Persist state if interval has passed
                 current_time = time.time()
                 if self.persistence and (current_time - last_persist_time) >= persist_interval:
                     await self.persistence.persist_queue_state(
@@ -220,11 +183,65 @@ class EnhancedVideoQueueManager:
                 logger.error(f"Critical error in queue processor: {e}")
                 await asyncio.sleep(0.1)  # Brief pause on error before retrying
 
-            # Allow other tasks to run between iterations
+            # Yield control after each batch
            await asyncio.sleep(0)
 
         logger.info("Queue processor stopped")
 
+    async def _process_item(
+        self,
+        processor: Callable[[QueueItem], Tuple[bool, Optional[str]]],
+        item: QueueItem
+    ) -> None:
+        """Process a single queue item"""
+        try:
+            # Process the item
+            logger.info(f"Processing queue item: {item.url}")
+            item.start_processing()  # Start processing tracking
+            self.metrics.last_activity_time = time.time()  # Update activity time
+
+            success, error = await processor(item)
+
+            # Update metrics and status
+            async with self._processing_lock:
+                item.finish_processing(success, error)  # Update item status
+
+                if success:
+                    self._completed[item.url] = item
+                    logger.info(f"Successfully processed: {item.url}")
+                else:
+                    if item.retry_count < self.max_retries:
+                        item.retry_count += 1
+                        item.status = "pending"
+                        item.last_retry = datetime.utcnow()
+                        item.priority = max(0, item.priority - 1)
+                        self._queue.append(item)
+                        logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
+                    else:
+                        self._failed[item.url] = item
+                        logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
+
+                self._processing.pop(item.url, None)
+
+            # Update metrics
+            self.metrics.update(
+                processing_time=item.processing_time,
+                success=success,
+                error=error
+            )
+
+        except Exception as e:
+            logger.error(f"Error processing {item.url}: {e}")
+            async with self._processing_lock:
+                item.finish_processing(False, str(e))
+                self._failed[item.url] = item
+                self._processing.pop(item.url, None)
+            self.metrics.update(
+                processing_time=item.processing_time,
+                success=False,
+                error=str(e)
+            )
+
     async def add_to_queue(
         self,
         url: str,