From 2160be84bb58a77c0b30785c7efa6ba21409a446 Mon Sep 17 00:00:00 2001
From: pacnpal <183241239+pacnpal@users.noreply.github.com>
Date: Thu, 14 Nov 2024 23:57:24 +0000
Subject: [PATCH] fix imports

---
 videoarchiver/enhanced_queue.py | 154 +++++++++++++++++++++++++-------
 1 file changed, 122 insertions(+), 32 deletions(-)

diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py
index 9176576..43dacf2 100644
--- a/videoarchiver/enhanced_queue.py
+++ b/videoarchiver/enhanced_queue.py
@@ -34,6 +34,50 @@ logging.basicConfig(
 )
 logger = logging.getLogger('EnhancedQueueManager')
 
+@dataclass
+class QueueMetrics:
+    """Metrics tracking for queue performance and health"""
+    total_processed: int = 0
+    total_failed: int = 0
+    avg_processing_time: float = 0.0
+    success_rate: float = 0.0
+    peak_memory_usage: float = 0.0
+    errors_by_type: Dict[str, int] = field(default_factory=dict)
+    last_error: Optional[str] = None
+    last_error_time: Optional[datetime] = None
+    last_cleanup: datetime = field(default_factory=datetime.utcnow)
+    retries: int = 0
+    processing_times: List[float] = field(default_factory=list)
+
+    def update(self, processing_time: float, success: bool, error: str = None):
+        """Update metrics with new processing information"""
+        self.total_processed += 1
+        if not success:
+            self.total_failed += 1
+            if error:
+                self.last_error = error
+                self.last_error_time = datetime.utcnow()
+                error_type = error.split(':')[0] if ':' in error else error
+                self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
+
+        # Update processing times with sliding window
+        self.processing_times.append(processing_time)
+        if len(self.processing_times) > 100:  # Keep last 100 processing times
+            self.processing_times.pop(0)
+
+        # Update average processing time
+        self.avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0.0
+
+        # Update success rate
+        self.success_rate = (
+            (self.total_processed - self.total_failed) / self.total_processed
+            if self.total_processed > 0 else 0.0
+        )
+
+        # Update peak memory usage
+        current_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
+        self.peak_memory_usage = max(self.peak_memory_usage, current_memory)
+
 @dataclass
 class QueueItem:
     """Represents a video processing task in the queue"""
@@ -52,38 +96,8 @@ class QueueItem:
     last_error: Optional[str] = None
     retry_count: int = 0
     last_retry: Optional[datetime] = None
-    self.processing_times: List[float] = []
-    self.last_error: Optional[str] = None
-    self.last_error_time: Optional[datetime] = None
-
-    def update_metrics(self, processing_time: float, success: bool, error: str = None):
-        """Update metrics with new processing information"""
-        self.total_processed += 1
-        if not success:
-            self.total_failed += 1
-            if error:
-                self.last_error = error
-                self.last_error_time = datetime.utcnow()
-                error_type = error.split(':')[0] if ':' in error else error
-                self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
-
-        # Update processing times with sliding window
-        self.processing_times.append(processing_time)
-        if len(self.processing_times) > 100:  # Keep last 100 processing times
-            self.processing_times.pop(0)
-
-        # Update average processing time
-        self.avg_processing_time = sum(self.processing_times) / len(self.processing_times)
-
-        # Update success rate
-        self.success_rate = (
-            (self.total_processed - self.total_failed) / self.total_processed
-            if self.total_processed > 0 else 0.0
-        )
-
-        # Update peak memory usage
-        current_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
-        self.peak_memory_usage = max(self.peak_memory_usage, current_memory)
+    processing_times: List[float] = field(default_factory=list)
+    last_error_time: Optional[datetime] = None
 
 class EnhancedVideoQueueManager:
     """Enhanced queue manager with improved memory management and performance"""
@@ -204,6 +218,82 @@ class EnhancedVideoQueueManager:
             logger.error(f"Error adding video to queue: {traceback.format_exc()}")
             raise QueueError(f"Failed to add to queue: {str(e)}")
 
+    async def process_queue(self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]):
+        """Process items in the queue with the provided processor function
+
+        Args:
+            processor: A callable that takes a QueueItem and returns a tuple of (success: bool, error: Optional[str])
+        """
+        while True:
+            try:
+                # Get next item from queue
+                item = None
+                async with self._queue_lock:
+                    if self._queue:
+                        item = self._queue.pop(0)
+                        self._processing[item.url] = item
+                        item.status = "processing"
+                        item.processing_time = time.time()
+
+                if not item:
+                    await asyncio.sleep(1)
+                    continue
+
+                try:
+                    # Process the item
+                    start_time = time.time()
+                    success, error = await processor(item)
+                    processing_time = time.time() - start_time
+
+                    # Update metrics
+                    self.metrics.update(processing_time, success, error)
+
+                    # Update item status
+                    async with self._processing_lock:
+                        if success:
+                            item.status = "completed"
+                            self._completed[item.url] = item
+                            logger.info(f"Successfully processed item: {item.url}")
+                        else:
+                            item.status = "failed"
+                            item.error = error
+                            item.last_error = error
+                            item.last_error_time = datetime.utcnow()
+
+                            # Handle retries
+                            if item.retry_count < self.max_retries:
+                                item.retry_count += 1
+                                item.status = "pending"
+                                item.last_retry = datetime.utcnow()
+                                self._queue.append(item)
+                                logger.warning(f"Retrying item: {item.url} (attempt {item.retry_count})")
+                            else:
+                                self._failed[item.url] = item
+                                logger.error(f"Failed to process item after {self.max_retries} attempts: {item.url}")
+
+                        self._processing.pop(item.url, None)
+
+                except Exception as e:
+                    logger.error(f"Error processing item {item.url}: {traceback.format_exc()}")
+                    async with self._processing_lock:
+                        item.status = "failed"
+                        item.error = str(e)
+                        item.last_error = str(e)
+                        item.last_error_time = datetime.utcnow()
+                        self._failed[item.url] = item
+                        self._processing.pop(item.url, None)
+
+                # Persist state after processing
+                if self.persistence_path:
+                    await self._persist_queue()
+
+            except Exception as e:
+                logger.error(f"Error in queue processor: {traceback.format_exc()}")
+                await asyncio.sleep(1)
+
+            # Small delay to prevent CPU overload
+            await asyncio.sleep(0.1)
+
     async def _periodic_backup(self):
         """Periodically backup queue state"""
         while True:
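
Note for reviewers: a minimal sketch of how the new QueueMetrics dataclass
behaves on its own, assuming the patched module is importable (the error
strings below are illustrative). update() buckets errors by the text before
the first colon, so prefixing messages with an exception class name yields a
per-type histogram:

    from videoarchiver.enhanced_queue import QueueMetrics

    metrics = QueueMetrics()
    metrics.update(processing_time=2.5, success=True)
    metrics.update(processing_time=4.0, success=False,
                   error="TimeoutError: fetch took too long")
    metrics.update(processing_time=3.1, success=False,
                   error="TimeoutError: fetch took too long")

    print(metrics.success_rate)         # (3 - 2) / 3 = 0.33...
    print(metrics.avg_processing_time)  # mean of last 100 samples: 3.2
    print(metrics.errors_by_type)       # {'TimeoutError': 2}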
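
And a sketch of wiring a processor into the new process_queue loop. The
coroutine below is hypothetical (this patch ships no processor), and although
the annotation reads Callable[..., Tuple[bool, Optional[str]]], the loop
awaits the processor, so in practice it must be an async callable. Returning
errors as "ExceptionName: detail" feeds the QueueMetrics bucketing shown
above:

    import asyncio
    from typing import Optional, Tuple

    from videoarchiver.enhanced_queue import EnhancedVideoQueueManager, QueueItem

    async def archive_video(item: QueueItem) -> Tuple[bool, Optional[str]]:
        """Hypothetical processor: download and archive one queued video."""
        try:
            # ... fetch and transcode item.url here ...
            return True, None
        except Exception as e:
            return False, f"{type(e).__name__}: {e}"

    async def main() -> None:
        manager = EnhancedVideoQueueManager()  # constructor args not shown in this patch
        # process_queue loops forever; run it as a background task beside the bot
        worker = asyncio.create_task(manager.process_queue(archive_video))
        await worker

    asyncio.run(main())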