Refactor queue system into modular structure

- Created new queue module with separate components:
  - models.py: QueueItem and QueueMetrics data classes
  - persistence.py: Queue state persistence
  - monitoring.py: Health monitoring and metrics
  - cleanup.py: Cleanup operations
  - manager.py: Main queue management
  - __init__.py: Package exports

- Updated imports in video_archiver.py and processor.py
- Removed old enhanced_queue.py
- Updated README with new queue system details

This refactoring improves code organization and maintainability
through better separation of concerns while preserving all
existing functionality.
pacnpal
2024-11-15 18:16:53 +00:00
parent 44599b2b22
commit b1eafbb01d
9 changed files with 1242 additions and 4 deletions

__init__.py

@@ -0,0 +1,19 @@
"""Queue management package for video processing"""
from .models import QueueItem, QueueMetrics
from .manager import EnhancedVideoQueueManager
from .persistence import QueuePersistenceManager, QueueError
from .monitoring import QueueMonitor, MonitoringError
from .cleanup import QueueCleaner, CleanupError
__all__ = [
'QueueItem',
'QueueMetrics',
'EnhancedVideoQueueManager',
'QueuePersistenceManager',
'QueueMonitor',
'QueueCleaner',
'QueueError',
'MonitoringError',
'CleanupError',
]
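For context, a minimal usage sketch of the new package from calling code such as video_archiver.py. The import path and the process_video callback are illustrative assumptions; the manager's constructor and method signatures are taken from manager.py below.

    import asyncio
    from .queue import EnhancedVideoQueueManager, QueueItem  # relative import inside the cog; path assumed

    async def process_video(item: QueueItem):
        # Download, compress, and repost the video here (placeholder)
        return True, None  # (success, error)

    async def run():
        # Create inside a running event loop: __init__ starts background tasks
        manager = EnhancedVideoQueueManager(persistence_path="data/queue_state.json")
        await manager.add_to_queue(
            url="https://example.com/video.mp4",
            message_id=1, channel_id=2, guild_id=3, author_id=4, priority=1,
        )
        processor = asyncio.create_task(manager.process_queue(process_video))
        ...
        await manager.cleanup()  # stop background tasks and persist final state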

cleanup.py

@@ -0,0 +1,243 @@
"""Queue cleanup operations"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Set
from .models import QueueItem, QueueMetrics
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("QueueCleanup")
class QueueCleaner:
"""Handles cleanup of old queue items and tracking data"""
def __init__(
self,
cleanup_interval: int = 3600, # 1 hour
max_history_age: int = 86400, # 24 hours
):
self.cleanup_interval = cleanup_interval
self.max_history_age = max_history_age
self._shutdown = False
async def start_cleanup(
self,
queue: List[QueueItem],
completed: Dict[str, QueueItem],
failed: Dict[str, QueueItem],
guild_queues: Dict[int, Set[str]],
channel_queues: Dict[int, Set[str]],
processing: Dict[str, QueueItem],
metrics: QueueMetrics,
queue_lock: asyncio.Lock
) -> None:
"""Start periodic cleanup process
Args:
queue: Reference to the queue list
completed: Reference to completed items dict
failed: Reference to failed items dict
guild_queues: Reference to guild tracking dict
channel_queues: Reference to channel tracking dict
processing: Reference to processing dict
metrics: Reference to queue metrics
queue_lock: Lock for queue operations
"""
while not self._shutdown:
try:
await self._perform_cleanup(
queue,
completed,
failed,
guild_queues,
channel_queues,
processing,
metrics,
queue_lock
)
await asyncio.sleep(self.cleanup_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in periodic cleanup: {str(e)}")
await asyncio.sleep(60)
def stop_cleanup(self) -> None:
"""Stop the cleanup process"""
self._shutdown = True
async def _perform_cleanup(
self,
queue: List[QueueItem],
completed: Dict[str, QueueItem],
failed: Dict[str, QueueItem],
guild_queues: Dict[int, Set[str]],
channel_queues: Dict[int, Set[str]],
processing: Dict[str, QueueItem],
metrics: QueueMetrics,
queue_lock: asyncio.Lock
) -> None:
"""Perform cleanup operations
Args:
queue: Reference to the queue list
completed: Reference to completed items dict
failed: Reference to failed items dict
guild_queues: Reference to guild tracking dict
channel_queues: Reference to channel tracking dict
processing: Reference to processing dict
metrics: Reference to queue metrics
queue_lock: Lock for queue operations
"""
try:
current_time = datetime.utcnow()
cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age)
async with queue_lock:
# Clean up completed items
for url in list(completed.keys()):
try:
item = completed[url]
# Ensure added_at is a datetime object
if not isinstance(item.added_at, datetime):
try:
if isinstance(item.added_at, str):
item.added_at = datetime.fromisoformat(item.added_at)
else:
item.added_at = current_time
except (ValueError, TypeError):
item.added_at = current_time
if item.added_at < cleanup_cutoff:
completed.pop(url)
except Exception as e:
logger.error(f"Error processing completed item {url}: {e}")
completed.pop(url)
# Clean up failed items
for url in list(failed.keys()):
try:
item = failed[url]
# Ensure added_at is a datetime object
if not isinstance(item.added_at, datetime):
try:
if isinstance(item.added_at, str):
item.added_at = datetime.fromisoformat(item.added_at)
else:
item.added_at = current_time
except (ValueError, TypeError):
item.added_at = current_time
if item.added_at < cleanup_cutoff:
failed.pop(url)
except Exception as e:
logger.error(f"Error processing failed item {url}: {e}")
failed.pop(url)
# Clean up guild tracking
for guild_id in list(guild_queues.keys()):
guild_queues[guild_id] = {
url for url in guild_queues[guild_id]
if url in queue or url in processing
}
if not guild_queues[guild_id]:
guild_queues.pop(guild_id)
# Clean up channel tracking
for channel_id in list(channel_queues.keys()):
channel_queues[channel_id] = {
url for url in channel_queues[channel_id]
if url in queue or url in processing
}
if not channel_queues[channel_id]:
channel_queues.pop(channel_id)
metrics.last_cleanup = current_time
logger.info("Completed periodic queue cleanup")
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")
raise
async def clear_guild_queue(
self,
guild_id: int,
queue: List[QueueItem],
processing: Dict[str, QueueItem],
completed: Dict[str, QueueItem],
failed: Dict[str, QueueItem],
guild_queues: Dict[int, Set[str]],
channel_queues: Dict[int, Set[str]],
queue_lock: asyncio.Lock
) -> int:
"""Clear all queue items for a specific guild
Args:
guild_id: ID of the guild to clear
queue: Reference to the queue list
processing: Reference to processing dict
completed: Reference to completed items dict
failed: Reference to failed items dict
guild_queues: Reference to guild tracking dict
channel_queues: Reference to channel tracking dict
queue_lock: Lock for queue operations
Returns:
Number of items cleared
"""
try:
cleared_count = 0
async with queue_lock:
# Get URLs for this guild
guild_urls = guild_queues.get(guild_id, set())
# Clear from pending queue
queue[:] = [item for item in queue if item.guild_id != guild_id]
# Clear from processing
for url in list(processing.keys()):
if processing[url].guild_id == guild_id:
processing.pop(url)
cleared_count += 1
# Clear from completed
for url in list(completed.keys()):
if completed[url].guild_id == guild_id:
completed.pop(url)
cleared_count += 1
# Clear from failed
for url in list(failed.keys()):
if failed[url].guild_id == guild_id:
failed.pop(url)
cleared_count += 1
# Clear guild tracking
if guild_id in guild_queues:
cleared_count += len(guild_queues[guild_id])
guild_queues.pop(guild_id)
# Clear channel tracking for this guild's channels
for channel_id in list(channel_queues.keys()):
channel_queues[channel_id] = {
url for url in channel_queues[channel_id]
if url not in guild_urls
}
if not channel_queues[channel_id]:
channel_queues.pop(channel_id)
logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue")
return cleared_count
except Exception as e:
logger.error(f"Error clearing guild queue: {str(e)}")
raise
class CleanupError(Exception):
"""Base exception for cleanup-related errors"""
pass
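A standalone sketch of invoking clear_guild_queue directly. The containers, IDs, and URL are illustrative; in this commit the manager passes its own internal structures and lock.

    import asyncio
    from datetime import datetime

    async def demo_clear():
        cleaner = QueueCleaner()
        item = QueueItem(url="https://example.com/v.mp4", message_id=1, channel_id=2,
                         guild_id=3, author_id=4, added_at=datetime.utcnow())
        queue = [item]
        guild_queues = {3: {item.url}}
        channel_queues = {2: {item.url}}
        cleared = await cleaner.clear_guild_queue(
            3, queue, {}, {}, {}, guild_queues, channel_queues, asyncio.Lock()
        )
        print(cleared, len(queue))  # pending item removed from queue; tracking sets cleared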

manager.py

@@ -0,0 +1,453 @@
"""Enhanced queue manager for video processing"""
import asyncio
import logging
from typing import Dict, Optional, Set, Tuple, Callable, Any, List
from datetime import datetime
from .models import QueueItem, QueueMetrics
from .persistence import QueuePersistenceManager, QueueError
from .monitoring import QueueMonitor, MonitoringError
from .cleanup import QueueCleaner, CleanupError
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("QueueManager")
class EnhancedVideoQueueManager:
"""Enhanced queue manager with improved memory management and performance"""
def __init__(
self,
max_retries: int = 3,
retry_delay: int = 5,
max_queue_size: int = 1000,
cleanup_interval: int = 3600, # 1 hour
max_history_age: int = 86400, # 24 hours
persistence_path: Optional[str] = None,
backup_interval: int = 300, # 5 minutes
deadlock_threshold: int = 900, # 15 minutes
):
# Configuration
self.max_retries = max_retries
self.retry_delay = retry_delay
self.max_queue_size = max_queue_size
# Queue storage
self._queue: List[QueueItem] = []
self._processing: Dict[str, QueueItem] = {}
self._completed: Dict[str, QueueItem] = {}
self._failed: Dict[str, QueueItem] = {}
# Tracking
self._guild_queues: Dict[int, Set[str]] = {}
self._channel_queues: Dict[int, Set[str]] = {}
self._active_tasks: Set[asyncio.Task] = set()
# Locks
self._queue_lock = asyncio.Lock()
self._processing_lock = asyncio.Lock()
# State
self._shutdown = False
self.metrics = QueueMetrics()
# Components
self.persistence = QueuePersistenceManager(persistence_path) if persistence_path else None
self.monitor = QueueMonitor(
deadlock_threshold=deadlock_threshold,
max_retries=max_retries
)
self.cleaner = QueueCleaner(
cleanup_interval=cleanup_interval,
max_history_age=max_history_age
)
# Initialize tasks
self._init_tasks()
def _init_tasks(self) -> None:
"""Initialize background tasks"""
# Start monitoring
monitor_task = asyncio.create_task(
self.monitor.start_monitoring(
self._queue,
self._processing,
self.metrics,
self._processing_lock
)
)
self._active_tasks.add(monitor_task)
# Start cleanup
cleanup_task = asyncio.create_task(
self.cleaner.start_cleanup(
self._queue,
self._completed,
self._failed,
self._guild_queues,
self._channel_queues,
self._processing,
self.metrics,
self._queue_lock
)
)
self._active_tasks.add(cleanup_task)
# Load persisted state if available
if self.persistence:
self._load_persisted_state()
def _load_persisted_state(self) -> None:
"""Load persisted queue state"""
try:
state = self.persistence.load_queue_state()
if state:
self._queue = state["queue"]
self._processing = state["processing"]
self._completed = state["completed"]
self._failed = state["failed"]
# Update metrics
metrics_data = state.get("metrics", {})
self.metrics.total_processed = metrics_data.get("total_processed", 0)
self.metrics.total_failed = metrics_data.get("total_failed", 0)
self.metrics.avg_processing_time = metrics_data.get("avg_processing_time", 0.0)
self.metrics.success_rate = metrics_data.get("success_rate", 0.0)
self.metrics.errors_by_type = metrics_data.get("errors_by_type", {})
self.metrics.compression_failures = metrics_data.get("compression_failures", 0)
self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0)
except Exception as e:
logger.error(f"Failed to load persisted state: {e}")
async def process_queue(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> None:
"""Process items in the queue
Args:
processor: Function that processes queue items
"""
logger.info("Queue processor started")
while not self._shutdown:
try:
# Get next item from queue
item = None
async with self._queue_lock:
if self._queue:
item = self._queue.pop(0)
self._processing[item.url] = item
item.status = "processing"
item.processing_time = 0.0
if not item:
await asyncio.sleep(1)
continue
try:
# Process the item
logger.info(f"Processing queue item: {item.url}")
success, error = await processor(item)
# Update metrics and status
async with self._processing_lock:
if success:
item.status = "completed"
self._completed[item.url] = item
logger.info(f"Successfully processed: {item.url}")
else:
item.status = "failed"
item.error = error
item.last_error = error
item.last_error_time = datetime.utcnow()
if item.retry_count < self.max_retries:
item.retry_count += 1
item.status = "pending"
item.last_retry = datetime.utcnow()
item.priority = max(0, item.priority - 1)
self._queue.append(item)
logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
else:
self._failed[item.url] = item
logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
self._processing.pop(item.url, None)
except Exception as e:
logger.error(f"Error processing {item.url}: {e}")
async with self._processing_lock:
item.status = "failed"
item.error = str(e)
item.last_error = str(e)
item.last_error_time = datetime.utcnow()
self._failed[item.url] = item
self._processing.pop(item.url, None)
# Persist state if enabled
if self.persistence:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
except Exception as e:
logger.error(f"Critical error in queue processor: {e}")
await asyncio.sleep(1)
await asyncio.sleep(0.1)
logger.info("Queue processor stopped")
async def add_to_queue(
self,
url: str,
message_id: int,
channel_id: int,
guild_id: int,
author_id: int,
priority: int = 0,
) -> bool:
"""Add a video to the processing queue
Args:
url: Video URL
message_id: Discord message ID
channel_id: Discord channel ID
guild_id: Discord guild ID
author_id: Discord author ID
priority: Queue priority (higher = higher priority)
Returns:
True if added successfully
Raises:
QueueError: If queue is full or shutting down
"""
if self._shutdown:
raise QueueError("Queue manager is shutting down")
try:
async with self._queue_lock:
if len(self._queue) >= self.max_queue_size:
raise QueueError("Queue is full")
item = QueueItem(
url=url,
message_id=message_id,
channel_id=channel_id,
guild_id=guild_id,
author_id=author_id,
added_at=datetime.utcnow(),
priority=priority,
)
# Add to tracking
if guild_id not in self._guild_queues:
self._guild_queues[guild_id] = set()
self._guild_queues[guild_id].add(url)
if channel_id not in self._channel_queues:
self._channel_queues[channel_id] = set()
self._channel_queues[channel_id].add(url)
# Add to queue with priority
self._queue.append(item)
self._queue.sort(key=lambda x: (-x.priority, x.added_at))
if self.persistence:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
logger.info(f"Added to queue: {url} (priority: {priority})")
return True
except Exception as e:
logger.error(f"Error adding to queue: {e}")
raise QueueError(f"Failed to add to queue: {str(e)}")
def get_queue_status(self, guild_id: int) -> dict:
"""Get current queue status for a guild
Args:
guild_id: Discord guild ID
Returns:
Dict containing queue status and metrics
"""
try:
pending = len([item for item in self._queue if item.guild_id == guild_id])
processing = len([item for item in self._processing.values() if item.guild_id == guild_id])
completed = len([item for item in self._completed.values() if item.guild_id == guild_id])
failed = len([item for item in self._failed.values() if item.guild_id == guild_id])
return {
"pending": pending,
"processing": processing,
"completed": completed,
"failed": failed,
"metrics": {
"total_processed": self.metrics.total_processed,
"total_failed": self.metrics.total_failed,
"success_rate": self.metrics.success_rate,
"avg_processing_time": self.metrics.avg_processing_time,
"peak_memory_usage": self.metrics.peak_memory_usage,
"last_cleanup": self.metrics.last_cleanup.strftime("%Y-%m-%d %H:%M:%S"),
"errors_by_type": self.metrics.errors_by_type,
"compression_failures": self.metrics.compression_failures,
"hardware_accel_failures": self.metrics.hardware_accel_failures,
},
}
except Exception as e:
logger.error(f"Error getting queue status: {e}")
return {
"pending": 0,
"processing": 0,
"completed": 0,
"failed": 0,
"metrics": {
"total_processed": 0,
"total_failed": 0,
"success_rate": 0.0,
"avg_processing_time": 0.0,
"peak_memory_usage": 0.0,
"last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
"errors_by_type": {},
"compression_failures": 0,
"hardware_accel_failures": 0,
},
}
async def clear_guild_queue(self, guild_id: int) -> int:
"""Clear all queue items for a guild
Args:
guild_id: Discord guild ID
Returns:
Number of items cleared
Raises:
QueueError: If queue is shutting down
"""
if self._shutdown:
raise QueueError("Queue manager is shutting down")
try:
cleared = await self.cleaner.clear_guild_queue(
guild_id,
self._queue,
self._processing,
self._completed,
self._failed,
self._guild_queues,
self._channel_queues,
self._queue_lock
)
if self.persistence:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
return cleared
except Exception as e:
logger.error(f"Error clearing guild queue: {e}")
raise QueueError(f"Failed to clear guild queue: {str(e)}")
async def cleanup(self) -> None:
"""Clean up resources and stop queue processing"""
try:
self._shutdown = True
# Stop monitoring and cleanup tasks
self.monitor.stop_monitoring()
self.cleaner.stop_cleanup()
# Cancel all active tasks
for task in self._active_tasks:
if not task.done():
task.cancel()
await asyncio.gather(*self._active_tasks, return_exceptions=True)
# Move processing items back to queue
async with self._queue_lock:
for url, item in self._processing.items():
if item.retry_count < self.max_retries:
item.status = "pending"
item.retry_count += 1
self._queue.append(item)
else:
self._failed[url] = item
self._processing.clear()
# Final state persistence
if self.persistence:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
# Clear collections
self._queue.clear()
self._completed.clear()
self._failed.clear()
self._guild_queues.clear()
self._channel_queues.clear()
self._active_tasks.clear()
logger.info("Queue manager cleanup completed")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
raise CleanupError(f"Failed to clean up queue manager: {str(e)}")
def force_stop(self) -> None:
"""Force stop all queue operations immediately"""
self._shutdown = True
# Stop monitoring and cleanup
self.monitor.stop_monitoring()
self.cleaner.stop_cleanup()
# Cancel all active tasks
for task in self._active_tasks:
if not task.done():
task.cancel()
# Move processing items back to queue
for url, item in self._processing.items():
if item.retry_count < self.max_retries:
item.status = "pending"
item.retry_count += 1
self._queue.append(item)
else:
self._failed[url] = item
self._processing.clear()
self._active_tasks.clear()
logger.info("Queue manager force stopped")

models.py

@@ -0,0 +1,150 @@
"""Data models for the queue system"""
import logging
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Dict, Optional, List, Any
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("QueueModels")
@dataclass
class QueueMetrics:
"""Metrics tracking for queue performance and health"""
total_processed: int = 0
total_failed: int = 0
avg_processing_time: float = 0.0
success_rate: float = 0.0
errors_by_type: Dict[str, int] = field(default_factory=dict)
last_error: Optional[str] = None
last_error_time: Optional[datetime] = None
last_cleanup: datetime = field(default_factory=datetime.utcnow)
retries: int = 0
peak_memory_usage: float = 0.0
processing_times: List[float] = field(default_factory=list)
compression_failures: int = 0
hardware_accel_failures: int = 0
def update(self, processing_time: float, success: bool, error: str = None):
"""Update metrics with new processing information"""
self.total_processed += 1
if not success:
self.total_failed += 1
if error:
self.last_error = error
self.last_error_time = datetime.utcnow()
error_type = error.split(":")[0] if ":" in error else error
self.errors_by_type[error_type] = (
self.errors_by_type.get(error_type, 0) + 1
)
# Track specific error types
if "compression error" in error.lower():
self.compression_failures += 1
elif "hardware acceleration failed" in error.lower():
self.hardware_accel_failures += 1
# Update processing times with sliding window
self.processing_times.append(processing_time)
if len(self.processing_times) > 100: # Keep last 100 processing times
self.processing_times.pop(0)
# Update average processing time
self.avg_processing_time = (
sum(self.processing_times) / len(self.processing_times)
if self.processing_times
else 0.0
)
# Update success rate
self.success_rate = (
(self.total_processed - self.total_failed) / self.total_processed
if self.total_processed > 0
else 0.0
)
@dataclass
class QueueItem:
"""Represents a video processing task in the queue"""
url: str
message_id: int
channel_id: int
guild_id: int
author_id: int
added_at: datetime
priority: int = 0 # Higher number = higher priority
status: str = "pending" # pending, processing, completed, failed
error: Optional[str] = None
attempt: int = 0
_processing_time: float = 0.0 # Use private field for processing_time
size_bytes: int = 0
last_error: Optional[str] = None
retry_count: int = 0
last_retry: Optional[datetime] = None
processing_times: List[float] = field(default_factory=list)
last_error_time: Optional[datetime] = None
hardware_accel_attempted: bool = False
compression_attempted: bool = False
original_message: Optional[Any] = None # Store the original message reference
@property
def processing_time(self) -> float:
"""Get processing time as float"""
return self._processing_time
@processing_time.setter
def processing_time(self, value: Any) -> None:
"""Set processing time, ensuring it's always a float"""
try:
if isinstance(value, str):
self._processing_time = float(value)
elif isinstance(value, (int, float)):
self._processing_time = float(value)
else:
self._processing_time = 0.0
except (ValueError, TypeError):
self._processing_time = 0.0
def to_dict(self) -> dict:
"""Convert to dictionary with datetime handling"""
data = asdict(self)
# Convert datetime objects to ISO format strings
if self.added_at:
data['added_at'] = self.added_at.isoformat()
if self.last_retry:
data['last_retry'] = self.last_retry.isoformat()
if self.last_error_time:
data['last_error_time'] = self.last_error_time.isoformat()
# Convert _processing_time to processing_time in dict
data['processing_time'] = self.processing_time
data.pop('_processing_time', None)
return data
@classmethod
def from_dict(cls, data: dict) -> 'QueueItem':
"""Create from dictionary with datetime handling"""
# Convert ISO format strings back to datetime objects
if 'added_at' in data and isinstance(data['added_at'], str):
data['added_at'] = datetime.fromisoformat(data['added_at'])
if 'last_retry' in data and isinstance(data['last_retry'], str):
data['last_retry'] = datetime.fromisoformat(data['last_retry'])
if 'last_error_time' in data and isinstance(data['last_error_time'], str):
data['last_error_time'] = datetime.fromisoformat(data['last_error_time'])
# Handle processing_time conversion
if 'processing_time' in data:
try:
if isinstance(data['processing_time'], str):
data['_processing_time'] = float(data['processing_time'])
elif isinstance(data['processing_time'], (int, float)):
data['_processing_time'] = float(data['processing_time'])
else:
data['_processing_time'] = 0.0
except (ValueError, TypeError):
data['_processing_time'] = 0.0
data.pop('processing_time', None)
return cls(**data)
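A short round-trip sketch of the serialization helpers above, assuming QueueItem is imported from this module; all field values are illustrative.

    from datetime import datetime

    item = QueueItem(url="https://example.com/v.mp4", message_id=1, channel_id=2,
                     guild_id=3, author_id=4, added_at=datetime.utcnow())
    item.processing_time = "12.5"   # setter coerces strings to float
    payload = item.to_dict()        # datetimes become ISO strings, processing_time is a float
    restored = QueueItem.from_dict(payload)
    assert restored.processing_time == 12.5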

monitoring.py

@@ -0,0 +1,172 @@
"""Queue monitoring and health checks"""
import asyncio
import logging
import psutil
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from .models import QueueItem, QueueMetrics
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("QueueMonitoring")
class QueueMonitor:
"""Monitors queue health and performance"""
def __init__(
self,
deadlock_threshold: int = 900, # 15 minutes
memory_threshold: int = 1024, # 1GB
max_retries: int = 3
):
self.deadlock_threshold = deadlock_threshold
self.memory_threshold = memory_threshold
self.max_retries = max_retries
self._shutdown = False
async def start_monitoring(
self,
queue: List[QueueItem],
processing: Dict[str, QueueItem],
metrics: QueueMetrics,
processing_lock: asyncio.Lock
) -> None:
"""Start monitoring queue health
Args:
queue: Reference to the queue list
processing: Reference to processing dict
metrics: Reference to queue metrics
processing_lock: Lock for processing dict
"""
while not self._shutdown:
try:
await self._check_health(queue, processing, metrics, processing_lock)
await asyncio.sleep(300) # Check every 5 minutes
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in health monitor: {str(e)}")
await asyncio.sleep(60)
def stop_monitoring(self) -> None:
"""Stop the monitoring process"""
self._shutdown = True
async def _check_health(
self,
queue: List[QueueItem],
processing: Dict[str, QueueItem],
metrics: QueueMetrics,
processing_lock: asyncio.Lock
) -> None:
"""Check queue health and performance
Args:
queue: Reference to the queue list
processing: Reference to processing dict
metrics: Reference to queue metrics
processing_lock: Lock for processing dict
"""
try:
# Check memory usage
process = psutil.Process()
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
if memory_usage > self.memory_threshold:
logger.warning(f"High memory usage detected: {memory_usage:.2f}MB")
# Force garbage collection
import gc
gc.collect()
# Check for potential deadlocks
current_time = time.time()
processing_times = []
stuck_items = []
for url, item in processing.items():
if isinstance(item.processing_time, (int, float)) and item.processing_time > 0:
processing_time = current_time - item.processing_time
processing_times.append(processing_time)
if processing_time > self.deadlock_threshold:
stuck_items.append((url, item))
if stuck_items:
logger.warning(
f"Potential deadlock detected: {len(stuck_items)} items stuck"
)
await self._recover_stuck_items(
stuck_items, queue, processing, processing_lock
)
# Calculate and log metrics
success_rate = metrics.success_rate
error_distribution = metrics.errors_by_type
avg_processing_time = metrics.avg_processing_time
# Update peak memory usage
metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage)
logger.info(
f"Queue Health Metrics:\n"
f"- Success Rate: {success_rate:.2%}\n"
f"- Avg Processing Time: {avg_processing_time:.2f}s\n"
f"- Memory Usage: {memory_usage:.2f}MB\n"
f"- Error Distribution: {error_distribution}\n"
f"- Queue Size: {len(queue)}\n"
f"- Processing Items: {len(processing)}"
)
except Exception as e:
logger.error(f"Error checking queue health: {str(e)}")
raise
async def _recover_stuck_items(
self,
stuck_items: List[tuple[str, QueueItem]],
queue: List[QueueItem],
processing: Dict[str, QueueItem],
processing_lock: asyncio.Lock
) -> None:
"""Attempt to recover stuck items
Args:
stuck_items: List of (url, item) tuples for stuck items
queue: Reference to the queue list
processing: Reference to processing dict
processing_lock: Lock for processing dict
"""
try:
async with processing_lock:
for url, item in stuck_items:
# Move to failed if max retries reached
if item.retry_count >= self.max_retries:
logger.warning(f"Moving stuck item to failed: {url}")
item.status = "failed"
item.error = "Exceeded maximum retries after being stuck"
item.last_error = item.error
item.last_error_time = datetime.utcnow()
processing.pop(url)
else:
# Reset for retry
logger.info(f"Recovering stuck item for retry: {url}")
item.retry_count += 1
item.processing_time = 0
item.last_retry = datetime.utcnow()
item.status = "pending"
item.priority = max(0, item.priority - 2) # Lower priority
queue.append(item)
processing.pop(url)
except Exception as e:
logger.error(f"Error recovering stuck items: {str(e)}")
raise
class MonitoringError(Exception):
"""Base exception for monitoring-related errors"""
pass
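A standalone sketch of driving the monitor outside the manager. The shared queue, processing dict, metrics, and lock here are illustrative stand-ins; in this commit the manager wires its own structures into start_monitoring in _init_tasks.

    import asyncio

    async def run_monitor():
        monitor = QueueMonitor(deadlock_threshold=900, max_retries=3)
        queue, processing = [], {}
        metrics = QueueMetrics()
        lock = asyncio.Lock()
        task = asyncio.create_task(monitor.start_monitoring(queue, processing, metrics, lock))
        ...
        monitor.stop_monitoring()  # loop exits after the current sleep/check
        task.cancel()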

persistence.py

@@ -0,0 +1,201 @@
"""Queue persistence management"""
import json
import logging
import os
import time
from datetime import datetime
from typing import Dict, Any, Optional
from .models import QueueItem, QueueMetrics
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("QueuePersistence")
class QueuePersistenceManager:
"""Manages persistence of queue state to disk"""
def __init__(self, persistence_path: str):
"""Initialize the persistence manager
Args:
persistence_path: Path to the persistence file
"""
self.persistence_path = persistence_path
async def persist_queue_state(
self,
queue: list[QueueItem],
processing: Dict[str, QueueItem],
completed: Dict[str, QueueItem],
failed: Dict[str, QueueItem],
metrics: QueueMetrics
) -> None:
"""Persist queue state to disk with improved error handling
Args:
queue: List of pending queue items
processing: Dict of items currently being processed
completed: Dict of completed items
failed: Dict of failed items
metrics: Queue metrics object
Raises:
QueueError: If persistence fails
"""
try:
state = {
"queue": [item.to_dict() for item in queue],
"processing": {k: v.to_dict() for k, v in processing.items()},
"completed": {k: v.to_dict() for k, v in completed.items()},
"failed": {k: v.to_dict() for k, v in failed.items()},
"metrics": {
"total_processed": metrics.total_processed,
"total_failed": metrics.total_failed,
"avg_processing_time": metrics.avg_processing_time,
"success_rate": metrics.success_rate,
"errors_by_type": metrics.errors_by_type,
"last_error": metrics.last_error,
"last_error_time": (
metrics.last_error_time.isoformat()
if metrics.last_error_time
else None
),
"compression_failures": metrics.compression_failures,
"hardware_accel_failures": metrics.hardware_accel_failures,
},
}
# Ensure directory exists
os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True)
# Write to temp file first
temp_path = f"{self.persistence_path}.tmp"
with open(temp_path, "w") as f:
json.dump(state, f, default=str)
f.flush()
os.fsync(f.fileno())
# Atomic rename
os.rename(temp_path, self.persistence_path)
except Exception as e:
logger.error(f"Error persisting queue state: {str(e)}")
raise QueueError(f"Failed to persist queue state: {str(e)}")
def load_queue_state(self) -> Optional[Dict[str, Any]]:
"""Load persisted queue state from disk
Returns:
Dict containing queue state if successful, None if file doesn't exist
Raises:
QueueError: If loading fails
"""
if not self.persistence_path or not os.path.exists(self.persistence_path):
return None
try:
with open(self.persistence_path, "r") as f:
state = json.load(f)
# Helper function to safely convert items
def safe_convert_item(item_data: dict) -> Optional[QueueItem]:
try:
if isinstance(item_data, dict):
# Ensure datetime fields are properly formatted
if 'added_at' in item_data and item_data['added_at']:
if isinstance(item_data['added_at'], str):
try:
item_data['added_at'] = datetime.fromisoformat(item_data['added_at'])
except ValueError:
item_data['added_at'] = datetime.utcnow()
elif not isinstance(item_data['added_at'], datetime):
item_data['added_at'] = datetime.utcnow()
if 'last_retry' in item_data and item_data['last_retry']:
if isinstance(item_data['last_retry'], str):
try:
item_data['last_retry'] = datetime.fromisoformat(item_data['last_retry'])
except ValueError:
item_data['last_retry'] = None
elif not isinstance(item_data['last_retry'], datetime):
item_data['last_retry'] = None
if 'last_error_time' in item_data and item_data['last_error_time']:
if isinstance(item_data['last_error_time'], str):
try:
item_data['last_error_time'] = datetime.fromisoformat(item_data['last_error_time'])
except ValueError:
item_data['last_error_time'] = None
elif not isinstance(item_data['last_error_time'], datetime):
item_data['last_error_time'] = None
# Ensure processing_time is a float
if 'processing_time' in item_data:
try:
if isinstance(item_data['processing_time'], str):
item_data['processing_time'] = float(item_data['processing_time'])
elif not isinstance(item_data['processing_time'], (int, float)):
item_data['processing_time'] = 0.0
except (ValueError, TypeError):
item_data['processing_time'] = 0.0
return QueueItem(**item_data)
return None
except Exception as e:
logger.error(f"Error converting queue item: {e}")
return None
# Convert queue items
queue = []
for item in state.get("queue", []):
converted_item = safe_convert_item(item)
if converted_item:
queue.append(converted_item)
state["queue"] = queue
# Convert processing items
processing = {}
for k, v in state.get("processing", {}).items():
converted_item = safe_convert_item(v)
if converted_item:
processing[k] = converted_item
state["processing"] = processing
# Convert completed items
completed = {}
for k, v in state.get("completed", {}).items():
converted_item = safe_convert_item(v)
if converted_item:
completed[k] = converted_item
state["completed"] = completed
# Convert failed items
failed = {}
for k, v in state.get("failed", {}).items():
converted_item = safe_convert_item(v)
if converted_item:
failed[k] = converted_item
state["failed"] = failed
logger.info("Successfully loaded persisted queue state")
return state
except Exception as e:
logger.error(f"Error loading persisted queue state: {str(e)}")
# Create backup of corrupted state file
if os.path.exists(self.persistence_path):
backup_path = f"{self.persistence_path}.bak.{int(time.time())}"
try:
os.rename(self.persistence_path, backup_path)
logger.info(f"Created backup of corrupted state file: {backup_path}")
except Exception as be:
logger.error(f"Failed to create backup of corrupted state file: {str(be)}")
raise QueueError(f"Failed to load queue state: {str(e)}")
class QueueError(Exception):
"""Base exception for queue-related errors"""
pass
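A minimal round-trip sketch of the persistence manager, assuming QueuePersistenceManager, QueueItem, and QueueMetrics are imported from this package; the path and field values are illustrative. persist_queue_state is a coroutine, so it runs inside an event loop.

    import asyncio
    from datetime import datetime

    async def demo():
        pm = QueuePersistenceManager("data/queue_state.json")  # path is illustrative
        item = QueueItem(url="https://example.com/v.mp4", message_id=1, channel_id=2,
                         guild_id=3, author_id=4, added_at=datetime.utcnow())
        await pm.persist_queue_state([item], {}, {}, {}, QueueMetrics())
        state = pm.load_queue_state()   # "queue" entries come back as re-hydrated QueueItem objects
        print(len(state["queue"]))      # -> 1

    asyncio.run(demo())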