Files
Pac-cogs/videoarchiver/queue/cleanup.py
pacnpal a4ca6e8ea6 Core Systems:
Component-based architecture with lifecycle management
Enhanced error handling and recovery mechanisms
Comprehensive state management and tracking
Event-driven architecture with monitoring
Queue Management:

Multiple processing strategies for different scenarios
Advanced state management with recovery
Comprehensive metrics and health monitoring
Sophisticated cleanup system with multiple strategies
Processing Pipeline:

Enhanced message handling with validation
Improved URL extraction and processing
Better queue management and monitoring
Advanced cleanup mechanisms
Overall Benefits:

Better code organization and maintainability
Improved error handling and recovery
Enhanced monitoring and reporting
More robust and reliable system
2024-11-16 05:01:29 +00:00

461 lines
16 KiB
Python

"""Queue cleanup operations"""
import asyncio
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Any, Tuple
from datetime import datetime, timedelta
from .models import QueueItem, QueueMetrics
from .cleaners.history_cleaner import (
HistoryCleaner,
CleanupStrategy as HistoryStrategy
)
from .cleaners.guild_cleaner import (
GuildCleaner,
GuildCleanupStrategy
)
from .cleaners.tracking_cleaner import (
TrackingCleaner,
TrackingCleanupStrategy
)
logger = logging.getLogger("QueueCleanup")
class CleanupMode(Enum):
"""Cleanup operation modes"""
NORMAL = "normal" # Regular cleanup
AGGRESSIVE = "aggressive" # More aggressive cleanup
MAINTENANCE = "maintenance" # Maintenance mode cleanup
EMERGENCY = "emergency" # Emergency cleanup
class CleanupPhase(Enum):
"""Cleanup operation phases"""
HISTORY = "history"
TRACKING = "tracking"
GUILD = "guild"
VERIFICATION = "verification"
@dataclass
class CleanupConfig:
"""Configuration for cleanup operations"""
cleanup_interval: int = 1800 # 30 minutes
max_history_age: int = 43200 # 12 hours
batch_size: int = 100
max_concurrent_cleanups: int = 3
verification_interval: int = 300 # 5 minutes
emergency_threshold: int = 10000 # Items threshold for emergency
@dataclass
class CleanupResult:
"""Result of a cleanup operation"""
timestamp: datetime
mode: CleanupMode
duration: float
items_cleaned: Dict[CleanupPhase, int]
error: Optional[str] = None
class CleanupScheduler:
"""Schedules cleanup operations"""
def __init__(self, config: CleanupConfig):
self.config = config
self.next_cleanup: Optional[datetime] = None
self.next_verification: Optional[datetime] = None
self._last_emergency: Optional[datetime] = None
def should_cleanup(self, queue_size: int) -> Tuple[bool, CleanupMode]:
"""Determine if cleanup should run"""
now = datetime.utcnow()
# Check for emergency cleanup
if (
queue_size > self.config.emergency_threshold and
(
not self._last_emergency or
now - self._last_emergency > timedelta(minutes=5)
)
):
self._last_emergency = now
return True, CleanupMode.EMERGENCY
# Check scheduled cleanup
if not self.next_cleanup or now >= self.next_cleanup:
self.next_cleanup = now + timedelta(
seconds=self.config.cleanup_interval
)
return True, CleanupMode.NORMAL
# Check verification
if not self.next_verification or now >= self.next_verification:
self.next_verification = now + timedelta(
seconds=self.config.verification_interval
)
return True, CleanupMode.MAINTENANCE
return False, CleanupMode.NORMAL
class CleanupCoordinator:
"""Coordinates cleanup operations"""
def __init__(self):
self.active_cleanups: Set[CleanupPhase] = set()
self._cleanup_lock = asyncio.Lock()
self._phase_locks: Dict[CleanupPhase, asyncio.Lock] = {
phase: asyncio.Lock() for phase in CleanupPhase
}
async def start_cleanup(self, phase: CleanupPhase) -> bool:
"""Start a cleanup phase"""
async with self._cleanup_lock:
if phase in self.active_cleanups:
return False
self.active_cleanups.add(phase)
return True
async def end_cleanup(self, phase: CleanupPhase) -> None:
"""End a cleanup phase"""
async with self._cleanup_lock:
self.active_cleanups.discard(phase)
async def acquire_phase(self, phase: CleanupPhase) -> bool:
"""Acquire lock for a cleanup phase"""
return await self._phase_locks[phase].acquire()
def release_phase(self, phase: CleanupPhase) -> None:
"""Release lock for a cleanup phase"""
self._phase_locks[phase].release()
class CleanupTracker:
"""Tracks cleanup operations"""
def __init__(self, max_history: int = 1000):
self.max_history = max_history
self.history: List[CleanupResult] = []
self.total_items_cleaned = 0
self.last_cleanup: Optional[datetime] = None
self.cleanup_counts: Dict[CleanupMode, int] = {
mode: 0 for mode in CleanupMode
}
def record_cleanup(self, result: CleanupResult) -> None:
"""Record a cleanup operation"""
self.history.append(result)
if len(self.history) > self.max_history:
self.history.pop(0)
self.total_items_cleaned += sum(result.items_cleaned.values())
self.last_cleanup = result.timestamp
self.cleanup_counts[result.mode] += 1
def get_stats(self) -> Dict[str, Any]:
"""Get cleanup statistics"""
return {
"total_cleanups": len(self.history),
"total_items_cleaned": self.total_items_cleaned,
"last_cleanup": (
self.last_cleanup.isoformat()
if self.last_cleanup
else None
),
"cleanup_counts": {
mode.value: count
for mode, count in self.cleanup_counts.items()
},
"recent_cleanups": [
{
"timestamp": r.timestamp.isoformat(),
"mode": r.mode.value,
"duration": r.duration,
"items_cleaned": {
phase.value: count
for phase, count in r.items_cleaned.items()
}
}
for r in self.history[-5:] # Last 5 cleanups
]
}
class QueueCleaner:
"""Handles cleanup of queue items and tracking data"""
def __init__(self, config: Optional[CleanupConfig] = None):
self.config = config or CleanupConfig()
self.scheduler = CleanupScheduler(self.config)
self.coordinator = CleanupCoordinator()
self.tracker = CleanupTracker()
# Initialize cleaners
self.history_cleaner = HistoryCleaner()
self.guild_cleaner = GuildCleaner()
self.tracking_cleaner = TrackingCleaner()
self._shutdown = False
self._cleanup_task: Optional[asyncio.Task] = None
async def start(
self,
state_manager,
metrics_manager
) -> None:
"""Start periodic cleanup process"""
if self._cleanup_task is not None:
logger.warning("Cleanup task already running")
return
logger.info("Starting queue cleanup task...")
self._cleanup_task = asyncio.create_task(
self._cleanup_loop(state_manager, metrics_manager)
)
async def _cleanup_loop(
self,
state_manager,
metrics_manager
) -> None:
"""Main cleanup loop"""
while not self._shutdown:
try:
# Check if cleanup should run
queue_size = len(await state_manager.get_queue())
should_run, mode = self.scheduler.should_cleanup(queue_size)
if should_run:
await self._perform_cleanup(
state_manager,
metrics_manager,
mode
)
await asyncio.sleep(1) # Short sleep to prevent CPU hogging
except asyncio.CancelledError:
logger.info("Queue cleanup cancelled")
break
except Exception as e:
logger.error(f"Error in cleanup loop: {str(e)}")
await asyncio.sleep(30) # Longer sleep on error
async def stop(self) -> None:
"""Stop the cleanup process"""
logger.info("Stopping queue cleanup...")
self._shutdown = True
if self._cleanup_task and not self._cleanup_task.done():
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
self._cleanup_task = None
async def _perform_cleanup(
self,
state_manager,
metrics_manager,
mode: CleanupMode
) -> None:
"""Perform cleanup operations"""
start_time = datetime.utcnow()
items_cleaned: Dict[CleanupPhase, int] = {
phase: 0 for phase in CleanupPhase
}
try:
# Get current state
queue = await state_manager.get_queue()
processing = await state_manager.get_processing()
completed = await state_manager.get_completed()
failed = await state_manager.get_failed()
guild_queues = await state_manager.get_guild_queues()
channel_queues = await state_manager.get_channel_queues()
# Clean historical items
if await self.coordinator.start_cleanup(CleanupPhase.HISTORY):
try:
await self.coordinator.acquire_phase(CleanupPhase.HISTORY)
cleanup_cutoff = self.history_cleaner.get_cleanup_cutoff()
# Adjust strategy based on mode
if mode == CleanupMode.AGGRESSIVE:
self.history_cleaner.strategy = HistoryStrategy.AGGRESSIVE
elif mode == CleanupMode.MAINTENANCE:
self.history_cleaner.strategy = HistoryStrategy.CONSERVATIVE
completed_cleaned = await self.history_cleaner.cleanup_completed(
completed,
cleanup_cutoff
)
failed_cleaned = await self.history_cleaner.cleanup_failed(
failed,
cleanup_cutoff
)
items_cleaned[CleanupPhase.HISTORY] = (
completed_cleaned + failed_cleaned
)
finally:
self.coordinator.release_phase(CleanupPhase.HISTORY)
await self.coordinator.end_cleanup(CleanupPhase.HISTORY)
# Clean tracking data
if await self.coordinator.start_cleanup(CleanupPhase.TRACKING):
try:
await self.coordinator.acquire_phase(CleanupPhase.TRACKING)
# Adjust strategy based on mode
if mode == CleanupMode.AGGRESSIVE:
self.tracking_cleaner.strategy = TrackingCleanupStrategy.AGGRESSIVE
elif mode == CleanupMode.MAINTENANCE:
self.tracking_cleaner.strategy = TrackingCleanupStrategy.CONSERVATIVE
tracking_cleaned, _ = await self.tracking_cleaner.cleanup_tracking(
guild_queues,
channel_queues,
queue,
processing
)
items_cleaned[CleanupPhase.TRACKING] = tracking_cleaned
finally:
self.coordinator.release_phase(CleanupPhase.TRACKING)
await self.coordinator.end_cleanup(CleanupPhase.TRACKING)
# Update state
await state_manager.update_state(
completed=completed,
failed=failed,
guild_queues=guild_queues,
channel_queues=channel_queues
)
# Record cleanup result
duration = (datetime.utcnow() - start_time).total_seconds()
result = CleanupResult(
timestamp=datetime.utcnow(),
mode=mode,
duration=duration,
items_cleaned=items_cleaned
)
self.tracker.record_cleanup(result)
# Update metrics
metrics_manager.update_cleanup_time()
logger.info(
f"Cleanup completed ({mode.value}):\n" +
"\n".join(
f"- {phase.value}: {count} items"
for phase, count in items_cleaned.items()
if count > 0
) +
f"\nTotal duration: {duration:.2f}s"
)
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")
duration = (datetime.utcnow() - start_time).total_seconds()
self.tracker.record_cleanup(CleanupResult(
timestamp=datetime.utcnow(),
mode=mode,
duration=duration,
items_cleaned=items_cleaned,
error=str(e)
))
raise CleanupError(f"Cleanup failed: {str(e)}")
async def clear_guild_queue(
self,
guild_id: int,
state_manager
) -> int:
"""Clear all queue items for a specific guild"""
try:
if not await self.coordinator.start_cleanup(CleanupPhase.GUILD):
raise CleanupError("Guild cleanup already in progress")
try:
await self.coordinator.acquire_phase(CleanupPhase.GUILD)
# Get current state
queue = await state_manager.get_queue()
processing = await state_manager.get_processing()
completed = await state_manager.get_completed()
failed = await state_manager.get_failed()
guild_queues = await state_manager.get_guild_queues()
channel_queues = await state_manager.get_channel_queues()
# Clear guild items
cleared_count, counts = await self.guild_cleaner.clear_guild_items(
guild_id,
queue,
processing,
completed,
failed,
guild_queues,
channel_queues
)
# Update state
await state_manager.update_state(
queue=queue,
processing=processing,
completed=completed,
failed=failed,
guild_queues=guild_queues,
channel_queues=channel_queues
)
return cleared_count
finally:
self.coordinator.release_phase(CleanupPhase.GUILD)
await self.coordinator.end_cleanup(CleanupPhase.GUILD)
except Exception as e:
logger.error(f"Error clearing guild queue: {str(e)}")
raise CleanupError(f"Failed to clear guild queue: {str(e)}")
def get_cleaner_stats(self) -> Dict[str, Any]:
"""Get comprehensive cleaner statistics"""
return {
"config": {
"cleanup_interval": self.config.cleanup_interval,
"max_history_age": self.config.max_history_age,
"batch_size": self.config.batch_size,
"max_concurrent_cleanups": self.config.max_concurrent_cleanups,
"verification_interval": self.config.verification_interval,
"emergency_threshold": self.config.emergency_threshold
},
"scheduler": {
"next_cleanup": (
self.scheduler.next_cleanup.isoformat()
if self.scheduler.next_cleanup
else None
),
"next_verification": (
self.scheduler.next_verification.isoformat()
if self.scheduler.next_verification
else None
),
"last_emergency": (
self.scheduler._last_emergency.isoformat()
if self.scheduler._last_emergency
else None
)
},
"coordinator": {
"active_cleanups": [
phase.value for phase in self.coordinator.active_cleanups
]
},
"tracker": self.tracker.get_stats(),
"cleaners": {
"history": self.history_cleaner.get_cleaner_stats(),
"guild": self.guild_cleaner.get_cleaner_stats(),
"tracking": self.tracking_cleaner.get_cleaner_stats()
}
}
class CleanupError(Exception):
"""Base exception for cleanup-related errors"""
pass