Moved progress_tracker.py to utils/ as the single source of truth

Implemented proper singleton pattern in ProgressTracker class
Created shared instance in utils/init.py
Updated all files to use relative imports correctly
Removed duplicate progress_tracker.py from processor/
Fixed circular imports by removing ProgressTracker from processor.py re-exports
This commit is contained in:
pacnpal
2024-11-17 01:24:03 +00:00
parent 98ed3dfc6a
commit b10722f05b
2 changed files with 65 additions and 50 deletions

View File

@@ -3,7 +3,6 @@
from .processor import ( from .processor import (
VideoProcessor, VideoProcessor,
REACTIONS, REACTIONS,
ProgressTracker,
MessageHandler, MessageHandler,
QueueHandler QueueHandler
) )
@@ -11,7 +10,6 @@ from .processor import (
__all__ = [ __all__ = [
'VideoProcessor', 'VideoProcessor',
'REACTIONS', 'REACTIONS',
'ProgressTracker',
'MessageHandler', 'MessageHandler',
'QueueHandler' 'QueueHandler'
] ]

View File

@@ -4,7 +4,18 @@ import logging
import asyncio import asyncio
from enum import Enum, auto from enum import Enum, auto
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List, Set, TypedDict, ClassVar, Callable, Awaitable, Tuple from typing import (
Optional,
Dict,
Any,
List,
Set,
TypedDict,
ClassVar,
Callable,
Awaitable,
Tuple,
)
from datetime import datetime, timedelta from datetime import datetime, timedelta
from .queue_handler import QueueHandler from .queue_handler import QueueHandler
@@ -13,44 +24,55 @@ from ..utils.exceptions import CleanupError
logger = logging.getLogger("VideoArchiver") logger = logging.getLogger("VideoArchiver")
class CleanupStage(Enum): class CleanupStage(Enum):
"""Cleanup stages""" """Cleanup stages"""
QUEUE = auto() QUEUE = auto()
FFMPEG = auto() FFMPEG = auto()
TASKS = auto() TASKS = auto()
RESOURCES = auto() RESOURCES = auto()
class CleanupStrategy(Enum): class CleanupStrategy(Enum):
"""Cleanup strategies""" """Cleanup strategies"""
NORMAL = auto() NORMAL = auto()
FORCE = auto() FORCE = auto()
GRACEFUL = auto() GRACEFUL = auto()
class CleanupStats(TypedDict): class CleanupStats(TypedDict):
"""Type definition for cleanup statistics""" """Type definition for cleanup statistics"""
total_cleanups: int total_cleanups: int
active_cleanups: int active_cleanups: int
success_rate: float success_rate: float
average_duration: float average_duration: float
stage_success_rates: Dict[str, float] stage_success_rates: Dict[str, float]
@dataclass @dataclass
class CleanupResult: class CleanupResult:
"""Result of a cleanup operation""" """Result of a cleanup operation"""
success: bool success: bool
stage: CleanupStage stage: CleanupStage
error: Optional[str] = None error: Optional[str] = None
duration: float = 0.0 duration: float = 0.0
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
@dataclass @dataclass
class CleanupOperation: class CleanupOperation:
"""Represents a cleanup operation""" """Represents a cleanup operation"""
stage: CleanupStage stage: CleanupStage
func: Callable[[], Awaitable[None]] func: Callable[[], Awaitable[None]]
force_func: Optional[Callable[[], Awaitable[None]]] = None force_func: Optional[Callable[[], Awaitable[None]]] = None
timeout: float = 30.0 # Default timeout in seconds timeout: float = 30.0 # Default timeout in seconds
class CleanupTracker: class CleanupTracker:
"""Tracks cleanup operations""" """Tracks cleanup operations"""
@@ -65,7 +87,7 @@ class CleanupTracker:
def start_cleanup(self, cleanup_id: str) -> None: def start_cleanup(self, cleanup_id: str) -> None:
""" """
Start tracking a cleanup operation. Start tracking a cleanup operation.
Args: Args:
cleanup_id: Unique identifier for the cleanup operation cleanup_id: Unique identifier for the cleanup operation
""" """
@@ -75,16 +97,12 @@ class CleanupTracker:
# Cleanup old history if needed # Cleanup old history if needed
if len(self.cleanup_history) >= self.MAX_HISTORY: if len(self.cleanup_history) >= self.MAX_HISTORY:
self.cleanup_history = self.cleanup_history[-self.MAX_HISTORY:] self.cleanup_history = self.cleanup_history[-self.MAX_HISTORY :]
def record_stage_result( def record_stage_result(self, cleanup_id: str, result: CleanupResult) -> None:
self,
cleanup_id: str,
result: CleanupResult
) -> None:
""" """
Record result of a cleanup stage. Record result of a cleanup stage.
Args: Args:
cleanup_id: Cleanup operation identifier cleanup_id: Cleanup operation identifier
result: Result of the cleanup stage result: Result of the cleanup stage
@@ -95,19 +113,23 @@ class CleanupTracker:
def end_cleanup(self, cleanup_id: str) -> None: def end_cleanup(self, cleanup_id: str) -> None:
""" """
End tracking a cleanup operation. End tracking a cleanup operation.
Args: Args:
cleanup_id: Cleanup operation identifier cleanup_id: Cleanup operation identifier
""" """
if cleanup_id in self.active_cleanups: if cleanup_id in self.active_cleanups:
end_time = datetime.utcnow() end_time = datetime.utcnow()
self.cleanup_history.append({ self.cleanup_history.append(
"id": cleanup_id, {
"start_time": self.start_times[cleanup_id], "id": cleanup_id,
"end_time": end_time, "start_time": self.start_times[cleanup_id],
"duration": (end_time - self.start_times[cleanup_id]).total_seconds(), "end_time": end_time,
"results": self.stage_results[cleanup_id] "duration": (
}) end_time - self.start_times[cleanup_id]
).total_seconds(),
"results": self.stage_results[cleanup_id],
}
)
self.active_cleanups.remove(cleanup_id) self.active_cleanups.remove(cleanup_id)
self.start_times.pop(cleanup_id) self.start_times.pop(cleanup_id)
self.stage_results.pop(cleanup_id) self.stage_results.pop(cleanup_id)
@@ -115,7 +137,7 @@ class CleanupTracker:
def get_cleanup_stats(self) -> CleanupStats: def get_cleanup_stats(self) -> CleanupStats:
""" """
Get cleanup statistics. Get cleanup statistics.
Returns: Returns:
Dictionary containing cleanup statistics Dictionary containing cleanup statistics
""" """
@@ -124,7 +146,7 @@ class CleanupTracker:
active_cleanups=len(self.active_cleanups), active_cleanups=len(self.active_cleanups),
success_rate=self._calculate_success_rate(), success_rate=self._calculate_success_rate(),
average_duration=self._calculate_average_duration(), average_duration=self._calculate_average_duration(),
stage_success_rates=self._calculate_stage_success_rates() stage_success_rates=self._calculate_stage_success_rates(),
) )
def _calculate_success_rate(self) -> float: def _calculate_success_rate(self) -> float:
@@ -132,7 +154,8 @@ class CleanupTracker:
if not self.cleanup_history: if not self.cleanup_history:
return 1.0 return 1.0
successful = sum( successful = sum(
1 for cleanup in self.cleanup_history 1
for cleanup in self.cleanup_history
if all(result.success for result in cleanup["results"]) if all(result.success for result in cleanup["results"])
) )
return successful / len(self.cleanup_history) return successful / len(self.cleanup_history)
@@ -148,7 +171,7 @@ class CleanupTracker:
"""Calculate success rates by stage""" """Calculate success rates by stage"""
stage_attempts: Dict[str, int] = {} stage_attempts: Dict[str, int] = {}
stage_successes: Dict[str, int] = {} stage_successes: Dict[str, int] = {}
for cleanup in self.cleanup_history: for cleanup in self.cleanup_history:
for result in cleanup["results"]: for result in cleanup["results"]:
stage = result.stage.value stage = result.stage.value
@@ -161,6 +184,7 @@ class CleanupTracker:
for stage, attempts in stage_attempts.items() for stage, attempts in stage_attempts.items()
} }
class CleanupManager: class CleanupManager:
"""Manages cleanup operations for the video processor""" """Manages cleanup operations for the video processor"""
@@ -170,7 +194,7 @@ class CleanupManager:
self, self,
queue_handler: QueueHandler, queue_handler: QueueHandler,
ffmpeg_mgr: Optional[FFmpegManager] = None, ffmpeg_mgr: Optional[FFmpegManager] = None,
strategy: CleanupStrategy = CleanupStrategy.NORMAL strategy: CleanupStrategy = CleanupStrategy.NORMAL,
) -> None: ) -> None:
self.queue_handler = queue_handler self.queue_handler = queue_handler
self.ffmpeg_mgr = ffmpeg_mgr self.ffmpeg_mgr = ffmpeg_mgr
@@ -184,54 +208,50 @@ class CleanupManager:
stage=CleanupStage.QUEUE, stage=CleanupStage.QUEUE,
func=self._cleanup_queue, func=self._cleanup_queue,
force_func=self._force_cleanup_queue, force_func=self._force_cleanup_queue,
timeout=30.0 timeout=30.0,
), ),
CleanupOperation( CleanupOperation(
stage=CleanupStage.FFMPEG, stage=CleanupStage.FFMPEG,
func=self._cleanup_ffmpeg, func=self._cleanup_ffmpeg,
force_func=self._force_cleanup_ffmpeg, force_func=self._force_cleanup_ffmpeg,
timeout=15.0 timeout=15.0,
), ),
CleanupOperation( CleanupOperation(
stage=CleanupStage.TASKS, stage=CleanupStage.TASKS,
func=self._cleanup_tasks, func=self._cleanup_tasks,
force_func=self._force_cleanup_tasks, force_func=self._force_cleanup_tasks,
timeout=15.0 timeout=15.0,
) ),
] ]
async def cleanup(self) -> None: async def cleanup(self) -> None:
""" """
Perform normal cleanup of resources. Perform normal cleanup of resources.
Raises: Raises:
CleanupError: If cleanup fails CleanupError: If cleanup fails
""" """
cleanup_id = f"cleanup_{datetime.utcnow().timestamp()}" cleanup_id = f"cleanup_{datetime.utcnow().timestamp()}"
self.tracker.start_cleanup(cleanup_id) self.tracker.start_cleanup(cleanup_id)
try: try:
logger.info("Starting normal cleanup...") logger.info("Starting normal cleanup...")
# Clean up in stages # Clean up in stages
for operation in self.cleanup_operations: for operation in self.cleanup_operations:
try: try:
start_time = datetime.utcnow() start_time = datetime.utcnow()
await asyncio.wait_for( await asyncio.wait_for(operation.func(), timeout=operation.timeout)
operation.func(),
timeout=operation.timeout
)
duration = (datetime.utcnow() - start_time).total_seconds() duration = (datetime.utcnow() - start_time).total_seconds()
self.tracker.record_stage_result( self.tracker.record_stage_result(
cleanup_id, cleanup_id,
CleanupResult(True, operation.stage, duration=duration) CleanupResult(True, operation.stage, duration=duration),
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
error = f"Cleanup stage {operation.stage.value} timed out" error = f"Cleanup stage {operation.stage.value} timed out"
logger.error(error) logger.error(error)
self.tracker.record_stage_result( self.tracker.record_stage_result(
cleanup_id, cleanup_id, CleanupResult(False, operation.stage, error)
CleanupResult(False, operation.stage, error)
) )
if self.strategy != CleanupStrategy.GRACEFUL: if self.strategy != CleanupStrategy.GRACEFUL:
raise CleanupError(error) raise CleanupError(error)
@@ -239,8 +259,7 @@ class CleanupManager:
error = f"Error in {operation.stage.value} cleanup: {e}" error = f"Error in {operation.stage.value} cleanup: {e}"
logger.error(error) logger.error(error)
self.tracker.record_stage_result( self.tracker.record_stage_result(
cleanup_id, cleanup_id, CleanupResult(False, operation.stage, str(e))
CleanupResult(False, operation.stage, str(e))
) )
if self.strategy != CleanupStrategy.GRACEFUL: if self.strategy != CleanupStrategy.GRACEFUL:
raise CleanupError(error) raise CleanupError(error)
@@ -260,7 +279,7 @@ class CleanupManager:
"""Force cleanup of resources when normal cleanup fails""" """Force cleanup of resources when normal cleanup fails"""
cleanup_id = f"force_cleanup_{datetime.utcnow().timestamp()}" cleanup_id = f"force_cleanup_{datetime.utcnow().timestamp()}"
self.tracker.start_cleanup(cleanup_id) self.tracker.start_cleanup(cleanup_id)
try: try:
logger.info("Starting force cleanup...") logger.info("Starting force cleanup...")
@@ -272,19 +291,17 @@ class CleanupManager:
try: try:
start_time = datetime.utcnow() start_time = datetime.utcnow()
await asyncio.wait_for( await asyncio.wait_for(
operation.force_func(), operation.force_func(), timeout=operation.timeout
timeout=operation.timeout
) )
duration = (datetime.utcnow() - start_time).total_seconds() duration = (datetime.utcnow() - start_time).total_seconds()
self.tracker.record_stage_result( self.tracker.record_stage_result(
cleanup_id, cleanup_id,
CleanupResult(True, operation.stage, duration=duration) CleanupResult(True, operation.stage, duration=duration),
) )
except Exception as e: except Exception as e:
logger.error(f"Error in force {operation.stage.value} cleanup: {e}") logger.error(f"Error in force {operation.stage.value} cleanup: {e}")
self.tracker.record_stage_result( self.tracker.record_stage_result(
cleanup_id, cleanup_id, CleanupResult(False, operation.stage, str(e))
CleanupResult(False, operation.stage, str(e))
) )
logger.info("Force cleanup completed") logger.info("Force cleanup completed")
@@ -338,7 +355,7 @@ class CleanupManager:
def set_queue_task(self, task: asyncio.Task) -> None: def set_queue_task(self, task: asyncio.Task) -> None:
""" """
Set the queue processing task for cleanup purposes. Set the queue processing task for cleanup purposes.
Args: Args:
task: Queue processing task to track task: Queue processing task to track
""" """
@@ -347,7 +364,7 @@ class CleanupManager:
def get_cleanup_stats(self) -> Dict[str, Any]: def get_cleanup_stats(self) -> Dict[str, Any]:
""" """
Get cleanup statistics. Get cleanup statistics.
Returns: Returns:
Dictionary containing cleanup statistics and status Dictionary containing cleanup statistics and status
""" """
@@ -359,8 +376,8 @@ class CleanupManager:
{ {
"stage": op.stage.value, "stage": op.stage.value,
"timeout": op.timeout, "timeout": op.timeout,
"has_force_cleanup": op.force_func is not None "has_force_cleanup": op.force_func is not None,
} }
for op in self.cleanup_operations for op in self.cleanup_operations
] ],
} }