From e997c6f6b9702da403fd7f191acea051a763278b Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Sun, 17 Nov 2024 21:11:00 +0000 Subject: [PATCH] Created a proper dependency hierarchy: core/types.py - Contains shared interfaces and types queue/types.py - Contains queue-specific types Components now depend on interfaces rather than concrete implementations Broke cyclic dependencies: Removed direct imports between queue_processor.py and manager.py Removed circular dependencies between core and processor modules Components now communicate through well-defined interfaces Improved architecture: Clear separation of concerns Better dependency management More maintainable and testable code Proper use of dependency injection --- videoarchiver/core/types.py | 101 +++++++ videoarchiver/processor/core.py | 296 ++++++------------- videoarchiver/processor/message_handler.py | 33 +-- videoarchiver/processor/queue_processor.py | 29 +- videoarchiver/queue/manager.py | 313 ++++++++++----------- 5 files changed, 363 insertions(+), 409 deletions(-) create mode 100644 videoarchiver/core/types.py diff --git a/videoarchiver/core/types.py b/videoarchiver/core/types.py new file mode 100644 index 0000000..107943b --- /dev/null +++ b/videoarchiver/core/types.py @@ -0,0 +1,101 @@ +"""Core type definitions and interfaces""" + +from abc import ABC, abstractmethod +from enum import Enum, auto +from typing import Any, Dict, Optional, Protocol, TypedDict +from datetime import datetime + +class ComponentState(Enum): + """Component lifecycle states""" + UNINITIALIZED = auto() + INITIALIZING = auto() + READY = auto() + ERROR = auto() + SHUTDOWN = auto() + +class ProcessorState(Enum): + """Processor states""" + INITIALIZING = auto() + READY = auto() + PROCESSING = auto() + PAUSED = auto() + ERROR = auto() + SHUTDOWN = auto() + +class QueueState(Enum): + """Queue states""" + UNINITIALIZED = auto() + INITIALIZING = auto() + RUNNING = auto() + PAUSED = auto() + STOPPING = auto() + STOPPED = auto() + ERROR = auto() + +class ComponentStatus(TypedDict): + """Component status information""" + state: str + health: bool + last_check: Optional[str] + details: Dict[str, Any] + +class IComponent(Protocol): + """Interface for managed components""" + + @property + def state(self) -> ComponentState: + """Get component state""" + ... + + async def initialize(self) -> None: + """Initialize the component""" + ... + + async def cleanup(self) -> None: + """Clean up component resources""" + ... + + def get_status(self) -> ComponentStatus: + """Get component status""" + ... + +class IProcessor(IComponent, Protocol): + """Interface for video processor""" + + async def process_video(self, item: Any) -> tuple[bool, Optional[str]]: + """Process a video item""" + ... + + async def process_message(self, message: Any) -> None: + """Process a message""" + ... + +class IQueueManager(IComponent, Protocol): + """Interface for queue management""" + + async def add_to_queue( + self, + url: str, + message_id: int, + channel_id: int, + guild_id: int, + author_id: int, + priority: int = 0, + ) -> bool: + """Add item to queue""" + ... + + def get_queue_status(self, guild_id: int) -> Dict[str, Any]: + """Get queue status""" + ... + +class IConfigManager(IComponent, Protocol): + """Interface for configuration management""" + + async def get_guild_settings(self, guild_id: int) -> Optional[Dict[str, Any]]: + """Get guild settings""" + ... + + async def update_guild_settings(self, guild_id: int, settings: Dict[str, Any]) -> None: + """Update guild settings""" + ... diff --git a/videoarchiver/processor/core.py b/videoarchiver/processor/core.py index fddebca..cf91b74 100644 --- a/videoarchiver/processor/core.py +++ b/videoarchiver/processor/core.py @@ -2,133 +2,64 @@ import asyncio import logging -from datetime import datetime, timedelta -from enum import auto, Enum -from typing import Any, ClassVar, Dict, List, Optional, Tuple, TypedDict +from datetime import datetime +from typing import Any, ClassVar, Dict, List, Optional, Tuple import discord # type: ignore from discord.ext import commands # type: ignore -from ..config_manager import ConfigManager -from ..database.video_archive_db import VideoArchiveDB -from ..ffmpeg.ffmpeg_manager import FFmpegManager +from ..core.types import ( + IComponent, + IConfigManager, + IQueueManager, + ProcessorState, + ComponentStatus, +) from ..processor.cleanup_manager import CleanupManager, CleanupStrategy from ..processor.constants import REACTIONS - from ..processor.message_handler import MessageHandler from ..processor.queue_handler import QueueHandler from ..processor.status_display import StatusDisplay -from ..queue.manager import EnhancedVideoQueueManager from ..utils import progress_tracker from ..utils.exceptions import ProcessorError logger = logging.getLogger("VideoArchiver") -class ProcessorState(Enum): - """Possible states of the video processor""" - - INITIALIZING = auto() - READY = auto() - PROCESSING = auto() - PAUSED = auto() - ERROR = auto() - SHUTDOWN = auto() - - -class OperationType(Enum): - """Types of processor operations""" - - MESSAGE_PROCESSING = auto() - VIDEO_PROCESSING = auto() - QUEUE_MANAGEMENT = auto() - CLEANUP = auto() - - -class OperationDetails(TypedDict): - """Type definition for operation details""" - - type: str - start_time: datetime - end_time: Optional[datetime] - status: str - details: Dict[str, Any] - error: Optional[str] - - -class OperationStats(TypedDict): - """Type definition for operation statistics""" - - total_operations: int - active_operations: int - success_count: int - error_count: int - success_rate: float - - -class ProcessorStatus(TypedDict): - """Type definition for processor status""" - - state: str - health: bool - operations: OperationStats - active_operations: Dict[str, OperationDetails] - last_health_check: Optional[str] - health_status: Dict[str, bool] - - class OperationTracker: """Tracks processor operations""" MAX_HISTORY: ClassVar[int] = 1000 # Maximum number of operations to track def __init__(self) -> None: - self.operations: Dict[str, OperationDetails] = {} - self.operation_history: List[OperationDetails] = [] + self.operations: Dict[str, Dict[str, Any]] = {} + self.operation_history: List[Dict[str, Any]] = [] self.error_count = 0 self.success_count = 0 - def start_operation(self, op_type: OperationType, details: Dict[str, Any]) -> str: - """ - Start tracking an operation. - - Args: - op_type: Type of operation - details: Operation details - - Returns: - Operation ID string - """ - op_id = f"{op_type.value}_{datetime.utcnow().timestamp()}" - self.operations[op_id] = OperationDetails( - type=op_type.value, - start_time=datetime.utcnow(), - end_time=None, - status="running", - details=details, - error=None, - ) + def start_operation(self, op_type: str, details: Dict[str, Any]) -> str: + """Start tracking an operation""" + op_id = f"{op_type}_{datetime.utcnow().timestamp()}" + self.operations[op_id] = { + "type": op_type, + "start_time": datetime.utcnow(), + "end_time": None, + "status": "running", + "details": details, + "error": None, + } return op_id def end_operation( self, op_id: str, success: bool, error: Optional[str] = None ) -> None: - """ - End tracking an operation. - - Args: - op_id: Operation ID - success: Whether operation succeeded - error: Optional error message - """ + """End tracking an operation""" if op_id in self.operations: - self.operations[op_id].update( - { - "end_time": datetime.utcnow(), - "status": "success" if success else "error", - "error": error, - } - ) + self.operations[op_id].update({ + "end_time": datetime.utcnow(), + "status": "success" if success else "error", + "error": error, + }) # Move to history self.operation_history.append(self.operations.pop(op_id)) # Update counts @@ -139,32 +70,22 @@ class OperationTracker: # Cleanup old history if needed if len(self.operation_history) > self.MAX_HISTORY: - self.operation_history = self.operation_history[-self.MAX_HISTORY :] + self.operation_history = self.operation_history[-self.MAX_HISTORY:] - def get_active_operations(self) -> Dict[str, OperationDetails]: - """ - Get currently active operations. - - Returns: - Dictionary of active operations - """ + def get_active_operations(self) -> Dict[str, Dict[str, Any]]: + """Get currently active operations""" return self.operations.copy() - def get_operation_stats(self) -> OperationStats: - """ - Get operation statistics. - - Returns: - Dictionary containing operation statistics - """ + def get_operation_stats(self) -> Dict[str, Any]: + """Get operation statistics""" total = self.success_count + self.error_count - return OperationStats( - total_operations=len(self.operation_history) + len(self.operations), - active_operations=len(self.operations), - success_count=self.success_count, - error_count=self.error_count, - success_rate=self.success_count / total if total > 0 else 0.0, - ) + return { + "total_operations": len(self.operation_history) + len(self.operations), + "active_operations": len(self.operations), + "success_count": self.success_count, + "error_count": self.error_count, + "success_rate": self.success_count / total if total > 0 else 0.0, + } class HealthMonitor: @@ -202,13 +123,11 @@ class HealthMonitor: self.last_check = datetime.utcnow() # Check component health - self.health_status.update( - { - "queue_handler": self.processor.queue_handler.is_healthy(), - "message_handler": self.processor.message_handler.is_healthy(), - "progress_tracker": progress_tracker.is_healthy(), - } - ) + self.health_status.update({ + "queue_handler": self.processor.queue_handler.is_healthy(), + "message_handler": self.processor.message_handler.is_healthy(), + "progress_tracker": progress_tracker.is_healthy(), + }) # Check operation health op_stats = self.processor.operation_tracker.get_operation_stats() @@ -223,26 +142,21 @@ class HealthMonitor: await asyncio.sleep(self.ERROR_CHECK_INTERVAL) def is_healthy(self) -> bool: - """ - Check if processor is healthy. - - Returns: - True if all components are healthy, False otherwise - """ + """Check if processor is healthy""" return all(self.health_status.values()) -class VideoProcessor: +class VideoProcessor(IComponent): """Handles video processing operations""" def __init__( self, bot: commands.Bot, - config_manager: ConfigManager, + config_manager: IConfigManager, components: Dict[int, Dict[str, Any]], - queue_manager: Optional[EnhancedVideoQueueManager] = None, - ffmpeg_mgr: Optional[FFmpegManager] = None, - db: Optional[VideoArchiveDB] = None, + queue_manager: Optional[IQueueManager] = None, + ffmpeg_mgr: Optional[Any] = None, + db: Optional[Any] = None, ) -> None: self.bot = bot self.config = config_manager @@ -252,7 +166,7 @@ class VideoProcessor: self.queue_manager = queue_manager # Initialize state - self.state = ProcessorState.INITIALIZING + self._state = ProcessorState.INITIALIZING self.operation_tracker = OperationTracker() self.health_monitor = HealthMonitor(self) @@ -272,21 +186,21 @@ class VideoProcessor: self._queue_task: Optional[asyncio.Task] = None # Mark as ready - self.state = ProcessorState.READY + self._state = ProcessorState.READY logger.info("VideoProcessor initialized successfully") except Exception as e: - self.state = ProcessorState.ERROR + self._state = ProcessorState.ERROR logger.error(f"Error initializing VideoProcessor: {e}", exc_info=True) raise ProcessorError(f"Failed to initialize processor: {str(e)}") - async def start(self) -> None: - """ - Start processor operations. + @property + def state(self) -> ProcessorState: + """Get processor state""" + return self._state - Raises: - ProcessorError: If startup fails - """ + async def initialize(self) -> None: + """Initialize the processor""" try: await self.health_monitor.start_monitoring() logger.info("VideoProcessor started successfully") @@ -296,24 +210,13 @@ class VideoProcessor: raise ProcessorError(error) async def process_video(self, item: Any) -> Tuple[bool, Optional[str]]: - """ - Process a video from the queue. - - Args: - item: Queue item to process - - Returns: - Tuple of (success, error_message) - - Raises: - ProcessorError: If processing fails - """ + """Process a video from the queue""" op_id = self.operation_tracker.start_operation( - OperationType.VIDEO_PROCESSING, {"item": str(item)} + "video_processing", {"item": str(item)} ) try: - self.state = ProcessorState.PROCESSING + self._state = ProcessorState.PROCESSING result = await self.queue_handler.process_video(item) success = result[0] error = None if success else result[1] @@ -325,20 +228,12 @@ class VideoProcessor: logger.error(error, exc_info=True) raise ProcessorError(error) finally: - self.state = ProcessorState.READY + self._state = ProcessorState.READY async def process_message(self, message: discord.Message) -> None: - """ - Process a message for video content. - - Args: - message: Discord message to process - - Raises: - ProcessorError: If processing fails - """ + """Process a message for video content""" op_id = self.operation_tracker.start_operation( - OperationType.MESSAGE_PROCESSING, {"message_id": message.id} + "message_processing", {"message_id": message.id} ) try: @@ -351,18 +246,13 @@ class VideoProcessor: raise ProcessorError(error) async def cleanup(self) -> None: - """ - Clean up resources and stop processing. - - Raises: - ProcessorError: If cleanup fails - """ + """Clean up resources and stop processing""" op_id = self.operation_tracker.start_operation( - OperationType.CLEANUP, {"type": "normal"} + "cleanup", {"type": "normal"} ) try: - self.state = ProcessorState.SHUTDOWN + self._state = ProcessorState.SHUTDOWN await self.health_monitor.stop_monitoring() await self.cleanup_manager.cleanup() self.operation_tracker.end_operation(op_id, True) @@ -373,18 +263,13 @@ class VideoProcessor: raise ProcessorError(error) async def force_cleanup(self) -> None: - """ - Force cleanup of resources. - - Raises: - ProcessorError: If force cleanup fails - """ + """Force cleanup of resources""" op_id = self.operation_tracker.start_operation( - OperationType.CLEANUP, {"type": "force"} + "cleanup", {"type": "force"} ) try: - self.state = ProcessorState.SHUTDOWN + self._state = ProcessorState.SHUTDOWN await self.health_monitor.stop_monitoring() await self.cleanup_manager.force_cleanup() self.operation_tracker.end_operation(op_id, True) @@ -395,12 +280,7 @@ class VideoProcessor: raise ProcessorError(error) async def show_queue_details(self, ctx: commands.Context) -> None: - """ - Display detailed queue status. - - Args: - ctx: Command context - """ + """Display detailed queue status""" try: if not self.queue_manager: await ctx.send("Queue manager is not initialized.") @@ -424,31 +304,23 @@ class VideoProcessor: await ctx.send(f"Error getting queue details: {str(e)}") def set_queue_task(self, task: asyncio.Task) -> None: - """ - Set the queue processing task. - - Args: - task: Queue processing task - """ + """Set the queue processing task""" self._queue_task = task self.cleanup_manager.set_queue_task(task) - def get_status(self) -> ProcessorStatus: - """ - Get processor status. - - Returns: - Dictionary containing processor status information - """ - return ProcessorStatus( - state=self.state.value, + def get_status(self) -> ComponentStatus: + """Get processor status""" + return ComponentStatus( + state=self._state.name, health=self.health_monitor.is_healthy(), - operations=self.operation_tracker.get_operation_stats(), - active_operations=self.operation_tracker.get_active_operations(), - last_health_check=( + last_check=( self.health_monitor.last_check.isoformat() if self.health_monitor.last_check else None ), - health_status=self.health_monitor.health_status, + details={ + "operations": self.operation_tracker.get_operation_stats(), + "active_operations": self.operation_tracker.get_active_operations(), + "health_status": self.health_monitor.health_status, + } ) diff --git a/videoarchiver/processor/message_handler.py b/videoarchiver/processor/message_handler.py index 2a403c1..b9ea94b 100644 --- a/videoarchiver/processor/message_handler.py +++ b/videoarchiver/processor/message_handler.py @@ -12,9 +12,8 @@ from discord.ext import commands # type: ignore from ..config_manager import ConfigManager from ..processor.constants import REACTIONS from ..processor.message_validator import MessageValidator, ValidationError -from ..processor.queue_processor import QueuePriority, QueueProcessor - from ..processor.url_extractor import URLExtractor, URLMetadata +from ..queue.types import QueuePriority from ..queue.manager import EnhancedVideoQueueManager from ..utils.exceptions import MessageHandlerError @@ -214,7 +213,7 @@ class MessageHandler: self.config_manager = config_manager self.url_extractor = URLExtractor() self.message_validator = MessageValidator() - self.queue_processor = QueueProcessor(queue_manager) + self.queue_manager = queue_manager # Initialize tracking and caching self.tracker = ProcessingTracker() @@ -316,9 +315,15 @@ class MessageHandler: message.id, MessageState.PROCESSING, ProcessingStage.QUEUEING ) try: - await self.queue_processor.process_urls( - message, urls, priority=QueuePriority.NORMAL - ) + for url_metadata in urls: + await self.queue_manager.add_to_queue( + url=url_metadata.url, + message_id=message.id, + channel_id=message.channel.id, + guild_id=message.guild.id, + author_id=message.author.id, + priority=QueuePriority.NORMAL.value + ) except Exception as e: raise MessageHandlerError(f"Queue processing failed: {str(e)}") @@ -332,22 +337,6 @@ class MessageHandler: except Exception as e: raise MessageHandlerError(f"Unexpected error: {str(e)}") - async def format_archive_message( - self, author: Optional[discord.Member], channel: discord.TextChannel, url: str - ) -> str: - """ - Format message for archive channel. - - Args: - author: Optional message author - channel: Channel the message was posted in - url: URL being archived - - Returns: - Formatted message string - """ - return await self.queue_processor.format_archive_message(author, channel, url) - def get_message_status(self, message_id: int) -> MessageStatus: """ Get processing status for a message. diff --git a/videoarchiver/processor/queue_processor.py b/videoarchiver/processor/queue_processor.py index 48d6b92..abdf9b2 100644 --- a/videoarchiver/processor/queue_processor.py +++ b/videoarchiver/processor/queue_processor.py @@ -16,12 +16,35 @@ class QueueProcessor: _active_items: ClassVar[Set[int]] = set() _processing_lock: ClassVar[asyncio.Lock] = asyncio.Lock() - def __init__(self): + def __init__(self, queue_manager): + """Initialize queue processor + + Args: + queue_manager: Queue manager instance to handle queue operations + """ + self.queue_manager = queue_manager self._metrics = ProcessingMetrics() - async def process_item(self, item: QueueItem) -> bool: + async def process_urls(self, message, urls, priority: QueuePriority = QueuePriority.NORMAL) -> None: + """Process URLs from a message + + Args: + message: Discord message containing URLs + urls: List of URLs to process + priority: Processing priority level """ - Process a single queue item + for url_metadata in urls: + await self.queue_manager.add_to_queue( + url=url_metadata.url, + message_id=message.id, + channel_id=message.channel.id, + guild_id=message.guild.id, + author_id=message.author.id, + priority=priority.value + ) + + async def process_item(self, item: QueueItem) -> bool: + """Process a single queue item Args: item: Queue item to process diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index 8608b31..d0964b3 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -2,11 +2,11 @@ import asyncio import logging -from enum import Enum from dataclasses import dataclass, field from typing import Optional, Tuple, Dict, Any, List, Set, Callable from datetime import datetime, timedelta +from ..core.types import IQueueManager, QueueState, ComponentStatus from .state_manager import QueueStateManager from .processor import QueueProcessor from .metrics_manager import QueueMetricsManager @@ -18,15 +18,6 @@ from .types import ProcessingStrategy logger = logging.getLogger("QueueManager") -class QueueState(Enum): - """Queue operational states""" - UNINITIALIZED = "uninitialized" - INITIALIZING = "initializing" - RUNNING = "running" - PAUSED = "paused" - STOPPING = "stopping" - STOPPED = "stopped" - ERROR = "error" class QueueMode(Enum): """Queue processing modes""" @@ -35,6 +26,7 @@ class QueueMode(Enum): PRIORITY = "priority" # Priority-based processing MAINTENANCE = "maintenance" # Maintenance mode + @dataclass class QueueConfig: """Queue configuration settings""" @@ -50,6 +42,7 @@ class QueueConfig: persistence_enabled: bool = True monitoring_level: MonitoringLevel = MonitoringLevel.NORMAL + @dataclass class QueueStats: """Queue statistics""" @@ -61,21 +54,27 @@ class QueueStats: peak_memory_usage: float = 0.0 state_changes: List[Dict[str, Any]] = field(default_factory=list) + class QueueCoordinator: """Coordinates queue operations""" def __init__(self): - self.state = QueueState.UNINITIALIZED + self._state = QueueState.UNINITIALIZED self.mode = QueueMode.NORMAL self._state_lock = asyncio.Lock() self._mode_lock = asyncio.Lock() self._paused = asyncio.Event() self._paused.set() + @property + def state(self) -> QueueState: + """Get current state""" + return self._state + async def set_state(self, state: QueueState) -> None: """Set queue state""" async with self._state_lock: - self.state = state + self._state = state async def set_mode(self, mode: QueueMode) -> None: """Set queue mode""" @@ -96,7 +95,8 @@ class QueueCoordinator: """Wait if queue is paused""" await self._paused.wait() -class EnhancedVideoQueueManager: + +class EnhancedVideoQueueManager(IQueueManager): """Enhanced queue manager with improved organization and maintainability""" def __init__(self, config: Optional[QueueConfig] = None): @@ -123,7 +123,7 @@ class EnhancedVideoQueueManager: QueuePersistenceManager() if self.config.persistence_enabled else None ) - # Initialize processor with strategy + # Initialize processor self.processor = QueueProcessor( state_manager=self.state_manager, monitor=self.monitor, @@ -139,6 +139,11 @@ class EnhancedVideoQueueManager: self._stats_task: Optional[asyncio.Task] = None self._processing_task: Optional[asyncio.Task] = None + @property + def state(self) -> QueueState: + """Get current state""" + return self.coordinator.state + async def initialize(self) -> None: """Initialize the queue manager components""" if self.coordinator.state != QueueState.UNINITIALIZED: @@ -173,25 +178,132 @@ class EnhancedVideoQueueManager: logger.error(f"Failed to initialize queue manager: {e}") raise - async def process_queue(self, processor_func: Callable[[QueueItem], Tuple[bool, Optional[str]]]) -> None: - """Start processing the queue with the given processor function""" - if self._processing_task and not self._processing_task.done(): - logger.warning("Queue processing is already running") - return + async def add_to_queue( + self, + url: str, + message_id: int, + channel_id: int, + guild_id: int, + author_id: int, + priority: int = 0, + ) -> bool: + """Add a video to the processing queue""" + if self.coordinator.state in (QueueState.STOPPED, QueueState.ERROR): + raise QueueError("Queue manager is not running") + + # Wait if queue is paused + await self.coordinator.wait_if_paused() try: - self._processing_task = asyncio.create_task( - self.processor.start_processing(processor_func) + item = QueueItem( + url=url, + message_id=message_id, + channel_id=channel_id, + guild_id=guild_id, + author_id=author_id, + added_at=datetime.utcnow(), + priority=priority, ) - await self._processing_task - except asyncio.CancelledError: - logger.info("Queue processing cancelled") - except Exception as e: - logger.error(f"Error in queue processing: {e}") - raise - finally: - self._processing_task = None + success = await self.state_manager.add_item(item) + if success and self.persistence: + await self._persist_state() + + return success + + except Exception as e: + logger.error(f"Error adding to queue: {e}") + raise QueueError(f"Failed to add to queue: {str(e)}") + + def get_queue_status(self, guild_id: int) -> Dict[str, Any]: + """Get current queue status for a guild""" + try: + status = self.state_manager.get_guild_status(guild_id) + metrics = self.metrics_manager.get_metrics() + monitor_stats = self.monitor.get_monitoring_stats() + processor_stats = self.processor.get_processor_stats() + + return { + **status, + "metrics": metrics, + "monitoring": monitor_stats, + "state": self.coordinator.state.value, + "mode": self.coordinator.mode.value, + "active": self.coordinator.state == QueueState.RUNNING and bool(processor_stats["active_tasks"]), + "stalled": monitor_stats.get("stalled", False), + "stats": { + "uptime": self.stats.uptime.total_seconds(), + "peak_queue_size": self.stats.peak_queue_size, + "peak_memory_usage": self.stats.peak_memory_usage, + "total_processed": self.stats.total_processed, + "total_failed": self.stats.total_failed, + }, + } + except Exception as e: + logger.error(f"Error getting queue status: {e}") + return self._get_default_status() + + async def cleanup(self) -> None: + """Clean up resources and stop queue processing""" + try: + await self.coordinator.set_state(QueueState.STOPPING) + logger.info("Starting queue manager cleanup...") + + # Cancel background tasks + if self._maintenance_task: + self._maintenance_task.cancel() + if self._stats_task: + self._stats_task.cancel() + if self._processing_task: + self._processing_task.cancel() + try: + await self._processing_task + except asyncio.CancelledError: + pass + + # Stop processor + await self.processor.stop_processing() + + # Stop monitoring and cleanup + await self.monitor.stop() + await self.cleaner.stop() + + # Final state persistence + if self.persistence: + await self._persist_state() + + # Clear state + await self.state_manager.clear_state() + + await self.coordinator.set_state(QueueState.STOPPED) + logger.info("Queue manager cleanup completed") + + except Exception as e: + await self.coordinator.set_state(QueueState.ERROR) + logger.error(f"Error during cleanup: {e}") + raise CleanupError(f"Failed to clean up queue manager: {str(e)}") + + def get_status(self) -> ComponentStatus: + """Get component status""" + return ComponentStatus( + state=self.coordinator.state.name, + health=not self.monitor.get_monitoring_stats().get("stalled", False), + last_check=datetime.utcnow().isoformat(), + details={ + "mode": self.coordinator.mode.value, + "metrics": self.metrics_manager.get_metrics(), + "monitoring": self.monitor.get_monitoring_stats(), + "stats": { + "uptime": self.stats.uptime.total_seconds(), + "peak_queue_size": self.stats.peak_queue_size, + "peak_memory_usage": self.stats.peak_memory_usage, + "total_processed": self.stats.total_processed, + "total_failed": self.stats.total_failed, + }, + } + ) + + # Helper methods below... async def _load_persisted_state(self) -> None: """Load persisted queue state""" try: @@ -296,149 +408,6 @@ class EnhancedVideoQueueManager: except Exception as e: logger.error(f"Error updating stats: {e}") - async def add_to_queue( - self, - url: str, - message_id: int, - channel_id: int, - guild_id: int, - author_id: int, - priority: int = 0, - ) -> bool: - """Add a video to the processing queue""" - if self.coordinator.state in (QueueState.STOPPED, QueueState.ERROR): - raise QueueError("Queue manager is not running") - - # Wait if queue is paused - await self.coordinator.wait_if_paused() - - try: - item = QueueItem( - url=url, - message_id=message_id, - channel_id=channel_id, - guild_id=guild_id, - author_id=author_id, - added_at=datetime.utcnow(), - priority=priority, - ) - - success = await self.state_manager.add_item(item) - if success and self.persistence: - await self._persist_state() - - return success - - except Exception as e: - logger.error(f"Error adding to queue: {e}") - raise QueueError(f"Failed to add to queue: {str(e)}") - - def get_queue_status(self, guild_id: int) -> Dict[str, Any]: - """Get current queue status for a guild""" - try: - status = self.state_manager.get_guild_status(guild_id) - metrics = self.metrics_manager.get_metrics() - monitor_stats = self.monitor.get_monitoring_stats() - processor_stats = self.processor.get_processor_stats() - - return { - **status, - "metrics": metrics, - "monitoring": monitor_stats, - "state": self.coordinator.state.value, - "mode": self.coordinator.mode.value, - "active": self.coordinator.state == QueueState.RUNNING and bool(processor_stats["active_tasks"]), - "stalled": monitor_stats.get("stalled", False), - "stats": { - "uptime": self.stats.uptime.total_seconds(), - "peak_queue_size": self.stats.peak_queue_size, - "peak_memory_usage": self.stats.peak_memory_usage, - "total_processed": self.stats.total_processed, - "total_failed": self.stats.total_failed, - }, - } - except Exception as e: - logger.error(f"Error getting queue status: {e}") - return self._get_default_status() - - async def pause(self) -> None: - """Pause queue processing""" - await self.coordinator.pause() - logger.info("Queue processing paused") - - async def resume(self) -> None: - """Resume queue processing""" - await self.coordinator.resume() - logger.info("Queue processing resumed") - - async def cleanup(self) -> None: - """Clean up resources and stop queue processing""" - try: - await self.coordinator.set_state(QueueState.STOPPING) - logger.info("Starting queue manager cleanup...") - - # Cancel background tasks - if self._maintenance_task: - self._maintenance_task.cancel() - if self._stats_task: - self._stats_task.cancel() - if self._processing_task: - self._processing_task.cancel() - try: - await self._processing_task - except asyncio.CancelledError: - pass - - # Stop processor - await self.processor.stop_processing() - - # Stop monitoring and cleanup - await self.monitor.stop() - await self.cleaner.stop() - - # Final state persistence - if self.persistence: - await self._persist_state() - - # Clear state - await self.state_manager.clear_state() - - await self.coordinator.set_state(QueueState.STOPPED) - logger.info("Queue manager cleanup completed") - - except Exception as e: - await self.coordinator.set_state(QueueState.ERROR) - logger.error(f"Error during cleanup: {e}") - raise CleanupError(f"Failed to clean up queue manager: {str(e)}") - - async def force_stop(self) -> None: - """Force stop all queue operations immediately""" - await self.coordinator.set_state(QueueState.STOPPING) - logger.info("Force stopping queue manager...") - - # Cancel background tasks - if self._maintenance_task: - self._maintenance_task.cancel() - if self._stats_task: - self._stats_task.cancel() - if self._processing_task: - self._processing_task.cancel() - try: - await self._processing_task - except asyncio.CancelledError: - pass - - # Force stop all components - await self.processor.stop_processing() - await self.monitor.stop() - await self.cleaner.stop() - - # Clear state - await self.state_manager.clear_state() - - await self.coordinator.set_state(QueueState.STOPPED) - logger.info("Queue manager force stopped") - async def _persist_state(self) -> None: """Persist current state to storage""" if not self.persistence: