diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 884e7fd..867afb0 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -3,7 +3,6 @@ from .processor import ( VideoProcessor, REACTIONS, - ProgressTracker, MessageHandler, QueueHandler ) @@ -11,7 +10,6 @@ from .processor import ( __all__ = [ 'VideoProcessor', 'REACTIONS', - 'ProgressTracker', 'MessageHandler', 'QueueHandler' ] diff --git a/videoarchiver/processor/cleanup_manager.py b/videoarchiver/processor/cleanup_manager.py index d8f75d8..6e0d330 100644 --- a/videoarchiver/processor/cleanup_manager.py +++ b/videoarchiver/processor/cleanup_manager.py @@ -4,7 +4,18 @@ import logging import asyncio from enum import Enum, auto from dataclasses import dataclass, field -from typing import Optional, Dict, Any, List, Set, TypedDict, ClassVar, Callable, Awaitable, Tuple +from typing import ( + Optional, + Dict, + Any, + List, + Set, + TypedDict, + ClassVar, + Callable, + Awaitable, + Tuple, +) from datetime import datetime, timedelta from .queue_handler import QueueHandler @@ -13,44 +24,55 @@ from ..utils.exceptions import CleanupError logger = logging.getLogger("VideoArchiver") + class CleanupStage(Enum): """Cleanup stages""" + QUEUE = auto() FFMPEG = auto() TASKS = auto() RESOURCES = auto() + class CleanupStrategy(Enum): """Cleanup strategies""" + NORMAL = auto() FORCE = auto() GRACEFUL = auto() + class CleanupStats(TypedDict): """Type definition for cleanup statistics""" + total_cleanups: int active_cleanups: int success_rate: float average_duration: float stage_success_rates: Dict[str, float] + @dataclass class CleanupResult: """Result of a cleanup operation""" + success: bool stage: CleanupStage error: Optional[str] = None duration: float = 0.0 timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) + @dataclass class CleanupOperation: """Represents a cleanup operation""" + stage: CleanupStage func: Callable[[], Awaitable[None]] force_func: Optional[Callable[[], Awaitable[None]]] = None timeout: float = 30.0 # Default timeout in seconds + class CleanupTracker: """Tracks cleanup operations""" @@ -65,7 +87,7 @@ class CleanupTracker: def start_cleanup(self, cleanup_id: str) -> None: """ Start tracking a cleanup operation. - + Args: cleanup_id: Unique identifier for the cleanup operation """ @@ -75,16 +97,12 @@ class CleanupTracker: # Cleanup old history if needed if len(self.cleanup_history) >= self.MAX_HISTORY: - self.cleanup_history = self.cleanup_history[-self.MAX_HISTORY:] + self.cleanup_history = self.cleanup_history[-self.MAX_HISTORY :] - def record_stage_result( - self, - cleanup_id: str, - result: CleanupResult - ) -> None: + def record_stage_result(self, cleanup_id: str, result: CleanupResult) -> None: """ Record result of a cleanup stage. - + Args: cleanup_id: Cleanup operation identifier result: Result of the cleanup stage @@ -95,19 +113,23 @@ class CleanupTracker: def end_cleanup(self, cleanup_id: str) -> None: """ End tracking a cleanup operation. - + Args: cleanup_id: Cleanup operation identifier """ if cleanup_id in self.active_cleanups: end_time = datetime.utcnow() - self.cleanup_history.append({ - "id": cleanup_id, - "start_time": self.start_times[cleanup_id], - "end_time": end_time, - "duration": (end_time - self.start_times[cleanup_id]).total_seconds(), - "results": self.stage_results[cleanup_id] - }) + self.cleanup_history.append( + { + "id": cleanup_id, + "start_time": self.start_times[cleanup_id], + "end_time": end_time, + "duration": ( + end_time - self.start_times[cleanup_id] + ).total_seconds(), + "results": self.stage_results[cleanup_id], + } + ) self.active_cleanups.remove(cleanup_id) self.start_times.pop(cleanup_id) self.stage_results.pop(cleanup_id) @@ -115,7 +137,7 @@ class CleanupTracker: def get_cleanup_stats(self) -> CleanupStats: """ Get cleanup statistics. - + Returns: Dictionary containing cleanup statistics """ @@ -124,7 +146,7 @@ class CleanupTracker: active_cleanups=len(self.active_cleanups), success_rate=self._calculate_success_rate(), average_duration=self._calculate_average_duration(), - stage_success_rates=self._calculate_stage_success_rates() + stage_success_rates=self._calculate_stage_success_rates(), ) def _calculate_success_rate(self) -> float: @@ -132,7 +154,8 @@ class CleanupTracker: if not self.cleanup_history: return 1.0 successful = sum( - 1 for cleanup in self.cleanup_history + 1 + for cleanup in self.cleanup_history if all(result.success for result in cleanup["results"]) ) return successful / len(self.cleanup_history) @@ -148,7 +171,7 @@ class CleanupTracker: """Calculate success rates by stage""" stage_attempts: Dict[str, int] = {} stage_successes: Dict[str, int] = {} - + for cleanup in self.cleanup_history: for result in cleanup["results"]: stage = result.stage.value @@ -161,6 +184,7 @@ class CleanupTracker: for stage, attempts in stage_attempts.items() } + class CleanupManager: """Manages cleanup operations for the video processor""" @@ -170,7 +194,7 @@ class CleanupManager: self, queue_handler: QueueHandler, ffmpeg_mgr: Optional[FFmpegManager] = None, - strategy: CleanupStrategy = CleanupStrategy.NORMAL + strategy: CleanupStrategy = CleanupStrategy.NORMAL, ) -> None: self.queue_handler = queue_handler self.ffmpeg_mgr = ffmpeg_mgr @@ -184,54 +208,50 @@ class CleanupManager: stage=CleanupStage.QUEUE, func=self._cleanup_queue, force_func=self._force_cleanup_queue, - timeout=30.0 + timeout=30.0, ), CleanupOperation( stage=CleanupStage.FFMPEG, func=self._cleanup_ffmpeg, force_func=self._force_cleanup_ffmpeg, - timeout=15.0 + timeout=15.0, ), CleanupOperation( stage=CleanupStage.TASKS, func=self._cleanup_tasks, force_func=self._force_cleanup_tasks, - timeout=15.0 - ) + timeout=15.0, + ), ] async def cleanup(self) -> None: """ Perform normal cleanup of resources. - + Raises: CleanupError: If cleanup fails """ cleanup_id = f"cleanup_{datetime.utcnow().timestamp()}" self.tracker.start_cleanup(cleanup_id) - + try: logger.info("Starting normal cleanup...") - + # Clean up in stages for operation in self.cleanup_operations: try: start_time = datetime.utcnow() - await asyncio.wait_for( - operation.func(), - timeout=operation.timeout - ) + await asyncio.wait_for(operation.func(), timeout=operation.timeout) duration = (datetime.utcnow() - start_time).total_seconds() self.tracker.record_stage_result( cleanup_id, - CleanupResult(True, operation.stage, duration=duration) + CleanupResult(True, operation.stage, duration=duration), ) except asyncio.TimeoutError: error = f"Cleanup stage {operation.stage.value} timed out" logger.error(error) self.tracker.record_stage_result( - cleanup_id, - CleanupResult(False, operation.stage, error) + cleanup_id, CleanupResult(False, operation.stage, error) ) if self.strategy != CleanupStrategy.GRACEFUL: raise CleanupError(error) @@ -239,8 +259,7 @@ class CleanupManager: error = f"Error in {operation.stage.value} cleanup: {e}" logger.error(error) self.tracker.record_stage_result( - cleanup_id, - CleanupResult(False, operation.stage, str(e)) + cleanup_id, CleanupResult(False, operation.stage, str(e)) ) if self.strategy != CleanupStrategy.GRACEFUL: raise CleanupError(error) @@ -260,7 +279,7 @@ class CleanupManager: """Force cleanup of resources when normal cleanup fails""" cleanup_id = f"force_cleanup_{datetime.utcnow().timestamp()}" self.tracker.start_cleanup(cleanup_id) - + try: logger.info("Starting force cleanup...") @@ -272,19 +291,17 @@ class CleanupManager: try: start_time = datetime.utcnow() await asyncio.wait_for( - operation.force_func(), - timeout=operation.timeout + operation.force_func(), timeout=operation.timeout ) duration = (datetime.utcnow() - start_time).total_seconds() self.tracker.record_stage_result( cleanup_id, - CleanupResult(True, operation.stage, duration=duration) + CleanupResult(True, operation.stage, duration=duration), ) except Exception as e: logger.error(f"Error in force {operation.stage.value} cleanup: {e}") self.tracker.record_stage_result( - cleanup_id, - CleanupResult(False, operation.stage, str(e)) + cleanup_id, CleanupResult(False, operation.stage, str(e)) ) logger.info("Force cleanup completed") @@ -338,7 +355,7 @@ class CleanupManager: def set_queue_task(self, task: asyncio.Task) -> None: """ Set the queue processing task for cleanup purposes. - + Args: task: Queue processing task to track """ @@ -347,7 +364,7 @@ class CleanupManager: def get_cleanup_stats(self) -> Dict[str, Any]: """ Get cleanup statistics. - + Returns: Dictionary containing cleanup statistics and status """ @@ -359,8 +376,8 @@ class CleanupManager: { "stage": op.stage.value, "timeout": op.timeout, - "has_force_cleanup": op.force_func is not None + "has_force_cleanup": op.force_func is not None, } for op in self.cleanup_operations - ] + ], }