mirror of
https://github.com/pacnpal/Pac-cogs.git
synced 2025-12-20 02:41:06 -05:00
from videoarchiver.utils import progress_tracker. Updated the main init.py to properly handle module reloading: added 'videoarchiver.processor' and 'videoarchiver.processor.core' to modules_to_reload, and added an explicit import and reload of the processor module. These changes ensure that the progress_tracker is properly imported from the utils package, all necessary modules are reloaded during initialization, circular imports are avoided, and the package structure maintains proper dependency flow.
450 lines
15 KiB
Python
450 lines
15 KiB
Python
"""Core VideoProcessor class that manages video processing operations"""
|
|
|
|
import logging
|
|
import asyncio
|
|
from enum import Enum, auto
|
|
from typing import Optional, Tuple, Dict, Any, List, TypedDict, ClassVar
|
|
from datetime import datetime, timedelta
|
|
import discord
|
|
from discord.ext import commands
|
|
|
|
from .message_handler import MessageHandler
|
|
from .queue_handler import QueueHandler
|
|
from videoarchiver.utils import progress_tracker
|
|
from .status_display import StatusDisplay
|
|
from .cleanup_manager import CleanupManager, CleanupStrategy
|
|
from .constants import REACTIONS
|
|
from ..queue.manager import EnhancedVideoQueueManager
|
|
from ..ffmpeg.ffmpeg_manager import FFmpegManager
|
|
from ..database.video_archive_db import VideoArchiveDB
|
|
from ..config_manager import ConfigManager
|
|
from ..utils.exceptions import ProcessorError
|
|
|
|
# Module-level logger; every record emitted here goes to the "VideoArchiver" logger.
logger = logging.getLogger("VideoArchiver")
|
|
|
|
class ProcessorState(Enum):
    """Possible states of the video processor.

    Members carry lowercase string values (rather than the integers that
    ``auto()`` would generate) so that ``state.value`` can be stored directly
    in fields declared as ``str`` and reads meaningfully in status output.
    """

    INITIALIZING = "initializing"  # set while the processor builds its handlers
    READY = "ready"  # idle and able to accept work
    PROCESSING = "processing"  # a video is currently being processed
    PAUSED = "paused"  # not assigned anywhere in this module
    ERROR = "error"  # handler construction failed
    SHUTDOWN = "shutdown"  # cleanup or force cleanup has been requested
|
|
|
|
class OperationType(Enum):
    """Types of processor operations.

    Members carry lowercase string values (rather than the integers that
    ``auto()`` would generate) because the value is stored in the ``type``
    field of an operation record, which is declared as ``str``, and is used
    as the prefix of the operation ID.
    """

    MESSAGE_PROCESSING = "message_processing"
    VIDEO_PROCESSING = "video_processing"
    QUEUE_MANAGEMENT = "queue_management"
    CLEANUP = "cleanup"
|
|
|
|
class OperationDetails(TypedDict):
    """Type definition for operation details.

    One record describes a single tracked operation, from the moment tracking
    starts until the record has been moved into the tracker's history.
    """

    # Value of the OperationType member the operation was started with.
    type: str
    # Naive UTC timestamp (datetime.utcnow()) taken when tracking started.
    start_time: datetime
    # Naive UTC timestamp taken when tracking ended; None while still running.
    end_time: Optional[datetime]
    # "running" while active, then "success" or "error" once ended.
    status: str
    # Caller-supplied context describing the operation.
    details: Dict[str, Any]
    # Error message passed when the operation was ended, if any; otherwise None.
    error: Optional[str]
|
|
|
|
class OperationStats(TypedDict):
    """Type definition for operation statistics."""

    # Finished plus still-active operations, as reported by the tracker.
    total_operations: int
    # Operations that have been started but not yet ended.
    active_operations: int
    # Operations that were ended with success=True.
    success_count: int
    # Operations that were ended with success=False.
    error_count: int
    # success_count / (success_count + error_count); 0.0 when nothing has ended yet.
    success_rate: float
|
|
|
|
class ProcessorStatus(TypedDict):
    """Type definition for processor status."""

    # Value of the processor's current ProcessorState member.
    state: str
    # Overall verdict: True when every entry in health_status is truthy.
    health: bool
    # Aggregate counters from the operation tracker.
    operations: OperationStats
    # Operations currently in flight, keyed by operation ID.
    active_operations: Dict[str, OperationDetails]
    # ISO-8601 string of the most recent health check, or None if none has run yet.
    last_health_check: Optional[str]
    # Individual check results keyed by check name (e.g. "queue_handler", "operations").
    health_status: Dict[str, bool]
|
|
|
|
class OperationTracker:
    """Tracks processor operations.

    Running operations live in ``operations`` keyed by their ID. Finished
    operations are moved to ``operation_history``, which retains only the most
    recent ``MAX_HISTORY`` records. The success and error counters are
    cumulative and are not affected by history trimming.
    """

    MAX_HISTORY: ClassVar[int] = 1000  # Maximum number of operations to track

    def __init__(self) -> None:
        self.operations: Dict[str, OperationDetails] = {}
        self.operation_history: List[OperationDetails] = []
        self.error_count = 0
        self.success_count = 0

    def start_operation(
        self,
        op_type: OperationType,
        details: Dict[str, Any]
    ) -> str:
        """
        Start tracking an operation.

        Args:
            op_type: Type of operation
            details: Operation details

        Returns:
            Operation ID string
        """
        # Take the timestamp once so the ID and start_time agree.
        started_at = datetime.utcnow()
        base_id = f"{op_type.value}_{started_at.timestamp()}"

        # Two operations of the same type started within the same clock tick
        # would otherwise share an ID and the second would silently overwrite
        # the first. Disambiguate only when a collision actually occurs so the
        # usual ID format is unchanged.
        op_id = base_id
        duplicate = 0
        while op_id in self.operations:
            duplicate += 1
            op_id = f"{base_id}_{duplicate}"

        record: OperationDetails = {
            "type": op_type.value,
            "start_time": started_at,
            "end_time": None,
            "status": "running",
            "details": details,
            "error": None,
        }
        self.operations[op_id] = record
        return op_id

    def end_operation(
        self,
        op_id: str,
        success: bool,
        error: Optional[str] = None
    ) -> None:
        """
        End tracking an operation.

        Unknown (or already ended) operation IDs are ignored.

        Args:
            op_id: Operation ID
            success: Whether operation succeeded
            error: Optional error message
        """
        record = self.operations.pop(op_id, None)
        if record is None:
            return

        record["end_time"] = datetime.utcnow()
        record["status"] = "success" if success else "error"
        record["error"] = error

        # Move to history
        self.operation_history.append(record)

        # Update counts
        if success:
            self.success_count += 1
        else:
            self.error_count += 1

        # Drop the oldest records once the cap is exceeded. Computing the
        # overflow explicitly also behaves correctly when MAX_HISTORY is 0
        # (a ``[-0:]`` slice would keep everything).
        overflow = len(self.operation_history) - self.MAX_HISTORY
        if overflow > 0:
            del self.operation_history[:overflow]

    def get_active_operations(self) -> Dict[str, OperationDetails]:
        """
        Get currently active operations.

        Returns:
            Shallow copy of the dictionary of active operations
        """
        return dict(self.operations)

    def get_operation_stats(self) -> OperationStats:
        """
        Get operation statistics.

        Returns:
            Dictionary containing operation statistics
        """
        # Use the cumulative counters rather than len(operation_history):
        # history is trimmed to MAX_HISTORY, so its length stops growing and
        # would under-report the total.
        completed = self.success_count + self.error_count
        active = len(self.operations)
        stats: OperationStats = {
            "total_operations": completed + active,
            "active_operations": active,
            "success_count": self.success_count,
            "error_count": self.error_count,
            "success_rate": (
                self.success_count / completed if completed > 0 else 0.0
            ),
        }
        return stats
|
|
|
|
class HealthMonitor:
    """Monitors processor health.

    A background task periodically records the health of the processor's
    queue handler, message handler, the progress tracker and the operation
    success rate in ``health_status``.
    """

    HEALTH_CHECK_INTERVAL: ClassVar[int] = 60  # Seconds between health checks
    ERROR_CHECK_INTERVAL: ClassVar[int] = 30  # Seconds between checks after error
    SUCCESS_RATE_THRESHOLD: ClassVar[float] = 0.9  # 90% success rate threshold

    def __init__(self, processor: 'VideoProcessor') -> None:
        self.processor = processor
        self.last_check: Optional[datetime] = None
        self.health_status: Dict[str, bool] = {}
        self._monitor_task: Optional[asyncio.Task] = None

    async def start_monitoring(self) -> None:
        """Start health monitoring (no-op if a monitor task is already running)"""
        # Starting twice would otherwise orphan the first task, which could
        # then never be cancelled by stop_monitoring().
        if self._monitor_task is not None and not self._monitor_task.done():
            return
        self._monitor_task = asyncio.create_task(self._monitor_health())

    async def stop_monitoring(self) -> None:
        """Stop health monitoring"""
        task = self._monitor_task
        if task is None:
            return

        # Forget the task first so a later start_monitoring() creates a new one.
        self._monitor_task = None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error stopping health monitor: {e}")

    async def _monitor_health(self) -> None:
        """Monitor processor health until the task is cancelled"""
        while True:
            try:
                self.last_check = datetime.utcnow()

                # Check component health
                self.health_status.update({
                    "queue_handler": self.processor.queue_handler.is_healthy(),
                    "message_handler": self.processor.message_handler.is_healthy(),
                    "progress_tracker": progress_tracker.is_healthy()
                })

                # Check operation health
                self.health_status["operations"] = self._operations_healthy()

                await asyncio.sleep(self.HEALTH_CHECK_INTERVAL)

            except asyncio.CancelledError:
                # Never swallow cancellation, otherwise the loop could not be stopped.
                raise
            except Exception as e:
                logger.error(f"Health monitoring error: {e}", exc_info=True)
                await asyncio.sleep(self.ERROR_CHECK_INTERVAL)

    def _operations_healthy(self) -> bool:
        """Return whether the operation success rate meets the threshold."""
        op_stats = self.processor.operation_tracker.get_operation_stats()
        completed = op_stats["success_count"] + op_stats["error_count"]
        # With nothing completed the success rate is reported as 0.0; that
        # means "no data", not "everything failed", so an idle processor must
        # not be flagged as unhealthy.
        if completed == 0:
            return True
        return op_stats["success_rate"] >= self.SUCCESS_RATE_THRESHOLD

    def is_healthy(self) -> bool:
        """
        Check if processor is healthy.

        Returns:
            True if all recorded checks are healthy (also True before the
            first check has run, when nothing is recorded), False otherwise
        """
        return all(self.health_status.values())
|
|
|
|
class VideoProcessor:
    """Handles video processing operations.

    Owns the queue, message and cleanup handlers, records every operation in
    an OperationTracker and exposes overall state through a HealthMonitor.
    """

    def __init__(
        self,
        bot: commands.Bot,
        config_manager: ConfigManager,
        components: Dict[int, Dict[str, Any]],
        queue_manager: Optional[EnhancedVideoQueueManager] = None,
        ffmpeg_mgr: Optional[FFmpegManager] = None,
        db: Optional[VideoArchiveDB] = None
    ) -> None:
        """
        Build the processor and its handlers.

        Args:
            bot: Bot instance handed to the handlers
            config_manager: Configuration manager handed to the handlers
            components: Component mapping handed to the queue handler
            queue_manager: Optional queue manager used for message handling
                and queue status display
            ffmpeg_mgr: Optional FFmpeg manager handed to the cleanup manager
            db: Optional database; attached to the queue handler when truthy

        Raises:
            ProcessorError: If any handler fails to initialize
        """
        self.bot = bot
        self.config = config_manager
        self.components = components
        self.ffmpeg_mgr = ffmpeg_mgr
        self.db = db
        self.queue_manager = queue_manager

        # Initialize state
        self.state = ProcessorState.INITIALIZING
        self.operation_tracker = OperationTracker()
        self.health_monitor = HealthMonitor(self)

        # Store queue task reference
        self._queue_task: Optional[asyncio.Task] = None

        try:
            # Initialize handlers
            self.queue_handler = QueueHandler(bot, config_manager, components)
            self.message_handler = MessageHandler(bot, config_manager, queue_manager)
            self.cleanup_manager = CleanupManager(
                self.queue_handler,
                ffmpeg_mgr,
                CleanupStrategy.NORMAL
            )

            # Pass db to queue handler if it exists
            if self.db:
                self.queue_handler.db = self.db

            # Mark as ready
            self.state = ProcessorState.READY
            logger.info("VideoProcessor initialized successfully")

        except Exception as e:
            self.state = ProcessorState.ERROR
            logger.error(f"Error initializing VideoProcessor: {e}", exc_info=True)
            # Chain the cause so the original traceback is preserved.
            raise ProcessorError(f"Failed to initialize processor: {str(e)}") from e

    async def start(self) -> None:
        """
        Start processor operations.

        Raises:
            ProcessorError: If startup fails
        """
        try:
            await self.health_monitor.start_monitoring()
            logger.info("VideoProcessor started successfully")
        except Exception as e:
            error = f"Failed to start processor: {str(e)}"
            logger.error(error, exc_info=True)
            raise ProcessorError(error) from e

    async def process_video(self, item: Any) -> Tuple[bool, Optional[str]]:
        """
        Process a video from the queue.

        Args:
            item: Queue item to process

        Returns:
            Tuple of (success, error_message)

        Raises:
            ProcessorError: If processing fails
        """
        op_id = self.operation_tracker.start_operation(
            OperationType.VIDEO_PROCESSING,
            {"item": str(item)}
        )

        try:
            self.state = ProcessorState.PROCESSING
            result = await self.queue_handler.process_video(item)
            success = result[0]
            error = None if success else result[1]
            self.operation_tracker.end_operation(op_id, success, error)
            return result
        except Exception as e:
            error = f"Video processing failed: {str(e)}"
            self.operation_tracker.end_operation(op_id, False, error)
            logger.error(error, exc_info=True)
            raise ProcessorError(error) from e
        finally:
            # Only fall back to READY if nothing else changed the state while
            # the video was being processed; a SHUTDOWN set by cleanup() in
            # the meantime must not be overwritten.
            if self.state is ProcessorState.PROCESSING:
                self.state = ProcessorState.READY

    async def process_message(self, message: discord.Message) -> None:
        """
        Process a message for video content.

        Args:
            message: Discord message to process

        Raises:
            ProcessorError: If processing fails
        """
        op_id = self.operation_tracker.start_operation(
            OperationType.MESSAGE_PROCESSING,
            {"message_id": message.id}
        )

        try:
            await self.message_handler.process_message(message)
            self.operation_tracker.end_operation(op_id, True)
        except Exception as e:
            error = f"Message processing failed: {str(e)}"
            self.operation_tracker.end_operation(op_id, False, error)
            logger.error(error, exc_info=True)
            raise ProcessorError(error) from e

    async def _run_cleanup(self, *, force: bool) -> None:
        """Shared implementation of cleanup() and force_cleanup()."""
        op_id = self.operation_tracker.start_operation(
            OperationType.CLEANUP,
            {"type": "force" if force else "normal"}
        )

        try:
            self.state = ProcessorState.SHUTDOWN
            await self.health_monitor.stop_monitoring()
            if force:
                await self.cleanup_manager.force_cleanup()
            else:
                await self.cleanup_manager.cleanup()
            self.operation_tracker.end_operation(op_id, True)
        except Exception as e:
            prefix = "Force cleanup failed" if force else "Cleanup failed"
            error = f"{prefix}: {str(e)}"
            self.operation_tracker.end_operation(op_id, False, error)
            logger.error(error, exc_info=True)
            raise ProcessorError(error) from e

    async def cleanup(self) -> None:
        """
        Clean up resources and stop processing.

        Raises:
            ProcessorError: If cleanup fails
        """
        await self._run_cleanup(force=False)

    async def force_cleanup(self) -> None:
        """
        Force cleanup of resources.

        Raises:
            ProcessorError: If force cleanup fails
        """
        await self._run_cleanup(force=True)

    async def show_queue_details(self, ctx: commands.Context) -> None:
        """
        Display detailed queue status.

        Errors are logged and reported to the invoking channel rather than
        raised.

        Args:
            ctx: Command context
        """
        try:
            if not self.queue_manager:
                await ctx.send("Queue manager is not initialized.")
                return

            # ctx.guild is None outside of a guild (e.g. in DMs); give a clear
            # message instead of failing on the attribute access below.
            if ctx.guild is None:
                await ctx.send("This command can only be used in a server.")
                return

            # Get queue status
            queue_status = self.queue_manager.get_queue_status(ctx.guild.id)

            # Get active operations
            active_ops = self.operation_tracker.get_active_operations()

            # Create and send status embed
            embed = await StatusDisplay.create_queue_status_embed(
                queue_status,
                active_ops
            )
            await ctx.send(embed=embed)

        except Exception as e:
            error = f"Failed to show queue details: {str(e)}"
            logger.error(error, exc_info=True)
            await ctx.send(f"Error getting queue details: {str(e)}")

    def set_queue_task(self, task: asyncio.Task) -> None:
        """
        Set the queue processing task.

        The task is also handed to the cleanup manager.

        Args:
            task: Queue processing task
        """
        self._queue_task = task
        self.cleanup_manager.set_queue_task(task)

    def get_status(self) -> ProcessorStatus:
        """
        Get processor status.

        Returns:
            Dictionary containing processor status information
        """
        last_check = self.health_monitor.last_check
        return ProcessorStatus(
            state=self.state.value,
            health=self.health_monitor.is_healthy(),
            operations=self.operation_tracker.get_operation_stats(),
            active_operations=self.operation_tracker.get_active_operations(),
            last_health_check=last_check.isoformat() if last_check else None,
            # Hand out a copy so callers cannot mutate the monitor's state.
            health_status=dict(self.health_monitor.health_status)
        )
|