"""Enhanced queue manager for video processing"""

import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

from ..core.types import ComponentStatus, IQueueManager, QueueState
from .cleanup import QueueCleaner
from .metrics_manager import QueueMetricsManager
from .models import CleanupError, QueueError, QueueItem
from .monitoring import MonitoringLevel, QueueMonitor
from .persistence import QueuePersistenceManager
from .processor import QueueProcessor
from .state_manager import QueueStateManager
from .types import ProcessingStrategy

logger = logging.getLogger("QueueManager")


class QueueMode(Enum):
    """Queue processing modes"""

    NORMAL = "normal"  # Standard processing
    BATCH = "batch"  # Batch processing
    PRIORITY = "priority"  # Priority-based processing
    MAINTENANCE = "maintenance"  # Maintenance mode


@dataclass
class QueueConfig:
    """Queue configuration settings"""

    max_retries: int = 3
    retry_delay: int = 5
    max_queue_size: int = 1000
    cleanup_interval: int = 3600  # 1 hour
    max_history_age: int = 86400  # 24 hours
    deadlock_threshold: int = 300  # 5 minutes
    check_interval: int = 60  # 1 minute
    batch_size: int = 10
    max_concurrent: int = 3
    persistence_enabled: bool = True
    monitoring_level: MonitoringLevel = MonitoringLevel.NORMAL


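# Usage sketch (illustrative only; the field values below are arbitrary
# assumptions, not tuned recommendations):
#
#     config = QueueConfig(max_retries=5, batch_size=5, max_concurrent=2)
#     manager = EnhancedVideoQueueManager(config)

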
@dataclass
class QueueStats:
    """Queue statistics"""

    start_time: datetime = field(default_factory=datetime.utcnow)
    total_processed: int = 0
    total_failed: int = 0
    uptime: timedelta = field(default_factory=timedelta)
    peak_queue_size: int = 0
    peak_memory_usage: float = 0.0
    state_changes: List[Dict[str, Any]] = field(default_factory=list)


class QueueCoordinator:
    """Coordinates queue operations"""

    def __init__(self):
        self._state = QueueState.UNINITIALIZED
        self.mode = QueueMode.NORMAL
        self._state_lock = asyncio.Lock()
        self._mode_lock = asyncio.Lock()
        self._paused = asyncio.Event()
        self._paused.set()  # Event set means "not paused"

    @property
    def state(self) -> QueueState:
        """Get current state"""
        return self._state

    async def set_state(self, state: QueueState) -> None:
        """Set queue state"""
        async with self._state_lock:
            self._state = state

    async def set_mode(self, mode: QueueMode) -> None:
        """Set queue mode"""
        async with self._mode_lock:
            self.mode = mode

    async def pause(self) -> None:
        """Pause queue processing"""
        self._paused.clear()
        await self.set_state(QueueState.PAUSED)

    async def resume(self) -> None:
        """Resume queue processing"""
        self._paused.set()
        await self.set_state(QueueState.RUNNING)

    async def wait_if_paused(self) -> None:
        """Block until the queue is resumed; returns immediately if not paused"""
        await self._paused.wait()


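# Pause/resume sketch (illustrative): producers call wait_if_paused() before
# enqueueing, so clearing the event applies backpressure to all of them.
#
#     coordinator = QueueCoordinator()
#     await coordinator.pause()    # state -> PAUSED, waiters block
#     await coordinator.resume()   # state -> RUNNING, waiters proceed

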
class EnhancedVideoQueueManager(IQueueManager):
    """Enhanced queue manager with improved organization and maintainability"""

    def __init__(self, config: Optional[QueueConfig] = None):
        """Initialize queue manager components"""
        self.config = config or QueueConfig()
        self.coordinator = QueueCoordinator()
        self.stats = QueueStats()

        # Initialize managers
        self.state_manager = QueueStateManager(self.config.max_queue_size)
        self.metrics_manager = QueueMetricsManager()
        self.monitor = QueueMonitor(
            deadlock_threshold=self.config.deadlock_threshold,
            max_retries=self.config.max_retries,
            check_interval=self.config.check_interval,
        )
        self.cleaner = QueueCleaner(
            cleanup_interval=self.config.cleanup_interval,
            max_history_age=self.config.max_history_age,
        )

        # Initialize persistence if enabled
        self.persistence = (
            QueuePersistenceManager() if self.config.persistence_enabled else None
        )

        # Initialize processor
        self.processor = QueueProcessor(
            state_manager=self.state_manager,
            monitor=self.monitor,
            strategy=ProcessingStrategy.CONCURRENT,
            max_retries=self.config.max_retries,
            retry_delay=self.config.retry_delay,
            batch_size=self.config.batch_size,
            max_concurrent=self.config.max_concurrent,
        )

        # Background tasks
        self._maintenance_task: Optional[asyncio.Task] = None
        self._stats_task: Optional[asyncio.Task] = None
        self._processing_task: Optional[asyncio.Task] = None

    @property
    def state(self) -> QueueState:
        """Get current state"""
        return self.coordinator.state

    async def initialize(self) -> None:
        """Initialize the queue manager components"""
        if self.coordinator.state != QueueState.UNINITIALIZED:
            logger.info("Queue manager already initialized")
            return

        try:
            await self.coordinator.set_state(QueueState.INITIALIZING)
            logger.info("Starting queue manager initialization...")

            # Load persisted state if available
            if self.persistence:
                await self._load_persisted_state()

            # Start monitoring with configured level
            self.monitor.strategy.level = self.config.monitoring_level
            await self.monitor.start(self.state_manager, self.metrics_manager)

            # Start cleanup task
            await self.cleaner.start(
                state_manager=self.state_manager, metrics_manager=self.metrics_manager
            )

            # Start background tasks
            self._start_background_tasks()

            await self.coordinator.set_state(QueueState.RUNNING)
            logger.info("Queue manager initialization completed")

        except Exception as e:
            await self.coordinator.set_state(QueueState.ERROR)
            logger.error(f"Failed to initialize queue manager: {e}")
            raise

    async def add_to_queue(
        self,
        url: str,
        message_id: int,
        channel_id: int,
        guild_id: int,
        author_id: int,
        priority: int = 0,
    ) -> bool:
        """Add a video to the processing queue"""
        if self.coordinator.state in (QueueState.STOPPED, QueueState.ERROR):
            raise QueueError("Queue manager is not running")

        # Wait if queue is paused
        await self.coordinator.wait_if_paused()

        try:
            item = QueueItem(
                url=url,
                message_id=message_id,
                channel_id=channel_id,
                guild_id=guild_id,
                author_id=author_id,
                added_at=datetime.utcnow(),
                priority=priority,
            )

            success = await self.state_manager.add_item(item)
            if success and self.persistence:
                await self._persist_state()

            return success

        except Exception as e:
            logger.error(f"Error adding to queue: {e}")
            raise QueueError(f"Failed to add to queue: {e}") from e

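    # Call-site sketch (illustrative; the attribute accesses assume the values a
    # Discord message handler would pass in):
    #
    #     queued = await manager.add_to_queue(
    #         url=message.content,
    #         message_id=message.id,
    #         channel_id=message.channel.id,
    #         guild_id=message.guild.id,
    #         author_id=message.author.id,
    #         priority=1,
    #     )
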
    def get_queue_status(self, guild_id: int) -> Dict[str, Any]:
        """Get current queue status for a guild"""
        try:
            status = self.state_manager.get_guild_status(guild_id)
            metrics = self.metrics_manager.get_metrics()
            monitor_stats = self.monitor.get_monitoring_stats()
            processor_stats = self.processor.get_processor_stats()

            return {
                **status,
                "metrics": metrics,
                "monitoring": monitor_stats,
                "state": self.coordinator.state.value,
                "mode": self.coordinator.mode.value,
                "active": self.coordinator.state == QueueState.RUNNING
                and bool(processor_stats["active_tasks"]),
                "stalled": monitor_stats.get("stalled", False),
                "stats": {
                    "uptime": self.stats.uptime.total_seconds(),
                    "peak_queue_size": self.stats.peak_queue_size,
                    "peak_memory_usage": self.stats.peak_memory_usage,
                    "total_processed": self.stats.total_processed,
                    "total_failed": self.stats.total_failed,
                },
            }
        except Exception as e:
            logger.error(f"Error getting queue status: {e}")
            return self._get_default_status()

    async def cleanup(self) -> None:
        """Clean up resources and stop queue processing"""
        try:
            await self.coordinator.set_state(QueueState.STOPPING)
            logger.info("Starting queue manager cleanup...")

            # Cancel background tasks and wait for each to finish
            for task in (
                self._maintenance_task,
                self._stats_task,
                self._processing_task,
            ):
                if task:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass

            # Stop processor
            await self.processor.stop_processing()

            # Stop monitoring and cleanup
            await self.monitor.stop()
            await self.cleaner.stop()

            # Final state persistence
            if self.persistence:
                await self._persist_state()

            # Clear state
            await self.state_manager.clear_state()

            await self.coordinator.set_state(QueueState.STOPPED)
            logger.info("Queue manager cleanup completed")

        except Exception as e:
            await self.coordinator.set_state(QueueState.ERROR)
            logger.error(f"Error during cleanup: {e}")
            raise CleanupError(f"Failed to clean up queue manager: {e}") from e

    def get_status(self) -> ComponentStatus:
        """Get component status"""
        return ComponentStatus(
            state=self.coordinator.state.name,
            health=not self.monitor.get_monitoring_stats().get("stalled", False),
            last_check=datetime.utcnow().isoformat(),
            details={
                "mode": self.coordinator.mode.value,
                "metrics": self.metrics_manager.get_metrics(),
                "monitoring": self.monitor.get_monitoring_stats(),
                "stats": {
                    "uptime": self.stats.uptime.total_seconds(),
                    "peak_queue_size": self.stats.peak_queue_size,
                    "peak_memory_usage": self.stats.peak_memory_usage,
                    "total_processed": self.stats.total_processed,
                    "total_failed": self.stats.total_failed,
                },
            },
        )

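    # Health-check sketch (illustrative; assumes ComponentStatus exposes the
    # health field set above): a supervisor could poll get_status() and
    # restart the manager when health goes False.
    #
    #     if not manager.get_status().health:
    #         await manager.cleanup()
    #         await manager.initialize()
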
    # Helper methods

    async def _load_persisted_state(self) -> None:
        """Load persisted queue state"""
        try:
            state = await self.persistence.load_queue_state()
            if state:
                await self.state_manager.restore_state(state)
                self.metrics_manager.restore_metrics(state.get("metrics", {}))
                logger.info("Loaded persisted queue state")
        except Exception as e:
            logger.error(f"Failed to load persisted state: {e}")

    def _start_background_tasks(self) -> None:
        """Start background maintenance tasks"""
        self._maintenance_task = asyncio.create_task(self._maintenance_loop())
        self._stats_task = asyncio.create_task(self._stats_loop())

    async def _maintenance_loop(self) -> None:
        """Background maintenance loop"""
        while self.coordinator.state not in (QueueState.STOPPED, QueueState.ERROR):
            try:
                await asyncio.sleep(300)  # Every 5 minutes
                if self.coordinator.mode == QueueMode.MAINTENANCE:
                    # A maintenance pass is already in progress; skip this cycle
                    continue

                # Perform maintenance tasks
                await self._perform_maintenance()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in maintenance loop: {e}")

    async def _stats_loop(self) -> None:
        """Background statistics loop"""
        while self.coordinator.state not in (QueueState.STOPPED, QueueState.ERROR):
            try:
                await asyncio.sleep(60)  # Every minute
                await self._update_stats()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in stats loop: {e}")

    async def _perform_maintenance(self) -> None:
        """Perform maintenance tasks"""
        # Switch to maintenance mode; restore the previous mode even on error
        previous_mode = self.coordinator.mode
        await self.coordinator.set_mode(QueueMode.MAINTENANCE)
        try:
            # Perform maintenance tasks
            await self._cleanup_old_data()
            await self._optimize_queue()
            await self._persist_state()
        except Exception as e:
            logger.error(f"Error during maintenance: {e}")
        finally:
            # Restore previous mode
            await self.coordinator.set_mode(previous_mode)

    async def _cleanup_old_data(self) -> None:
        """Clean up old data"""
        try:
            await self.cleaner.cleanup_old_data(
                self.state_manager, self.metrics_manager
            )
        except Exception as e:
            logger.error(f"Error cleaning up old data: {e}")

    async def _optimize_queue(self) -> None:
        """Optimize queue performance"""
        try:
            # Reorder queue based on priorities
            await self.state_manager.optimize_queue()

            # Scale monitoring level with queue depth (80%/20% hysteresis band)
            queue_size = len(await self.state_manager.get_all_items())
            if queue_size > self.config.max_queue_size * 0.8:
                self.monitor.strategy.level = MonitoringLevel.INTENSIVE
            elif queue_size < self.config.max_queue_size * 0.2:
                self.monitor.strategy.level = self.config.monitoring_level

        except Exception as e:
            logger.error(f"Error optimizing queue: {e}")

    async def _update_stats(self) -> None:
        """Update queue statistics"""
        try:
            self.stats.uptime = datetime.utcnow() - self.stats.start_time

            # Update peak values
            queue_size = len(await self.state_manager.get_all_items())
            self.stats.peak_queue_size = max(self.stats.peak_queue_size, queue_size)

            memory_usage = self.metrics_manager.peak_memory_usage
            self.stats.peak_memory_usage = max(
                self.stats.peak_memory_usage, memory_usage
            )

        except Exception as e:
            logger.error(f"Error updating stats: {e}")

    async def _persist_state(self) -> None:
        """Persist current state to storage"""
        if not self.persistence:
            return

        try:
            state = await self.state_manager.get_state_for_persistence()
            state["metrics"] = self.metrics_manager.get_metrics()
            state["stats"] = {
                "uptime": self.stats.uptime.total_seconds(),
                "peak_queue_size": self.stats.peak_queue_size,
                "peak_memory_usage": self.stats.peak_memory_usage,
                "total_processed": self.stats.total_processed,
                "total_failed": self.stats.total_failed,
            }
            await self.persistence.persist_queue_state(state)
        except Exception as e:
            logger.error(f"Failed to persist state: {e}")

    def _get_default_status(self) -> Dict[str, Any]:
        """Get default status when an error occurs"""
        return {
            "pending": 0,
            "processing": 0,
            "completed": 0,
            "failed": 0,
            "active": False,
            "stalled": False,
            "metrics": {
                "total_processed": 0,
                "total_failed": 0,
                "success_rate": 0.0,
                "avg_processing_time": 0.0,
                "peak_memory_usage": 0.0,
                "last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                "errors_by_type": {},
                "compression_failures": 0,
                "hardware_accel_failures": 0,
                "last_activity": 0,
            },
            "state": QueueState.ERROR.value,
            "mode": QueueMode.NORMAL.value,
            "stats": {
                "uptime": 0,
                "peak_queue_size": 0,
                "peak_memory_usage": 0,
                "total_processed": 0,
                "total_failed": 0,
            },
        }
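

# Illustrative lifecycle sketch (not part of the public API): shows the
# intended initialize -> enqueue -> status -> cleanup flow end to end. The
# URL and the Discord IDs below are placeholder assumptions.
async def _example_lifecycle() -> None:
    manager = EnhancedVideoQueueManager(QueueConfig(max_concurrent=2))
    await manager.initialize()
    try:
        queued = await manager.add_to_queue(
            url="https://example.com/video.mp4",  # placeholder URL
            message_id=1,
            channel_id=2,
            guild_id=3,
            author_id=4,
            priority=1,
        )
        if queued:
            logger.debug("Queue status: %s", manager.get_queue_status(guild_id=3))
    finally:
        await manager.cleanup()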