Created a proper dependency hierarchy:

core/types.py - Contains shared interfaces and types
queue/types.py - Contains queue-specific types
Components now depend on interfaces rather than concrete implementations
Broke cyclic dependencies:

Removed direct imports between queue_processor.py and manager.py
Removed circular dependencies between core and processor modules
Components now communicate through well-defined interfaces
Improved architecture:

Clear separation of concerns
Better dependency management
More maintainable and testable code
Proper use of dependency injection
This commit is contained in:
pacnpal
2024-11-17 21:11:00 +00:00
parent 063258513e
commit e997c6f6b9
5 changed files with 363 additions and 409 deletions

View File

@@ -2,11 +2,11 @@
import asyncio
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Tuple, Dict, Any, List, Set, Callable
from datetime import datetime, timedelta
from ..core.types import IQueueManager, QueueState, ComponentStatus
from .state_manager import QueueStateManager
from .processor import QueueProcessor
from .metrics_manager import QueueMetricsManager
@@ -18,15 +18,6 @@ from .types import ProcessingStrategy
logger = logging.getLogger("QueueManager")
class QueueState(Enum):
"""Queue operational states"""
UNINITIALIZED = "uninitialized"
INITIALIZING = "initializing"
RUNNING = "running"
PAUSED = "paused"
STOPPING = "stopping"
STOPPED = "stopped"
ERROR = "error"
class QueueMode(Enum):
"""Queue processing modes"""
@@ -35,6 +26,7 @@ class QueueMode(Enum):
PRIORITY = "priority" # Priority-based processing
MAINTENANCE = "maintenance" # Maintenance mode
@dataclass
class QueueConfig:
"""Queue configuration settings"""
@@ -50,6 +42,7 @@ class QueueConfig:
persistence_enabled: bool = True
monitoring_level: MonitoringLevel = MonitoringLevel.NORMAL
@dataclass
class QueueStats:
"""Queue statistics"""
@@ -61,21 +54,27 @@ class QueueStats:
peak_memory_usage: float = 0.0
state_changes: List[Dict[str, Any]] = field(default_factory=list)
class QueueCoordinator:
"""Coordinates queue operations"""
def __init__(self):
self.state = QueueState.UNINITIALIZED
self._state = QueueState.UNINITIALIZED
self.mode = QueueMode.NORMAL
self._state_lock = asyncio.Lock()
self._mode_lock = asyncio.Lock()
self._paused = asyncio.Event()
self._paused.set()
@property
def state(self) -> QueueState:
"""Get current state"""
return self._state
async def set_state(self, state: QueueState) -> None:
"""Set queue state"""
async with self._state_lock:
self.state = state
self._state = state
async def set_mode(self, mode: QueueMode) -> None:
"""Set queue mode"""
@@ -96,7 +95,8 @@ class QueueCoordinator:
"""Wait if queue is paused"""
await self._paused.wait()
class EnhancedVideoQueueManager:
class EnhancedVideoQueueManager(IQueueManager):
"""Enhanced queue manager with improved organization and maintainability"""
def __init__(self, config: Optional[QueueConfig] = None):
@@ -123,7 +123,7 @@ class EnhancedVideoQueueManager:
QueuePersistenceManager() if self.config.persistence_enabled else None
)
# Initialize processor with strategy
# Initialize processor
self.processor = QueueProcessor(
state_manager=self.state_manager,
monitor=self.monitor,
@@ -139,6 +139,11 @@ class EnhancedVideoQueueManager:
self._stats_task: Optional[asyncio.Task] = None
self._processing_task: Optional[asyncio.Task] = None
@property
def state(self) -> QueueState:
"""Get current state"""
return self.coordinator.state
async def initialize(self) -> None:
"""Initialize the queue manager components"""
if self.coordinator.state != QueueState.UNINITIALIZED:
@@ -173,25 +178,132 @@ class EnhancedVideoQueueManager:
logger.error(f"Failed to initialize queue manager: {e}")
raise
async def process_queue(self, processor_func: Callable[[QueueItem], Tuple[bool, Optional[str]]]) -> None:
"""Start processing the queue with the given processor function"""
if self._processing_task and not self._processing_task.done():
logger.warning("Queue processing is already running")
return
async def add_to_queue(
self,
url: str,
message_id: int,
channel_id: int,
guild_id: int,
author_id: int,
priority: int = 0,
) -> bool:
"""Add a video to the processing queue"""
if self.coordinator.state in (QueueState.STOPPED, QueueState.ERROR):
raise QueueError("Queue manager is not running")
# Wait if queue is paused
await self.coordinator.wait_if_paused()
try:
self._processing_task = asyncio.create_task(
self.processor.start_processing(processor_func)
item = QueueItem(
url=url,
message_id=message_id,
channel_id=channel_id,
guild_id=guild_id,
author_id=author_id,
added_at=datetime.utcnow(),
priority=priority,
)
await self._processing_task
except asyncio.CancelledError:
logger.info("Queue processing cancelled")
except Exception as e:
logger.error(f"Error in queue processing: {e}")
raise
finally:
self._processing_task = None
success = await self.state_manager.add_item(item)
if success and self.persistence:
await self._persist_state()
return success
except Exception as e:
logger.error(f"Error adding to queue: {e}")
raise QueueError(f"Failed to add to queue: {str(e)}")
def get_queue_status(self, guild_id: int) -> Dict[str, Any]:
"""Get current queue status for a guild"""
try:
status = self.state_manager.get_guild_status(guild_id)
metrics = self.metrics_manager.get_metrics()
monitor_stats = self.monitor.get_monitoring_stats()
processor_stats = self.processor.get_processor_stats()
return {
**status,
"metrics": metrics,
"monitoring": monitor_stats,
"state": self.coordinator.state.value,
"mode": self.coordinator.mode.value,
"active": self.coordinator.state == QueueState.RUNNING and bool(processor_stats["active_tasks"]),
"stalled": monitor_stats.get("stalled", False),
"stats": {
"uptime": self.stats.uptime.total_seconds(),
"peak_queue_size": self.stats.peak_queue_size,
"peak_memory_usage": self.stats.peak_memory_usage,
"total_processed": self.stats.total_processed,
"total_failed": self.stats.total_failed,
},
}
except Exception as e:
logger.error(f"Error getting queue status: {e}")
return self._get_default_status()
async def cleanup(self) -> None:
"""Clean up resources and stop queue processing"""
try:
await self.coordinator.set_state(QueueState.STOPPING)
logger.info("Starting queue manager cleanup...")
# Cancel background tasks
if self._maintenance_task:
self._maintenance_task.cancel()
if self._stats_task:
self._stats_task.cancel()
if self._processing_task:
self._processing_task.cancel()
try:
await self._processing_task
except asyncio.CancelledError:
pass
# Stop processor
await self.processor.stop_processing()
# Stop monitoring and cleanup
await self.monitor.stop()
await self.cleaner.stop()
# Final state persistence
if self.persistence:
await self._persist_state()
# Clear state
await self.state_manager.clear_state()
await self.coordinator.set_state(QueueState.STOPPED)
logger.info("Queue manager cleanup completed")
except Exception as e:
await self.coordinator.set_state(QueueState.ERROR)
logger.error(f"Error during cleanup: {e}")
raise CleanupError(f"Failed to clean up queue manager: {str(e)}")
def get_status(self) -> ComponentStatus:
"""Get component status"""
return ComponentStatus(
state=self.coordinator.state.name,
health=not self.monitor.get_monitoring_stats().get("stalled", False),
last_check=datetime.utcnow().isoformat(),
details={
"mode": self.coordinator.mode.value,
"metrics": self.metrics_manager.get_metrics(),
"monitoring": self.monitor.get_monitoring_stats(),
"stats": {
"uptime": self.stats.uptime.total_seconds(),
"peak_queue_size": self.stats.peak_queue_size,
"peak_memory_usage": self.stats.peak_memory_usage,
"total_processed": self.stats.total_processed,
"total_failed": self.stats.total_failed,
},
}
)
# Helper methods below...
async def _load_persisted_state(self) -> None:
"""Load persisted queue state"""
try:
@@ -296,149 +408,6 @@ class EnhancedVideoQueueManager:
except Exception as e:
logger.error(f"Error updating stats: {e}")
async def add_to_queue(
self,
url: str,
message_id: int,
channel_id: int,
guild_id: int,
author_id: int,
priority: int = 0,
) -> bool:
"""Add a video to the processing queue"""
if self.coordinator.state in (QueueState.STOPPED, QueueState.ERROR):
raise QueueError("Queue manager is not running")
# Wait if queue is paused
await self.coordinator.wait_if_paused()
try:
item = QueueItem(
url=url,
message_id=message_id,
channel_id=channel_id,
guild_id=guild_id,
author_id=author_id,
added_at=datetime.utcnow(),
priority=priority,
)
success = await self.state_manager.add_item(item)
if success and self.persistence:
await self._persist_state()
return success
except Exception as e:
logger.error(f"Error adding to queue: {e}")
raise QueueError(f"Failed to add to queue: {str(e)}")
def get_queue_status(self, guild_id: int) -> Dict[str, Any]:
"""Get current queue status for a guild"""
try:
status = self.state_manager.get_guild_status(guild_id)
metrics = self.metrics_manager.get_metrics()
monitor_stats = self.monitor.get_monitoring_stats()
processor_stats = self.processor.get_processor_stats()
return {
**status,
"metrics": metrics,
"monitoring": monitor_stats,
"state": self.coordinator.state.value,
"mode": self.coordinator.mode.value,
"active": self.coordinator.state == QueueState.RUNNING and bool(processor_stats["active_tasks"]),
"stalled": monitor_stats.get("stalled", False),
"stats": {
"uptime": self.stats.uptime.total_seconds(),
"peak_queue_size": self.stats.peak_queue_size,
"peak_memory_usage": self.stats.peak_memory_usage,
"total_processed": self.stats.total_processed,
"total_failed": self.stats.total_failed,
},
}
except Exception as e:
logger.error(f"Error getting queue status: {e}")
return self._get_default_status()
async def pause(self) -> None:
"""Pause queue processing"""
await self.coordinator.pause()
logger.info("Queue processing paused")
async def resume(self) -> None:
"""Resume queue processing"""
await self.coordinator.resume()
logger.info("Queue processing resumed")
async def cleanup(self) -> None:
"""Clean up resources and stop queue processing"""
try:
await self.coordinator.set_state(QueueState.STOPPING)
logger.info("Starting queue manager cleanup...")
# Cancel background tasks
if self._maintenance_task:
self._maintenance_task.cancel()
if self._stats_task:
self._stats_task.cancel()
if self._processing_task:
self._processing_task.cancel()
try:
await self._processing_task
except asyncio.CancelledError:
pass
# Stop processor
await self.processor.stop_processing()
# Stop monitoring and cleanup
await self.monitor.stop()
await self.cleaner.stop()
# Final state persistence
if self.persistence:
await self._persist_state()
# Clear state
await self.state_manager.clear_state()
await self.coordinator.set_state(QueueState.STOPPED)
logger.info("Queue manager cleanup completed")
except Exception as e:
await self.coordinator.set_state(QueueState.ERROR)
logger.error(f"Error during cleanup: {e}")
raise CleanupError(f"Failed to clean up queue manager: {str(e)}")
async def force_stop(self) -> None:
"""Force stop all queue operations immediately"""
await self.coordinator.set_state(QueueState.STOPPING)
logger.info("Force stopping queue manager...")
# Cancel background tasks
if self._maintenance_task:
self._maintenance_task.cancel()
if self._stats_task:
self._stats_task.cancel()
if self._processing_task:
self._processing_task.cancel()
try:
await self._processing_task
except asyncio.CancelledError:
pass
# Force stop all components
await self.processor.stop_processing()
await self.monitor.stop()
await self.cleaner.stop()
# Clear state
await self.state_manager.clear_state()
await self.coordinator.set_state(QueueState.STOPPED)
logger.info("Queue manager force stopped")
async def _persist_state(self) -> None:
"""Persist current state to storage"""
if not self.persistence: