From b1eafbb01d80a566d5e775b04d0490a159a89236 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 18:16:53 +0000 Subject: [PATCH] Refactor queue system into modular structure - Created new queue module with separate components: - models.py: QueueItem and QueueMetrics data classes - persistence.py: Queue state persistence - monitoring.py: Health monitoring and metrics - cleanup.py: Cleanup operations - manager.py: Main queue management - __init__.py: Package exports - Updated imports in video_archiver.py and processor.py - Removed old enhanced_queue.py - Updated README with new queue system details This refactoring improves code organization and maintainability through better separation of concerns while maintaining all existing functionality. --- README.md | 4 +- videoarchiver/processor.py | 2 +- videoarchiver/queue/__init__.py | 19 ++ videoarchiver/queue/cleanup.py | 243 ++++++++++++++++ videoarchiver/queue/manager.py | 453 +++++++++++++++++++++++++++++ videoarchiver/queue/models.py | 150 ++++++++++ videoarchiver/queue/monitoring.py | 172 +++++++++++ videoarchiver/queue/persistence.py | 201 +++++++++++++ videoarchiver/video_archiver.py | 2 +- 9 files changed, 1242 insertions(+), 4 deletions(-) create mode 100644 videoarchiver/queue/__init__.py create mode 100644 videoarchiver/queue/cleanup.py create mode 100644 videoarchiver/queue/manager.py create mode 100644 videoarchiver/queue/models.py create mode 100644 videoarchiver/queue/monitoring.py create mode 100644 videoarchiver/queue/persistence.py diff --git a/README.md b/README.md index 27d72f3..6b5e529 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Welcome to **Pac-cogs**, a collection of custom cogs for [Red](https://github.co |------------|--------------------------------------------------| | **Birthday** | Assigns a special birthday role to users and sends a celebratory message with random cake or pie emojis. Features include: automatic role removal at midnight in configurable timezone, custom announcement channels, role-based command permissions, random cake/pie emoji generation, task persistence across bot restarts, context menu support (right-click user to assign role), birthday role removal task checking, and no hierarchy requirements for role assignment. Perfect for automated birthday celebrations! | | **Overseerr** | Allows interaction with [Overseerr](https://overseerr.dev/) directly from Discord. Users can search for movies or TV shows, request them, and have admins approve requests. Features include: media availability checking, request status tracking, admin role configuration, direct integration with Overseerr's API, and full slash command support. Requires a running Overseerr instance and API key. | -| **VideoArchiver** | A powerful video archiving cog that automatically downloads and reposts videos from monitored channels. Features hardware-accelerated compression (NVIDIA, AMD, Intel, ARM), multi-video processing, enhanced queue system with priority processing, role-based permissions, automatic file cleanup, queue persistence across bot restarts, and support for multiple video platforms via yt-dlp. Automatically compresses videos to meet Discord's file size limits while maintaining quality. | +| **VideoArchiver** | A powerful video archiving cog that automatically downloads and reposts videos from monitored channels. 
Features hardware-accelerated compression (NVIDIA, AMD, Intel, ARM), multi-video processing, modular queue system with priority processing and state persistence, role-based permissions, automatic file cleanup, and support for multiple video platforms via yt-dlp. The enhanced queue system provides metrics tracking, health monitoring, and efficient resource management while maintaining quality. | ## Installation @@ -59,6 +59,6 @@ Replace `[p]` with your bot's prefix. [p]overseerr apikey ``` -- **VideoArchiver**: The cog requires FFmpeg for video processing. The cog will attempt to download and manage FFmpeg automatically if it's not found on your system. The required Python packages (yt-dlp, ffmpeg-python, requests) will be installed automatically during cog installation. Features an enhanced queue system with priority processing, performance metrics, and automatic cleanup. +- **VideoArchiver**: The cog requires FFmpeg for video processing. The cog will attempt to download and manage FFmpeg automatically if it's not found on your system. The required Python packages (yt-dlp, ffmpeg-python, requests) will be installed automatically during cog installation. Features a modular queue system with priority processing, performance metrics, health monitoring, and automatic cleanup. The queue system maintains state across bot restarts and provides efficient resource management. For more details on setting up and managing Red, visit the [Red documentation](https://docs.discord.red). diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index abc1e58..793968a 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -11,7 +11,7 @@ from typing import Dict, Any, Optional, Tuple, Set import traceback from datetime import datetime -from videoarchiver.enhanced_queue import EnhancedVideoQueueManager +from videoarchiver.queue import EnhancedVideoQueueManager # Updated import from videoarchiver.utils.exceptions import ( ProcessingError, ConfigurationError, diff --git a/videoarchiver/queue/__init__.py b/videoarchiver/queue/__init__.py new file mode 100644 index 0000000..b690bda --- /dev/null +++ b/videoarchiver/queue/__init__.py @@ -0,0 +1,19 @@ +"""Queue management package for video processing""" + +from .models import QueueItem, QueueMetrics +from .manager import EnhancedVideoQueueManager +from .persistence import QueuePersistenceManager, QueueError +from .monitoring import QueueMonitor, MonitoringError +from .cleanup import QueueCleaner, CleanupError + +__all__ = [ + 'QueueItem', + 'QueueMetrics', + 'EnhancedVideoQueueManager', + 'QueuePersistenceManager', + 'QueueMonitor', + 'QueueCleaner', + 'QueueError', + 'MonitoringError', + 'CleanupError', +] diff --git a/videoarchiver/queue/cleanup.py b/videoarchiver/queue/cleanup.py new file mode 100644 index 0000000..5aeb982 --- /dev/null +++ b/videoarchiver/queue/cleanup.py @@ -0,0 +1,243 @@ +"""Queue cleanup operations""" + +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Set +from .models import QueueItem, QueueMetrics + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("QueueCleanup") + +class QueueCleaner: + """Handles cleanup of old queue items and tracking data""" + + def __init__( + self, + cleanup_interval: int = 3600, # 1 hour + max_history_age: int = 86400, # 24 hours + ): + self.cleanup_interval = cleanup_interval + self.max_history_age = max_history_age + 
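+        # Cooperative shutdown flag: the cleanup loop in start_cleanup() runs until stop_cleanup() sets this to True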
self._shutdown = False + + async def start_cleanup( + self, + queue: List[QueueItem], + completed: Dict[str, QueueItem], + failed: Dict[str, QueueItem], + guild_queues: Dict[int, Set[str]], + channel_queues: Dict[int, Set[str]], + processing: Dict[str, QueueItem], + metrics: QueueMetrics, + queue_lock: asyncio.Lock + ) -> None: + """Start periodic cleanup process + + Args: + queue: Reference to the queue list + completed: Reference to completed items dict + failed: Reference to failed items dict + guild_queues: Reference to guild tracking dict + channel_queues: Reference to channel tracking dict + processing: Reference to processing dict + metrics: Reference to queue metrics + queue_lock: Lock for queue operations + """ + while not self._shutdown: + try: + await self._perform_cleanup( + queue, + completed, + failed, + guild_queues, + channel_queues, + processing, + metrics, + queue_lock + ) + await asyncio.sleep(self.cleanup_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in periodic cleanup: {str(e)}") + await asyncio.sleep(60) + + def stop_cleanup(self) -> None: + """Stop the cleanup process""" + self._shutdown = True + + async def _perform_cleanup( + self, + queue: List[QueueItem], + completed: Dict[str, QueueItem], + failed: Dict[str, QueueItem], + guild_queues: Dict[int, Set[str]], + channel_queues: Dict[int, Set[str]], + processing: Dict[str, QueueItem], + metrics: QueueMetrics, + queue_lock: asyncio.Lock + ) -> None: + """Perform cleanup operations + + Args: + queue: Reference to the queue list + completed: Reference to completed items dict + failed: Reference to failed items dict + guild_queues: Reference to guild tracking dict + channel_queues: Reference to channel tracking dict + processing: Reference to processing dict + metrics: Reference to queue metrics + queue_lock: Lock for queue operations + """ + try: + current_time = datetime.utcnow() + cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age) + + async with queue_lock: + # Clean up completed items + for url in list(completed.keys()): + try: + item = completed[url] + # Ensure added_at is a datetime object + if not isinstance(item.added_at, datetime): + try: + if isinstance(item.added_at, str): + item.added_at = datetime.fromisoformat(item.added_at) + else: + item.added_at = current_time + except (ValueError, TypeError): + item.added_at = current_time + + if item.added_at < cleanup_cutoff: + completed.pop(url) + except Exception as e: + logger.error(f"Error processing completed item {url}: {e}") + completed.pop(url) + + # Clean up failed items + for url in list(failed.keys()): + try: + item = failed[url] + # Ensure added_at is a datetime object + if not isinstance(item.added_at, datetime): + try: + if isinstance(item.added_at, str): + item.added_at = datetime.fromisoformat(item.added_at) + else: + item.added_at = current_time + except (ValueError, TypeError): + item.added_at = current_time + + if item.added_at < cleanup_cutoff: + failed.pop(url) + except Exception as e: + logger.error(f"Error processing failed item {url}: {e}") + failed.pop(url) + + # Clean up guild tracking + for guild_id in list(guild_queues.keys()): + guild_queues[guild_id] = { + url for url in guild_queues[guild_id] + if url in queue or url in processing + } + if not guild_queues[guild_id]: + guild_queues.pop(guild_id) + + # Clean up channel tracking + for channel_id in list(channel_queues.keys()): + channel_queues[channel_id] = { + url for url in channel_queues[channel_id] + if url in 
queue or url in processing + } + if not channel_queues[channel_id]: + channel_queues.pop(channel_id) + + metrics.last_cleanup = current_time + logger.info("Completed periodic queue cleanup") + + except Exception as e: + logger.error(f"Error during cleanup: {str(e)}") + raise + + async def clear_guild_queue( + self, + guild_id: int, + queue: List[QueueItem], + processing: Dict[str, QueueItem], + completed: Dict[str, QueueItem], + failed: Dict[str, QueueItem], + guild_queues: Dict[int, Set[str]], + channel_queues: Dict[int, Set[str]], + queue_lock: asyncio.Lock + ) -> int: + """Clear all queue items for a specific guild + + Args: + guild_id: ID of the guild to clear + queue: Reference to the queue list + processing: Reference to processing dict + completed: Reference to completed items dict + failed: Reference to failed items dict + guild_queues: Reference to guild tracking dict + channel_queues: Reference to channel tracking dict + queue_lock: Lock for queue operations + + Returns: + Number of items cleared + """ + try: + cleared_count = 0 + async with queue_lock: + # Get URLs for this guild + guild_urls = guild_queues.get(guild_id, set()) + + # Clear from pending queue + queue[:] = [item for item in queue if item.guild_id != guild_id] + + # Clear from processing + for url in list(processing.keys()): + if processing[url].guild_id == guild_id: + processing.pop(url) + cleared_count += 1 + + # Clear from completed + for url in list(completed.keys()): + if completed[url].guild_id == guild_id: + completed.pop(url) + cleared_count += 1 + + # Clear from failed + for url in list(failed.keys()): + if failed[url].guild_id == guild_id: + failed.pop(url) + cleared_count += 1 + + # Clear guild tracking + if guild_id in guild_queues: + cleared_count += len(guild_queues[guild_id]) + guild_queues.pop(guild_id) + + # Clear channel tracking for this guild's channels + for channel_id in list(channel_queues.keys()): + channel_queues[channel_id] = { + url for url in channel_queues[channel_id] + if url not in guild_urls + } + if not channel_queues[channel_id]: + channel_queues.pop(channel_id) + + logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue") + return cleared_count + + except Exception as e: + logger.error(f"Error clearing guild queue: {str(e)}") + raise + +class CleanupError(Exception): + """Base exception for cleanup-related errors""" + pass diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py new file mode 100644 index 0000000..e4f4fb1 --- /dev/null +++ b/videoarchiver/queue/manager.py @@ -0,0 +1,453 @@ +"""Enhanced queue manager for video processing""" + +import asyncio +import logging +from typing import Dict, Optional, Set, Tuple, Callable, Any, List +from datetime import datetime + +from .models import QueueItem, QueueMetrics +from .persistence import QueuePersistenceManager, QueueError +from .monitoring import QueueMonitor, MonitoringError +from .cleanup import QueueCleaner, CleanupError + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("QueueManager") + +class EnhancedVideoQueueManager: + """Enhanced queue manager with improved memory management and performance""" + + def __init__( + self, + max_retries: int = 3, + retry_delay: int = 5, + max_queue_size: int = 1000, + cleanup_interval: int = 3600, # 1 hour + max_history_age: int = 86400, # 24 hours + persistence_path: Optional[str] = None, + backup_interval: int = 300, # 5 minutes + 
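+        # Passed to QueueMonitor: items stuck in "processing" longer than this are recovered for retry or moved to failed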
deadlock_threshold: int = 900, # 15 minutes + ): + # Configuration + self.max_retries = max_retries + self.retry_delay = retry_delay + self.max_queue_size = max_queue_size + + # Queue storage + self._queue: List[QueueItem] = [] + self._processing: Dict[str, QueueItem] = {} + self._completed: Dict[str, QueueItem] = {} + self._failed: Dict[str, QueueItem] = {} + + # Tracking + self._guild_queues: Dict[int, Set[str]] = {} + self._channel_queues: Dict[int, Set[str]] = {} + self._active_tasks: Set[asyncio.Task] = set() + + # Locks + self._queue_lock = asyncio.Lock() + self._processing_lock = asyncio.Lock() + + # State + self._shutdown = False + self.metrics = QueueMetrics() + + # Components + self.persistence = QueuePersistenceManager(persistence_path) if persistence_path else None + self.monitor = QueueMonitor( + deadlock_threshold=deadlock_threshold, + max_retries=max_retries + ) + self.cleaner = QueueCleaner( + cleanup_interval=cleanup_interval, + max_history_age=max_history_age + ) + + # Initialize tasks + self._init_tasks() + + def _init_tasks(self) -> None: + """Initialize background tasks""" + # Start monitoring + monitor_task = asyncio.create_task( + self.monitor.start_monitoring( + self._queue, + self._processing, + self.metrics, + self._processing_lock + ) + ) + self._active_tasks.add(monitor_task) + + # Start cleanup + cleanup_task = asyncio.create_task( + self.cleaner.start_cleanup( + self._queue, + self._completed, + self._failed, + self._guild_queues, + self._channel_queues, + self._processing, + self.metrics, + self._queue_lock + ) + ) + self._active_tasks.add(cleanup_task) + + # Load persisted state if available + if self.persistence: + self._load_persisted_state() + + def _load_persisted_state(self) -> None: + """Load persisted queue state""" + try: + state = self.persistence.load_queue_state() + if state: + self._queue = state["queue"] + self._processing = state["processing"] + self._completed = state["completed"] + self._failed = state["failed"] + + # Update metrics + metrics_data = state.get("metrics", {}) + self.metrics.total_processed = metrics_data.get("total_processed", 0) + self.metrics.total_failed = metrics_data.get("total_failed", 0) + self.metrics.avg_processing_time = metrics_data.get("avg_processing_time", 0.0) + self.metrics.success_rate = metrics_data.get("success_rate", 0.0) + self.metrics.errors_by_type = metrics_data.get("errors_by_type", {}) + self.metrics.compression_failures = metrics_data.get("compression_failures", 0) + self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0) + + except Exception as e: + logger.error(f"Failed to load persisted state: {e}") + + async def process_queue( + self, + processor: Callable[[QueueItem], Tuple[bool, Optional[str]]] + ) -> None: + """Process items in the queue + + Args: + processor: Function that processes queue items + """ + logger.info("Queue processor started") + while not self._shutdown: + try: + # Get next item from queue + item = None + async with self._queue_lock: + if self._queue: + item = self._queue.pop(0) + self._processing[item.url] = item + item.status = "processing" + item.processing_time = 0.0 + + if not item: + await asyncio.sleep(1) + continue + + try: + # Process the item + logger.info(f"Processing queue item: {item.url}") + success, error = await processor(item) + + # Update metrics and status + async with self._processing_lock: + if success: + item.status = "completed" + self._completed[item.url] = item + logger.info(f"Successfully processed: {item.url}") + else: + 
item.status = "failed" + item.error = error + item.last_error = error + item.last_error_time = datetime.utcnow() + + if item.retry_count < self.max_retries: + item.retry_count += 1 + item.status = "pending" + item.last_retry = datetime.utcnow() + item.priority = max(0, item.priority - 1) + self._queue.append(item) + logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})") + else: + self._failed[item.url] = item + logger.error(f"Failed after {self.max_retries} attempts: {item.url}") + + self._processing.pop(item.url, None) + + except Exception as e: + logger.error(f"Error processing {item.url}: {e}") + async with self._processing_lock: + item.status = "failed" + item.error = str(e) + item.last_error = str(e) + item.last_error_time = datetime.utcnow() + self._failed[item.url] = item + self._processing.pop(item.url, None) + + # Persist state if enabled + if self.persistence: + await self.persistence.persist_queue_state( + self._queue, + self._processing, + self._completed, + self._failed, + self.metrics + ) + + except Exception as e: + logger.error(f"Critical error in queue processor: {e}") + await asyncio.sleep(1) + + await asyncio.sleep(0.1) + + logger.info("Queue processor stopped") + + async def add_to_queue( + self, + url: str, + message_id: int, + channel_id: int, + guild_id: int, + author_id: int, + priority: int = 0, + ) -> bool: + """Add a video to the processing queue + + Args: + url: Video URL + message_id: Discord message ID + channel_id: Discord channel ID + guild_id: Discord guild ID + author_id: Discord author ID + priority: Queue priority (higher = higher priority) + + Returns: + True if added successfully + + Raises: + QueueError: If queue is full or shutting down + """ + if self._shutdown: + raise QueueError("Queue manager is shutting down") + + try: + async with self._queue_lock: + if len(self._queue) >= self.max_queue_size: + raise QueueError("Queue is full") + + item = QueueItem( + url=url, + message_id=message_id, + channel_id=channel_id, + guild_id=guild_id, + author_id=author_id, + added_at=datetime.utcnow(), + priority=priority, + ) + + # Add to tracking + if guild_id not in self._guild_queues: + self._guild_queues[guild_id] = set() + self._guild_queues[guild_id].add(url) + + if channel_id not in self._channel_queues: + self._channel_queues[channel_id] = set() + self._channel_queues[channel_id].add(url) + + # Add to queue with priority + self._queue.append(item) + self._queue.sort(key=lambda x: (-x.priority, x.added_at)) + + if self.persistence: + await self.persistence.persist_queue_state( + self._queue, + self._processing, + self._completed, + self._failed, + self.metrics + ) + + logger.info(f"Added to queue: {url} (priority: {priority})") + return True + + except Exception as e: + logger.error(f"Error adding to queue: {e}") + raise QueueError(f"Failed to add to queue: {str(e)}") + + def get_queue_status(self, guild_id: int) -> dict: + """Get current queue status for a guild + + Args: + guild_id: Discord guild ID + + Returns: + Dict containing queue status and metrics + """ + try: + pending = len([item for item in self._queue if item.guild_id == guild_id]) + processing = len([item for item in self._processing.values() if item.guild_id == guild_id]) + completed = len([item for item in self._completed.values() if item.guild_id == guild_id]) + failed = len([item for item in self._failed.values() if item.guild_id == guild_id]) + + return { + "pending": pending, + "processing": processing, + "completed": completed, + "failed": failed, + "metrics": { + 
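+                    # Queue-wide metrics from the shared QueueMetrics instance (not filtered by guild, unlike the counts above)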
"total_processed": self.metrics.total_processed, + "total_failed": self.metrics.total_failed, + "success_rate": self.metrics.success_rate, + "avg_processing_time": self.metrics.avg_processing_time, + "peak_memory_usage": self.metrics.peak_memory_usage, + "last_cleanup": self.metrics.last_cleanup.strftime("%Y-%m-%d %H:%M:%S"), + "errors_by_type": self.metrics.errors_by_type, + "compression_failures": self.metrics.compression_failures, + "hardware_accel_failures": self.metrics.hardware_accel_failures, + }, + } + + except Exception as e: + logger.error(f"Error getting queue status: {e}") + return { + "pending": 0, + "processing": 0, + "completed": 0, + "failed": 0, + "metrics": { + "total_processed": 0, + "total_failed": 0, + "success_rate": 0.0, + "avg_processing_time": 0.0, + "peak_memory_usage": 0.0, + "last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), + "errors_by_type": {}, + "compression_failures": 0, + "hardware_accel_failures": 0, + }, + } + + async def clear_guild_queue(self, guild_id: int) -> int: + """Clear all queue items for a guild + + Args: + guild_id: Discord guild ID + + Returns: + Number of items cleared + + Raises: + QueueError: If queue is shutting down + """ + if self._shutdown: + raise QueueError("Queue manager is shutting down") + + try: + cleared = await self.cleaner.clear_guild_queue( + guild_id, + self._queue, + self._processing, + self._completed, + self._failed, + self._guild_queues, + self._channel_queues, + self._queue_lock + ) + + if self.persistence: + await self.persistence.persist_queue_state( + self._queue, + self._processing, + self._completed, + self._failed, + self.metrics + ) + + return cleared + + except Exception as e: + logger.error(f"Error clearing guild queue: {e}") + raise QueueError(f"Failed to clear guild queue: {str(e)}") + + async def cleanup(self) -> None: + """Clean up resources and stop queue processing""" + try: + self._shutdown = True + + # Stop monitoring and cleanup tasks + self.monitor.stop_monitoring() + self.cleaner.stop_cleanup() + + # Cancel all active tasks + for task in self._active_tasks: + if not task.done(): + task.cancel() + + await asyncio.gather(*self._active_tasks, return_exceptions=True) + + # Move processing items back to queue + async with self._queue_lock: + for url, item in self._processing.items(): + if item.retry_count < self.max_retries: + item.status = "pending" + item.retry_count += 1 + self._queue.append(item) + else: + self._failed[url] = item + + self._processing.clear() + + # Final state persistence + if self.persistence: + await self.persistence.persist_queue_state( + self._queue, + self._processing, + self._completed, + self._failed, + self.metrics + ) + + # Clear collections + self._queue.clear() + self._completed.clear() + self._failed.clear() + self._guild_queues.clear() + self._channel_queues.clear() + self._active_tasks.clear() + + logger.info("Queue manager cleanup completed") + + except Exception as e: + logger.error(f"Error during cleanup: {e}") + raise CleanupError(f"Failed to clean up queue manager: {str(e)}") + + def force_stop(self) -> None: + """Force stop all queue operations immediately""" + self._shutdown = True + + # Stop monitoring and cleanup + self.monitor.stop_monitoring() + self.cleaner.stop_cleanup() + + # Cancel all active tasks + for task in self._active_tasks: + if not task.done(): + task.cancel() + + # Move processing items back to queue + for url, item in self._processing.items(): + if item.retry_count < self.max_retries: + item.status = "pending" + item.retry_count 
+= 1 + self._queue.append(item) + else: + self._failed[url] = item + + self._processing.clear() + self._active_tasks.clear() + + logger.info("Queue manager force stopped") diff --git a/videoarchiver/queue/models.py b/videoarchiver/queue/models.py new file mode 100644 index 0000000..a65c83b --- /dev/null +++ b/videoarchiver/queue/models.py @@ -0,0 +1,150 @@ +"""Data models for the queue system""" + +import logging +from dataclasses import dataclass, field, asdict +from datetime import datetime +from typing import Dict, Optional, List, Any + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("QueueModels") + +@dataclass +class QueueMetrics: + """Metrics tracking for queue performance and health""" + + total_processed: int = 0 + total_failed: int = 0 + avg_processing_time: float = 0.0 + success_rate: float = 0.0 + errors_by_type: Dict[str, int] = field(default_factory=dict) + last_error: Optional[str] = None + last_error_time: Optional[datetime] = None + last_cleanup: datetime = field(default_factory=datetime.utcnow) + retries: int = 0 + peak_memory_usage: float = 0.0 + processing_times: List[float] = field(default_factory=list) + compression_failures: int = 0 + hardware_accel_failures: int = 0 + + def update(self, processing_time: float, success: bool, error: str = None): + """Update metrics with new processing information""" + self.total_processed += 1 + if not success: + self.total_failed += 1 + if error: + self.last_error = error + self.last_error_time = datetime.utcnow() + error_type = error.split(":")[0] if ":" in error else error + self.errors_by_type[error_type] = ( + self.errors_by_type.get(error_type, 0) + 1 + ) + + # Track specific error types + if "compression error" in error.lower(): + self.compression_failures += 1 + elif "hardware acceleration failed" in error.lower(): + self.hardware_accel_failures += 1 + + # Update processing times with sliding window + self.processing_times.append(processing_time) + if len(self.processing_times) > 100: # Keep last 100 processing times + self.processing_times.pop(0) + + # Update average processing time + self.avg_processing_time = ( + sum(self.processing_times) / len(self.processing_times) + if self.processing_times + else 0.0 + ) + + # Update success rate + self.success_rate = ( + (self.total_processed - self.total_failed) / self.total_processed + if self.total_processed > 0 + else 0.0 + ) + +@dataclass +class QueueItem: + """Represents a video processing task in the queue""" + + url: str + message_id: int + channel_id: int + guild_id: int + author_id: int + added_at: datetime + priority: int = 0 # Higher number = higher priority + status: str = "pending" # pending, processing, completed, failed + error: Optional[str] = None + attempt: int = 0 + _processing_time: float = 0.0 # Use private field for processing_time + size_bytes: int = 0 + last_error: Optional[str] = None + retry_count: int = 0 + last_retry: Optional[datetime] = None + processing_times: List[float] = field(default_factory=list) + last_error_time: Optional[datetime] = None + hardware_accel_attempted: bool = False + compression_attempted: bool = False + original_message: Optional[Any] = None # Store the original message reference + + @property + def processing_time(self) -> float: + """Get processing time as float""" + return self._processing_time + + @processing_time.setter + def processing_time(self, value: Any) -> None: + """Set processing time, ensuring it's always a 
float""" + try: + if isinstance(value, str): + self._processing_time = float(value) + elif isinstance(value, (int, float)): + self._processing_time = float(value) + else: + self._processing_time = 0.0 + except (ValueError, TypeError): + self._processing_time = 0.0 + + def to_dict(self) -> dict: + """Convert to dictionary with datetime handling""" + data = asdict(self) + # Convert datetime objects to ISO format strings + if self.added_at: + data['added_at'] = self.added_at.isoformat() + if self.last_retry: + data['last_retry'] = self.last_retry.isoformat() + if self.last_error_time: + data['last_error_time'] = self.last_error_time.isoformat() + # Convert _processing_time to processing_time in dict + data['processing_time'] = self.processing_time + data.pop('_processing_time', None) + return data + + @classmethod + def from_dict(cls, data: dict) -> 'QueueItem': + """Create from dictionary with datetime handling""" + # Convert ISO format strings back to datetime objects + if 'added_at' in data and isinstance(data['added_at'], str): + data['added_at'] = datetime.fromisoformat(data['added_at']) + if 'last_retry' in data and isinstance(data['last_retry'], str): + data['last_retry'] = datetime.fromisoformat(data['last_retry']) + if 'last_error_time' in data and isinstance(data['last_error_time'], str): + data['last_error_time'] = datetime.fromisoformat(data['last_error_time']) + # Handle processing_time conversion + if 'processing_time' in data: + try: + if isinstance(data['processing_time'], str): + data['_processing_time'] = float(data['processing_time']) + elif isinstance(data['processing_time'], (int, float)): + data['_processing_time'] = float(data['processing_time']) + else: + data['_processing_time'] = 0.0 + except (ValueError, TypeError): + data['_processing_time'] = 0.0 + data.pop('processing_time', None) + return cls(**data) diff --git a/videoarchiver/queue/monitoring.py b/videoarchiver/queue/monitoring.py new file mode 100644 index 0000000..13660cd --- /dev/null +++ b/videoarchiver/queue/monitoring.py @@ -0,0 +1,172 @@ +"""Queue monitoring and health checks""" + +import asyncio +import logging +import psutil +import time +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Set +from .models import QueueItem, QueueMetrics + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("QueueMonitoring") + +class QueueMonitor: + """Monitors queue health and performance""" + + def __init__( + self, + deadlock_threshold: int = 900, # 15 minutes + memory_threshold: int = 1024, # 1GB + max_retries: int = 3 + ): + self.deadlock_threshold = deadlock_threshold + self.memory_threshold = memory_threshold + self.max_retries = max_retries + self._shutdown = False + + async def start_monitoring( + self, + queue: List[QueueItem], + processing: Dict[str, QueueItem], + metrics: QueueMetrics, + processing_lock: asyncio.Lock + ) -> None: + """Start monitoring queue health + + Args: + queue: Reference to the queue list + processing: Reference to processing dict + metrics: Reference to queue metrics + processing_lock: Lock for processing dict + """ + while not self._shutdown: + try: + await self._check_health(queue, processing, metrics, processing_lock) + await asyncio.sleep(300) # Check every 5 minutes + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in health monitor: {str(e)}") + await asyncio.sleep(60) + + def stop_monitoring(self) -> 
None: + """Stop the monitoring process""" + self._shutdown = True + + async def _check_health( + self, + queue: List[QueueItem], + processing: Dict[str, QueueItem], + metrics: QueueMetrics, + processing_lock: asyncio.Lock + ) -> None: + """Check queue health and performance + + Args: + queue: Reference to the queue list + processing: Reference to processing dict + metrics: Reference to queue metrics + processing_lock: Lock for processing dict + """ + try: + # Check memory usage + process = psutil.Process() + memory_usage = process.memory_info().rss / 1024 / 1024 # MB + + if memory_usage > self.memory_threshold: + logger.warning(f"High memory usage detected: {memory_usage:.2f}MB") + # Force garbage collection + import gc + gc.collect() + + # Check for potential deadlocks + current_time = time.time() + processing_times = [] + stuck_items = [] + + for url, item in processing.items(): + if isinstance(item.processing_time, (int, float)) and item.processing_time > 0: + processing_time = current_time - item.processing_time + processing_times.append(processing_time) + if processing_time > self.deadlock_threshold: + stuck_items.append((url, item)) + + if stuck_items: + logger.warning( + f"Potential deadlock detected: {len(stuck_items)} items stuck" + ) + await self._recover_stuck_items( + stuck_items, queue, processing, processing_lock + ) + + # Calculate and log metrics + success_rate = metrics.success_rate + error_distribution = metrics.errors_by_type + avg_processing_time = metrics.avg_processing_time + + # Update peak memory usage + metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage) + + logger.info( + f"Queue Health Metrics:\n" + f"- Success Rate: {success_rate:.2%}\n" + f"- Avg Processing Time: {avg_processing_time:.2f}s\n" + f"- Memory Usage: {memory_usage:.2f}MB\n" + f"- Error Distribution: {error_distribution}\n" + f"- Queue Size: {len(queue)}\n" + f"- Processing Items: {len(processing)}" + ) + + except Exception as e: + logger.error(f"Error checking queue health: {str(e)}") + raise + + async def _recover_stuck_items( + self, + stuck_items: List[tuple[str, QueueItem]], + queue: List[QueueItem], + processing: Dict[str, QueueItem], + processing_lock: asyncio.Lock + ) -> None: + """Attempt to recover stuck items + + Args: + stuck_items: List of (url, item) tuples for stuck items + queue: Reference to the queue list + processing: Reference to processing dict + processing_lock: Lock for processing dict + """ + try: + async with processing_lock: + for url, item in stuck_items: + # Move to failed if max retries reached + if item.retry_count >= self.max_retries: + logger.warning(f"Moving stuck item to failed: {url}") + item.status = "failed" + item.error = "Exceeded maximum retries after being stuck" + item.last_error = item.error + item.last_error_time = datetime.utcnow() + processing.pop(url) + else: + # Reset for retry + logger.info(f"Recovering stuck item for retry: {url}") + item.retry_count += 1 + item.processing_time = 0 + item.last_retry = datetime.utcnow() + item.status = "pending" + item.priority = max(0, item.priority - 2) # Lower priority + queue.append(item) + processing.pop(url) + + except Exception as e: + logger.error(f"Error recovering stuck items: {str(e)}") + raise + +class MonitoringError(Exception): + """Base exception for monitoring-related errors""" + pass diff --git a/videoarchiver/queue/persistence.py b/videoarchiver/queue/persistence.py new file mode 100644 index 0000000..6333fed --- /dev/null +++ b/videoarchiver/queue/persistence.py @@ -0,0 +1,201 @@ 
+"""Queue persistence management""" + +import json +import logging +import os +import time +from datetime import datetime +from typing import Dict, Any, Optional +from .models import QueueItem, QueueMetrics + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("QueuePersistence") + +class QueuePersistenceManager: + """Manages persistence of queue state to disk""" + + def __init__(self, persistence_path: str): + """Initialize the persistence manager + + Args: + persistence_path: Path to the persistence file + """ + self.persistence_path = persistence_path + + async def persist_queue_state( + self, + queue: list[QueueItem], + processing: Dict[str, QueueItem], + completed: Dict[str, QueueItem], + failed: Dict[str, QueueItem], + metrics: QueueMetrics + ) -> None: + """Persist queue state to disk with improved error handling + + Args: + queue: List of pending queue items + processing: Dict of items currently being processed + completed: Dict of completed items + failed: Dict of failed items + metrics: Queue metrics object + + Raises: + QueueError: If persistence fails + """ + try: + state = { + "queue": [item.to_dict() for item in queue], + "processing": {k: v.to_dict() for k, v in processing.items()}, + "completed": {k: v.to_dict() for k, v in completed.items()}, + "failed": {k: v.to_dict() for k, v in failed.items()}, + "metrics": { + "total_processed": metrics.total_processed, + "total_failed": metrics.total_failed, + "avg_processing_time": metrics.avg_processing_time, + "success_rate": metrics.success_rate, + "errors_by_type": metrics.errors_by_type, + "last_error": metrics.last_error, + "last_error_time": ( + metrics.last_error_time.isoformat() + if metrics.last_error_time + else None + ), + "compression_failures": metrics.compression_failures, + "hardware_accel_failures": metrics.hardware_accel_failures, + }, + } + + # Ensure directory exists + os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True) + + # Write to temp file first + temp_path = f"{self.persistence_path}.tmp" + with open(temp_path, "w") as f: + json.dump(state, f, default=str) + f.flush() + os.fsync(f.fileno()) + + # Atomic rename + os.rename(temp_path, self.persistence_path) + + except Exception as e: + logger.error(f"Error persisting queue state: {str(e)}") + raise QueueError(f"Failed to persist queue state: {str(e)}") + + def load_queue_state(self) -> Optional[Dict[str, Any]]: + """Load persisted queue state from disk + + Returns: + Dict containing queue state if successful, None if file doesn't exist + + Raises: + QueueError: If loading fails + """ + if not self.persistence_path or not os.path.exists(self.persistence_path): + return None + + try: + with open(self.persistence_path, "r") as f: + state = json.load(f) + + # Helper function to safely convert items + def safe_convert_item(item_data: dict) -> Optional[QueueItem]: + try: + if isinstance(item_data, dict): + # Ensure datetime fields are properly formatted + if 'added_at' in item_data and item_data['added_at']: + if isinstance(item_data['added_at'], str): + try: + item_data['added_at'] = datetime.fromisoformat(item_data['added_at']) + except ValueError: + item_data['added_at'] = datetime.utcnow() + elif not isinstance(item_data['added_at'], datetime): + item_data['added_at'] = datetime.utcnow() + + if 'last_retry' in item_data and item_data['last_retry']: + if isinstance(item_data['last_retry'], str): + try: + item_data['last_retry'] = 
datetime.fromisoformat(item_data['last_retry']) + except ValueError: + item_data['last_retry'] = None + elif not isinstance(item_data['last_retry'], datetime): + item_data['last_retry'] = None + + if 'last_error_time' in item_data and item_data['last_error_time']: + if isinstance(item_data['last_error_time'], str): + try: + item_data['last_error_time'] = datetime.fromisoformat(item_data['last_error_time']) + except ValueError: + item_data['last_error_time'] = None + elif not isinstance(item_data['last_error_time'], datetime): + item_data['last_error_time'] = None + + # Ensure processing_time is a float + if 'processing_time' in item_data: + try: + if isinstance(item_data['processing_time'], str): + item_data['processing_time'] = float(item_data['processing_time']) + elif not isinstance(item_data['processing_time'], (int, float)): + item_data['processing_time'] = 0.0 + except (ValueError, TypeError): + item_data['processing_time'] = 0.0 + + return QueueItem(**item_data) + return None + except Exception as e: + logger.error(f"Error converting queue item: {e}") + return None + + # Convert queue items + queue = [] + for item in state.get("queue", []): + converted_item = safe_convert_item(item) + if converted_item: + queue.append(converted_item) + state["queue"] = queue + + # Convert processing items + processing = {} + for k, v in state.get("processing", {}).items(): + converted_item = safe_convert_item(v) + if converted_item: + processing[k] = converted_item + state["processing"] = processing + + # Convert completed items + completed = {} + for k, v in state.get("completed", {}).items(): + converted_item = safe_convert_item(v) + if converted_item: + completed[k] = converted_item + state["completed"] = completed + + # Convert failed items + failed = {} + for k, v in state.get("failed", {}).items(): + converted_item = safe_convert_item(v) + if converted_item: + failed[k] = converted_item + state["failed"] = failed + + logger.info("Successfully loaded persisted queue state") + return state + + except Exception as e: + logger.error(f"Error loading persisted queue state: {str(e)}") + # Create backup of corrupted state file + if os.path.exists(self.persistence_path): + backup_path = f"{self.persistence_path}.bak.{int(time.time())}" + try: + os.rename(self.persistence_path, backup_path) + logger.info(f"Created backup of corrupted state file: {backup_path}") + except Exception as be: + logger.error(f"Failed to create backup of corrupted state file: {str(be)}") + raise QueueError(f"Failed to load queue state: {str(e)}") + +class QueueError(Exception): + """Base exception for queue-related errors""" + pass diff --git a/videoarchiver/video_archiver.py b/videoarchiver/video_archiver.py index 62b009d..4481d62 100644 --- a/videoarchiver/video_archiver.py +++ b/videoarchiver/video_archiver.py @@ -17,7 +17,7 @@ from videoarchiver.processor import VideoProcessor from videoarchiver.utils.video_downloader import VideoDownloader from videoarchiver.utils.message_manager import MessageManager from videoarchiver.utils.file_ops import cleanup_downloads -from videoarchiver.enhanced_queue import EnhancedVideoQueueManager +from videoarchiver.queue import EnhancedVideoQueueManager # Updated import from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager from videoarchiver.utils.exceptions import ( VideoArchiverError as ProcessingError,
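
For reviewers who want to exercise the refactored package, the sketch below shows how the public API added in this patch fits together (`EnhancedVideoQueueManager`, `add_to_queue`, `process_queue`, `get_queue_status`, `cleanup`). It is a minimal, hypothetical example rather than part of the patch: the `process_video` callback, the persistence path, and the Discord IDs are placeholders.

```python
import asyncio
from typing import Optional, Tuple

from videoarchiver.queue import EnhancedVideoQueueManager, QueueItem


async def process_video(item: QueueItem) -> Tuple[bool, Optional[str]]:
    """Placeholder processor: a real callback would download and compress item.url."""
    try:
        # ... download / compress / repost the video here ...
        return True, None
    except Exception as exc:
        # Returning (False, error) lets the manager retry the item or mark it failed
        return False, str(exc)


async def main() -> None:
    # The constructor starts its monitoring/cleanup tasks, so it needs a running event loop
    manager = EnhancedVideoQueueManager(
        max_retries=3,
        max_queue_size=100,
        persistence_path="data/queue_state.json",  # assumed location for the state file
    )

    # Enqueue one video; all Discord IDs here are placeholders
    await manager.add_to_queue(
        url="https://example.com/video",
        message_id=111,
        channel_id=222,
        guild_id=333,
        author_id=444,
        priority=1,
    )

    # Run the processing loop in the background, then inspect per-guild status
    worker = asyncio.create_task(manager.process_queue(process_video))
    await asyncio.sleep(2)
    print(manager.get_queue_status(guild_id=333))

    # Persist state, stop background tasks, and let the processor loop exit
    await manager.cleanup()
    await worker


if __name__ == "__main__":
    asyncio.run(main())
```

Inside the cog, `processor.py` and `video_archiver.py` obtain the manager through the updated `from videoarchiver.queue import EnhancedVideoQueueManager` import shown in this patch; the snippet above is purely illustrative.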