diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index 43dacf2..f0aed56 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -4,17 +4,12 @@ import logging import json import os import time -import psutil from typing import Dict, Optional, Set, Tuple, Callable, Any, List, Union from datetime import datetime, timedelta import traceback from dataclasses import dataclass, asdict, field -import weakref from pathlib import Path -import aiofiles -import aiofiles.os import sys -import signal from concurrent.futures import ThreadPoolExecutor from functools import partial import tempfile @@ -41,6 +36,11 @@ class QueueMetrics: total_failed: int = 0 avg_processing_time: float = 0.0 success_rate: float = 0.0 + errors_by_type: Dict[str, int] = field(default_factory=dict) + last_error: Optional[str] = None + last_error_time: Optional[datetime] = None + last_cleanup: datetime = field(default_factory=datetime.utcnow) + retries: int = 0 peak_memory_usage: float = 0.0 errors_by_type: Dict[str, int] = field(default_factory=dict) last_error: Optional[str] = None @@ -57,30 +57,39 @@ class QueueMetrics: if error: self.last_error = error self.last_error_time = datetime.utcnow() - error_type = error.split(':')[0] if ':' in error else error - self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1 - + error_type = error.split(":")[0] if ":" in error else error + self.errors_by_type[error_type] = ( + self.errors_by_type.get(error_type, 0) + 1 + ) + # Update processing times with sliding window self.processing_times.append(processing_time) if len(self.processing_times) > 100: # Keep last 100 processing times self.processing_times.pop(0) - + # Update average processing time - self.avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0.0 - + self.avg_processing_time = ( + sum(self.processing_times) / len(self.processing_times) + if self.processing_times + else 0.0 + ) + # Update success rate self.success_rate = ( (self.total_processed - self.total_failed) / self.total_processed - if self.total_processed > 0 else 0.0 + if self.total_processed > 0 + else 0.0 ) - + # Update peak memory usage current_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB self.peak_memory_usage = max(self.peak_memory_usage, current_memory) + @dataclass class QueueItem: """Represents a video processing task in the queue""" + url: str message_id: int channel_id: int @@ -99,9 +108,10 @@ class QueueItem: processing_times: List[float] = field(default_factory=list) last_error_time: Optional[datetime] = None + class EnhancedVideoQueueManager: """Enhanced queue manager with improved memory management and performance""" - + def __init__( self, max_retries: int = 3, @@ -110,7 +120,7 @@ class EnhancedVideoQueueManager: cleanup_interval: int = 3600, # 1 hour max_history_age: int = 86400, # 24 hours persistence_path: Optional[str] = None, - backup_interval: int = 300 # 5 minutes + backup_interval: int = 300, # 5 minutes ): self.max_retries = max_retries self.retry_delay = retry_delay @@ -119,29 +129,29 @@ class EnhancedVideoQueueManager: self.max_history_age = max_history_age self.persistence_path = persistence_path self.backup_interval = backup_interval - + # Queue storage with priority self._queue: List[QueueItem] = [] self._queue_lock = asyncio.Lock() self._processing: Dict[str, QueueItem] = {} self._completed: Dict[str, QueueItem] = {} self._failed: Dict[str, QueueItem] = {} - + # Track active tasks self._active_tasks: Set[asyncio.Task] = set() self._processing_lock = asyncio.Lock() - + # Status tracking self._guild_queues: Dict[int, Set[str]] = {} self._channel_queues: Dict[int, Set[str]] = {} - + # Metrics tracking self.metrics = QueueMetrics() - + # Recovery tracking self._recovery_attempts: Dict[str, int] = {} self._last_backup: Optional[datetime] = None - + # Initialize tasks self._init_tasks() @@ -150,16 +160,16 @@ class EnhancedVideoQueueManager: # Cleanup and monitoring self._cleanup_task = asyncio.create_task(self._periodic_cleanup()) self._active_tasks.add(self._cleanup_task) - + # Health monitoring self._health_check_task = asyncio.create_task(self._monitor_health()) self._active_tasks.add(self._health_check_task) - + # Backup task if self.persistence_path: self._backup_task = asyncio.create_task(self._periodic_backup()) self._active_tasks.add(self._backup_task) - + # Load persisted queue self._load_persisted_queue() @@ -171,18 +181,18 @@ class EnhancedVideoQueueManager: guild_id: int, author_id: int, callback: Callable[[str, bool, str], Any], - priority: int = 0 + priority: int = 0, ) -> bool: """Add a video to the processing queue with priority support""" try: async with self._queue_lock: if len(self._queue) >= self.max_queue_size: raise QueueError("Queue is full") - + # Check system resources if psutil.virtual_memory().percent > 90: raise ResourceExhaustedError("System memory is critically low") - + # Create queue item item = QueueItem( url=url, @@ -191,36 +201,38 @@ class EnhancedVideoQueueManager: guild_id=guild_id, author_id=author_id, added_at=datetime.utcnow(), - priority=priority + priority=priority, ) - + # Add to tracking collections if guild_id not in self._guild_queues: self._guild_queues[guild_id] = set() self._guild_queues[guild_id].add(url) - + if channel_id not in self._channel_queues: self._channel_queues[channel_id] = set() self._channel_queues[channel_id].add(url) - + # Add to queue with priority self._queue.append(item) self._queue.sort(key=lambda x: (-x.priority, x.added_at)) - + # Persist queue state if self.persistence_path: await self._persist_queue() - + logger.info(f"Added video to queue: {url} with priority {priority}") return True - + except Exception as e: logger.error(f"Error adding video to queue: {traceback.format_exc()}") raise QueueError(f"Failed to add to queue: {str(e)}") - async def process_queue(self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]): + async def process_queue( + self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]] + ): """Process items in the queue with the provided processor function - + Args: processor: A callable that takes a QueueItem and returns a tuple of (success: bool, error: Optional[str]) """ @@ -234,20 +246,20 @@ class EnhancedVideoQueueManager: self._processing[item.url] = item item.status = "processing" item.processing_time = time.time() - + if not item: await asyncio.sleep(1) continue - + try: # Process the item start_time = time.time() success, error = await processor(item) processing_time = time.time() - start_time - + # Update metrics self.metrics.update(processing_time, success, error) - + # Update item status async with self._processing_lock: if success: @@ -259,22 +271,28 @@ class EnhancedVideoQueueManager: item.error = error item.last_error = error item.last_error_time = datetime.utcnow() - + # Handle retries if item.retry_count < self.max_retries: item.retry_count += 1 item.status = "pending" item.last_retry = datetime.utcnow() self._queue.append(item) - logger.warning(f"Retrying item: {item.url} (attempt {item.retry_count})") + logger.warning( + f"Retrying item: {item.url} (attempt {item.retry_count})" + ) else: self._failed[item.url] = item - logger.error(f"Failed to process item after {self.max_retries} attempts: {item.url}") - + logger.error( + f"Failed to process item after {self.max_retries} attempts: {item.url}" + ) + self._processing.pop(item.url, None) - + except Exception as e: - logger.error(f"Error processing item {item.url}: {traceback.format_exc()}") + logger.error( + f"Error processing item {item.url}: {traceback.format_exc()}" + ) async with self._processing_lock: item.status = "failed" item.error = str(e) @@ -282,15 +300,15 @@ class EnhancedVideoQueueManager: item.last_error_time = datetime.utcnow() self._failed[item.url] = item self._processing.pop(item.url, None) - + # Persist state after processing if self.persistence_path: await self._persist_queue() - + except Exception as e: logger.error(f"Error in queue processor: {traceback.format_exc()}") await asyncio.sleep(1) - + # Small delay to prevent CPU overload await asyncio.sleep(0.1) @@ -300,7 +318,8 @@ class EnhancedVideoQueueManager: try: if self.persistence_path and ( not self._last_backup - or (datetime.utcnow() - self._last_backup).total_seconds() >= self.backup_interval + or (datetime.utcnow() - self._last_backup).total_seconds() + >= self.backup_interval ): await self._persist_queue() self._last_backup = datetime.utcnow() @@ -313,7 +332,7 @@ class EnhancedVideoQueueManager: """Persist queue state to disk with improved error handling""" if not self.persistence_path: return - + try: state = { "queue": [asdict(item) for item in self._queue], @@ -327,23 +346,27 @@ class EnhancedVideoQueueManager: "success_rate": self.metrics.success_rate, "errors_by_type": self.metrics.errors_by_type, "last_error": self.metrics.last_error, - "last_error_time": self.metrics.last_error_time.isoformat() if self.metrics.last_error_time else None - } + "last_error_time": ( + self.metrics.last_error_time.isoformat() + if self.metrics.last_error_time + else None + ), + }, } - + # Ensure directory exists os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True) - + # Write to temp file first temp_path = f"{self.persistence_path}.tmp" - async with aiofiles.open(temp_path, 'w') as f: - await f.write(json.dumps(state, default=str)) - await f.flush() + with open(temp_path, "w") as f: + json.dump(state, f, default=str) + f.flush() os.fsync(f.fileno()) - + # Atomic rename - await aiofiles.os.rename(temp_path, self.persistence_path) - + os.rename(temp_path, self.persistence_path) + except Exception as e: logger.error(f"Error persisting queue state: {traceback.format_exc()}") raise QueueError(f"Failed to persist queue state: {str(e)}") @@ -352,11 +375,11 @@ class EnhancedVideoQueueManager: """Load persisted queue state from disk with improved error handling""" if not self.persistence_path or not os.path.exists(self.persistence_path): return - + try: - with open(self.persistence_path, 'r') as f: + with open(self.persistence_path, "r") as f: state = json.load(f) - + # Restore queue items with datetime conversion self._queue = [] for item in state["queue"]: @@ -364,11 +387,13 @@ class EnhancedVideoQueueManager: if item.get("last_retry"): item["last_retry"] = datetime.fromisoformat(item["last_retry"]) self._queue.append(QueueItem(**item)) - - self._processing = {k: QueueItem(**v) for k, v in state["processing"].items()} + + self._processing = { + k: QueueItem(**v) for k, v in state["processing"].items() + } self._completed = {k: QueueItem(**v) for k, v in state["completed"].items()} self._failed = {k: QueueItem(**v) for k, v in state["failed"].items()} - + # Restore metrics self.metrics.total_processed = state["metrics"]["total_processed"] self.metrics.total_failed = state["metrics"]["total_failed"] @@ -377,20 +402,28 @@ class EnhancedVideoQueueManager: self.metrics.errors_by_type = state["metrics"]["errors_by_type"] self.metrics.last_error = state["metrics"]["last_error"] if state["metrics"]["last_error_time"]: - self.metrics.last_error_time = datetime.fromisoformat(state["metrics"]["last_error_time"]) - + self.metrics.last_error_time = datetime.fromisoformat( + state["metrics"]["last_error_time"] + ) + logger.info("Successfully loaded persisted queue state") - + except Exception as e: - logger.error(f"Error loading persisted queue state: {traceback.format_exc()}") + logger.error( + f"Error loading persisted queue state: {traceback.format_exc()}" + ) # Create backup of corrupted state file if os.path.exists(self.persistence_path): backup_path = f"{self.persistence_path}.bak.{int(time.time())}" try: os.rename(self.persistence_path, backup_path) - logger.info(f"Created backup of corrupted state file: {backup_path}") + logger.info( + f"Created backup of corrupted state file: {backup_path}" + ) except Exception as be: - logger.error(f"Failed to create backup of corrupted state file: {str(be)}") + logger.error( + f"Failed to create backup of corrupted state file: {str(be)}" + ) async def _monitor_health(self): """Monitor queue health and performance with improved metrics""" @@ -399,32 +432,35 @@ class EnhancedVideoQueueManager: # Check memory usage process = psutil.Process() memory_usage = process.memory_info().rss / 1024 / 1024 # MB - + if memory_usage > 1024: # 1GB logger.warning(f"High memory usage detected: {memory_usage:.2f}MB") # Force garbage collection import gc + gc.collect() - + # Check for potential deadlocks processing_times = [ time.time() - item.processing_time for item in self._processing.values() if item.processing_time > 0 ] - + if processing_times: max_time = max(processing_times) if max_time > 3600: # 1 hour - logger.warning(f"Potential deadlock detected: Item processing for {max_time:.2f}s") + logger.warning( + f"Potential deadlock detected: Item processing for {max_time:.2f}s" + ) # Attempt recovery await self._recover_stuck_items() - + # Calculate and log detailed metrics success_rate = self.metrics.success_rate error_distribution = self.metrics.errors_by_type avg_processing_time = self.metrics.avg_processing_time - + logger.info( f"Queue Health Metrics:\n" f"- Success Rate: {success_rate:.2%}\n" @@ -434,9 +470,9 @@ class EnhancedVideoQueueManager: f"- Queue Size: {len(self._queue)}\n" f"- Processing Items: {len(self._processing)}" ) - + await asyncio.sleep(300) # Check every 5 minutes - + except Exception as e: logger.error(f"Error in health monitor: {traceback.format_exc()}") await asyncio.sleep(60) @@ -447,7 +483,10 @@ class EnhancedVideoQueueManager: async with self._processing_lock: current_time = time.time() for url, item in list(self._processing.items()): - if item.processing_time > 0 and (current_time - item.processing_time) > 3600: + if ( + item.processing_time > 0 + and (current_time - item.processing_time) > 3600 + ): # Move to failed queue if max retries reached if item.retry_count >= self.max_retries: self._failed[url] = item @@ -462,7 +501,7 @@ class EnhancedVideoQueueManager: self._queue.append(item) self._processing.pop(url) logger.info(f"Recovered stuck item for retry: {url}") - + except Exception as e: logger.error(f"Error recovering stuck items: {str(e)}") @@ -473,13 +512,13 @@ class EnhancedVideoQueueManager: for task in self._active_tasks: if not task.done(): task.cancel() - + await asyncio.gather(*self._active_tasks, return_exceptions=True) - + # Persist final state if self.persistence_path: await self._persist_queue() - + # Clear all collections self._queue.clear() self._processing.clear() @@ -487,9 +526,9 @@ class EnhancedVideoQueueManager: self._failed.clear() self._guild_queues.clear() self._channel_queues.clear() - + logger.info("Queue manager cleanup completed") - + except Exception as e: logger.error(f"Error during cleanup: {str(e)}") raise CleanupError(f"Failed to clean up queue manager: {str(e)}") @@ -501,36 +540,44 @@ class EnhancedVideoQueueManager: guild_urls = self._guild_queues.get(guild_id, set()) status = { "pending": sum(1 for item in self._queue if item.url in guild_urls), - "processing": sum(1 for url in self._processing if url in guild_urls), + "processing": sum( + 1 for url in self._processing if url in guild_urls + ), "completed": sum(1 for url in self._completed if url in guild_urls), - "failed": sum(1 for url in self._failed if url in guild_urls) + "failed": sum(1 for url in self._failed if url in guild_urls), } else: status = { "pending": len(self._queue), "processing": len(self._processing), "completed": len(self._completed), - "failed": len(self._failed) + "failed": len(self._failed), } - + # Add detailed metrics - status.update({ - "metrics": { - "total_processed": self.metrics.total_processed, - "total_failed": self.metrics.total_failed, - "success_rate": self.metrics.success_rate, - "avg_processing_time": self.metrics.avg_processing_time, - "peak_memory_usage": self.metrics.peak_memory_usage, - "last_cleanup": self.metrics.last_cleanup.isoformat(), - "errors_by_type": self.metrics.errors_by_type, - "last_error": self.metrics.last_error, - "last_error_time": self.metrics.last_error_time.isoformat() if self.metrics.last_error_time else None, - "retries": self.metrics.retries + status.update( + { + "metrics": { + "total_processed": self.metrics.total_processed, + "total_failed": self.metrics.total_failed, + "success_rate": self.metrics.success_rate, + "avg_processing_time": self.metrics.avg_processing_time, + "peak_memory_usage": self.metrics.peak_memory_usage, + "last_cleanup": self.metrics.last_cleanup.isoformat(), + "errors_by_type": self.metrics.errors_by_type, + "last_error": self.metrics.last_error, + "last_error_time": ( + self.metrics.last_error_time.isoformat() + if self.metrics.last_error_time + else None + ), + "retries": self.metrics.retries, + } } - }) - + ) + return status - + except Exception as e: logger.error(f"Error getting queue status: {str(e)}") raise QueueError(f"Failed to get queue status: {str(e)}") @@ -541,38 +588,40 @@ class EnhancedVideoQueueManager: try: current_time = datetime.utcnow() cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age) - + async with self._queue_lock: # Clean up completed items for url in list(self._completed.keys()): item = self._completed[url] if item.added_at < cleanup_cutoff: self._completed.pop(url) - + # Clean up failed items for url in list(self._failed.keys()): item = self._failed[url] if item.added_at < cleanup_cutoff: self._failed.pop(url) - + # Clean up guild and channel tracking for guild_id in list(self._guild_queues.keys()): self._guild_queues[guild_id] = { - url for url in self._guild_queues[guild_id] + url + for url in self._guild_queues[guild_id] if url in self._queue or url in self._processing } - + for channel_id in list(self._channel_queues.keys()): self._channel_queues[channel_id] = { - url for url in self._channel_queues[channel_id] + url + for url in self._channel_queues[channel_id] if url in self._queue or url in self._processing } - + self.metrics.last_cleanup = current_time logger.info("Completed periodic queue cleanup") - + await asyncio.sleep(self.cleanup_interval) - + except Exception as e: logger.error(f"Error in periodic cleanup: {traceback.format_exc()}") await asyncio.sleep(60)