diff --git a/videoarchiver/queue/cleanup.py b/videoarchiver/queue/cleanup.py
index 5aeb982..7e85b6a 100644
--- a/videoarchiver/queue/cleanup.py
+++ b/videoarchiver/queue/cleanup.py
@@ -3,7 +3,7 @@
 import asyncio
 import logging
 from datetime import datetime, timedelta
-from typing import Dict, List, Set
+from typing import Dict, List, Set, Optional
 
 from .models import QueueItem, QueueMetrics
 
 # Configure logging
@@ -17,12 +17,14 @@ class QueueCleaner:
 
     def __init__(
        self,
-        cleanup_interval: int = 3600,  # 1 hour
-        max_history_age: int = 86400,  # 24 hours
+        cleanup_interval: int = 1800,  # 30 minutes
+        max_history_age: int = 43200,  # 12 hours
    ):
        self.cleanup_interval = cleanup_interval
        self.max_history_age = max_history_age
        self._shutdown = False
+        self._cleanup_task: Optional[asyncio.Task] = None
+        self._last_cleanup_time = datetime.utcnow()
 
    async def start_cleanup(
        self,
@@ -47,6 +49,36 @@ class QueueCleaner:
            metrics: Reference to queue metrics
            queue_lock: Lock for queue operations
        """
+        if self._cleanup_task is not None:
+            logger.warning("Cleanup task already running")
+            return
+
+        logger.info("Starting queue cleanup task...")
+        self._cleanup_task = asyncio.create_task(
+            self._cleanup_loop(
+                queue,
+                completed,
+                failed,
+                guild_queues,
+                channel_queues,
+                processing,
+                metrics,
+                queue_lock
+            )
+        )
+
+    async def _cleanup_loop(
+        self,
+        queue: List[QueueItem],
+        completed: Dict[str, QueueItem],
+        failed: Dict[str, QueueItem],
+        guild_queues: Dict[int, Set[str]],
+        channel_queues: Dict[int, Set[str]],
+        processing: Dict[str, QueueItem],
+        metrics: QueueMetrics,
+        queue_lock: asyncio.Lock
+    ) -> None:
+        """Main cleanup loop"""
        while not self._shutdown:
            try:
                await self._perform_cleanup(
@@ -59,17 +91,24 @@ class QueueCleaner:
                    metrics,
                    queue_lock
                )
+                self._last_cleanup_time = datetime.utcnow()
                await asyncio.sleep(self.cleanup_interval)
            except asyncio.CancelledError:
+                logger.info("Queue cleanup cancelled")
                break
            except Exception as e:
-                logger.error(f"Error in periodic cleanup: {str(e)}")
-                await asyncio.sleep(60)
+                logger.error(f"Error in cleanup loop: {str(e)}")
+                # Shorter sleep on error to retry sooner
+                await asyncio.sleep(30)
 
    def stop_cleanup(self) -> None:
        """Stop the cleanup process"""
+        logger.info("Stopping queue cleanup...")
        self._shutdown = True
+        if self._cleanup_task and not self._cleanup_task.done():
+            self._cleanup_task.cancel()
+        self._cleanup_task = None
 
    async def _perform_cleanup(
        self,
@@ -97,13 +136,14 @@ class QueueCleaner:
        try:
            current_time = datetime.utcnow()
            cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age)
+            items_cleaned = 0
 
            async with queue_lock:
                # Clean up completed items
+                completed_count = len(completed)
                for url in list(completed.keys()):
                    try:
                        item = completed[url]
-                        # Ensure added_at is a datetime object
                        if not isinstance(item.added_at, datetime):
                            try:
                                if isinstance(item.added_at, str):
@@ -115,15 +155,17 @@ class QueueCleaner:
 
                        if item.added_at < cleanup_cutoff:
                            completed.pop(url)
+                            items_cleaned += 1
                    except Exception as e:
-                        logger.error(f"Error processing completed item {url}: {e}")
+                        logger.error(f"Error cleaning completed item {url}: {e}")
                        completed.pop(url)
+                        items_cleaned += 1
 
                # Clean up failed items
+                failed_count = len(failed)
                for url in list(failed.keys()):
                    try:
                        item = failed[url]
-                        # Ensure added_at is a datetime object
                        if not isinstance(item.added_at, datetime):
                            try:
                                if isinstance(item.added_at, str):
@@ -135,34 +177,53 @@ class QueueCleaner:
 
                        if item.added_at < cleanup_cutoff:
                            failed.pop(url)
+                            items_cleaned += 1
                    except Exception as e:
-                        logger.error(f"Error processing failed item {url}: {e}")
+                        logger.error(f"Error cleaning failed item {url}: {e}")
                        failed.pop(url)
+                        items_cleaned += 1
 
                # Clean up guild tracking
+                guild_count = len(guild_queues)
                for guild_id in list(guild_queues.keys()):
+                    original_size = len(guild_queues[guild_id])
                    guild_queues[guild_id] = {
                        url for url in guild_queues[guild_id]
                        if url in queue or url in processing
                    }
+                    items_cleaned += original_size - len(guild_queues[guild_id])
                    if not guild_queues[guild_id]:
                        guild_queues.pop(guild_id)
 
                # Clean up channel tracking
+                channel_count = len(channel_queues)
                for channel_id in list(channel_queues.keys()):
+                    original_size = len(channel_queues[channel_id])
                    channel_queues[channel_id] = {
                        url for url in channel_queues[channel_id]
                        if url in queue or url in processing
                    }
+                    items_cleaned += original_size - len(channel_queues[channel_id])
                    if not channel_queues[channel_id]:
                        channel_queues.pop(channel_id)
 
-                metrics.last_cleanup = current_time
-                logger.info("Completed periodic queue cleanup")
+                # Update metrics
+                metrics.last_cleanup = current_time
+
+                logger.info(
+                    f"Queue cleanup completed:\n"
+                    f"- Items cleaned: {items_cleaned}\n"
+                    f"- Completed items: {completed_count} -> {len(completed)}\n"
+                    f"- Failed items: {failed_count} -> {len(failed)}\n"
+                    f"- Guild queues: {guild_count} -> {len(guild_queues)}\n"
+                    f"- Channel queues: {channel_count} -> {len(channel_queues)}\n"
+                    f"- Current queue size: {len(queue)}\n"
+                    f"- Processing items: {len(processing)}"
+                )
 
        except Exception as e:
            logger.error(f"Error during cleanup: {str(e)}")
-            raise
+            # Don't re-raise to keep cleanup running
 
    async def clear_guild_queue(
        self,
@@ -195,6 +256,12 @@ class QueueCleaner:
            async with queue_lock:
                # Get URLs for this guild
                guild_urls = guild_queues.get(guild_id, set())
+                initial_counts = {
+                    'queue': len([item for item in queue if item.guild_id == guild_id]),
+                    'processing': len([item for item in processing.values() if item.guild_id == guild_id]),
+                    'completed': len([item for item in completed.values() if item.guild_id == guild_id]),
+                    'failed': len([item for item in failed.values() if item.guild_id == guild_id])
+                }
 
                # Clear from pending queue
                queue[:] = [item for item in queue if item.guild_id != guild_id]
@@ -231,12 +298,19 @@ class QueueCleaner:
                    if not channel_queues[channel_id]:
                        channel_queues.pop(channel_id)
 
-                logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue")
-                return cleared_count
+                logger.info(
+                    f"Cleared guild {guild_id} queue:\n"
+                    f"- Queue: {initial_counts['queue']} items\n"
+                    f"- Processing: {initial_counts['processing']} items\n"
+                    f"- Completed: {initial_counts['completed']} items\n"
+                    f"- Failed: {initial_counts['failed']} items\n"
+                    f"Total cleared: {cleared_count} items"
+                )
+                return cleared_count
 
        except Exception as e:
            logger.error(f"Error clearing guild queue: {str(e)}")
-            raise
+            raise CleanupError(f"Failed to clear guild queue: {str(e)}")
 
 
 class CleanupError(Exception):
    """Base exception for cleanup-related errors"""
diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py
index cd21a6e..d85d803 100644
--- a/videoarchiver/queue/manager.py
+++ b/videoarchiver/queue/manager.py
@@ -49,10 +49,8 @@ class EnhancedVideoQueueManager:
        self._channel_queues: Dict[int, Set[str]] = {}
        self._active_tasks: Set[asyncio.Task] = set()
 
-        # Locks
-        self._global_lock = asyncio.Lock()
-        self._queue_lock = asyncio.Lock()
-        self._processing_lock = asyncio.Lock()
+        # Single lock for all operations to prevent deadlocks
+        self._lock = asyncio.Lock()
 
        # State
        self._shutdown = False
@@ -81,45 +79,43 @@ class EnhancedVideoQueueManager:
        try:
            logger.info("Starting queue manager initialization...")
 
-            # Load persisted state first if available
-            if self.persistence:
-                await self._load_persisted_state()
-
-            # Start monitoring task
-            monitor_task = asyncio.create_task(
-                self.monitor.start_monitoring(
-                    self._queue,
-                    self._processing,
-                    self.metrics,
-                    self._processing_lock
+            async with self._lock:
+                # Load persisted state first if available
+                if self.persistence:
+                    await self._load_persisted_state()
+
+                # Start monitoring task
+                monitor_task = asyncio.create_task(
+                    self.monitor.start_monitoring(
+                        self._queue,
+                        self._processing,
+                        self.metrics,
+                        self._lock
                    )
                )
-            )
-            self._active_tasks.add(monitor_task)
-            logger.info("Queue monitoring started")
-
-            # Brief pause to allow monitor to initialize
-            await asyncio.sleep(0.1)
-
-            # Start cleanup task
-            cleanup_task = asyncio.create_task(
-                self.cleaner.start_cleanup(
-                    self._queue,
-                    self._completed,
-                    self._failed,
-                    self._guild_queues,
-                    self._channel_queues,
-                    self._processing,
-                    self.metrics,
-                    self._queue_lock
+                self._active_tasks.add(monitor_task)
+                logger.info("Queue monitoring started")
+
+                # Start cleanup task
+                cleanup_task = asyncio.create_task(
+                    self.cleaner.start_cleanup(
+                        self._queue,
+                        self._completed,
+                        self._failed,
+                        self._guild_queues,
+                        self._channel_queues,
+                        self._processing,
+                        self.metrics,
+                        self._lock
+                    )
                )
-            )
-            self._active_tasks.add(cleanup_task)
-            logger.info("Queue cleanup started")
+                self._active_tasks.add(cleanup_task)
+                logger.info("Queue cleanup started")
 
-            # Signal initialization complete
-            self._initialized = True
-            self._init_event.set()
-            logger.info("Queue manager initialization completed")
+                # Signal initialization complete
+                self._initialized = True
+                self._init_event.set()
+                logger.info("Queue manager initialization completed")
 
        except Exception as e:
            logger.error(f"Failed to initialize queue manager: {e}")
@@ -131,13 +127,10 @@ class EnhancedVideoQueueManager:
        try:
            state = self.persistence.load_queue_state()
            if state:
-                async with self._queue_lock:
-                    self._queue = state["queue"]
-                    self._completed = state["completed"]
-                    self._failed = state["failed"]
-
-                async with self._processing_lock:
-                    self._processing = state["processing"]
+                self._queue = state["queue"]
+                self._completed = state["completed"]
+                self._failed = state["failed"]
+                self._processing = state["processing"]
 
                # Update metrics
                metrics_data = state.get("metrics", {})
@@ -168,21 +161,14 @@ class EnhancedVideoQueueManager:
        while not self._shutdown:
            try:
                items = []
-                # Use global lock for coordination
-                async with self._global_lock:
-                    # Then acquire specific locks in order
-                    async with self._queue_lock:
-                        # Get up to 5 items from queue
-                        while len(items) < 5 and self._queue:
-                            item = self._queue.pop(0)
-                            items.append(item)
-
-                    if items:
-                        async with self._processing_lock:
-                            for item in items:
-                                self._processing[item.url] = item
-                                # Update activity timestamp
-                                self.monitor.update_activity()
+                async with self._lock:
+                    # Get up to 5 items from queue
+                    while len(items) < 5 and self._queue:
+                        item = self._queue.pop(0)
+                        items.append(item)
+                        self._processing[item.url] = item
+                        # Update activity timestamp
+                        self.monitor.update_activity()
 
                if not items:
                    await asyncio.sleep(0.1)
@@ -194,7 +180,13 @@ class EnhancedVideoQueueManager:
                    task = asyncio.create_task(self._process_item(processor, item))
                    tasks.append(task)
 
-                await asyncio.gather(*tasks, return_exceptions=True)
+                try:
+                    await asyncio.gather(*tasks, return_exceptions=True)
+                except asyncio.CancelledError:
+                    logger.info("Queue processing cancelled")
+                    break
+                except Exception as e:
+                    logger.error(f"Error in queue processing: {e}")
 
                # Persist state if interval has passed
                current_time = time.time()
@@ -202,6 +194,9 @@ class EnhancedVideoQueueManager:
                    await self._persist_state()
                    last_persist_time = current_time
 
+            except asyncio.CancelledError:
+                logger.info("Queue processing cancelled")
+                break
            except Exception as e:
                logger.error(f"Critical error in queue processor: {e}")
                await asyncio.sleep(0.1)
@@ -218,49 +213,46 @@ class EnhancedVideoQueueManager:
            logger.info(f"Processing queue item: {item.url}")
            item.start_processing()
            self.metrics.last_activity_time = time.time()
-            self.monitor.update_activity()  # Update activity timestamp
+            self.monitor.update_activity()
 
            success, error = await processor(item)
 
-            async with self._global_lock:
-                async with self._processing_lock:
-                    item.finish_processing(success, error)
-                    self._processing.pop(item.url, None)
-
-                    if success:
-                        self._completed[item.url] = item
-                        logger.info(f"Successfully processed: {item.url}")
+            async with self._lock:
+                item.finish_processing(success, error)
+                self._processing.pop(item.url, None)
+
+                if success:
+                    self._completed[item.url] = item
+                    logger.info(f"Successfully processed: {item.url}")
+                else:
+                    if item.retry_count < self.max_retries:
+                        item.retry_count += 1
+                        item.status = "pending"
+                        item.last_retry = datetime.utcnow()
+                        item.priority = max(0, item.priority - 1)
+                        self._queue.append(item)
+                        logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
                    else:
-                        async with self._queue_lock:
-                            if item.retry_count < self.max_retries:
-                                item.retry_count += 1
-                                item.status = "pending"
-                                item.last_retry = datetime.utcnow()
-                                item.priority = max(0, item.priority - 1)
-                                self._queue.append(item)
-                                logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
-                            else:
-                                self._failed[item.url] = item
-                                logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
-
-                    self.metrics.update(
-                        processing_time=item.processing_time,
-                        success=success,
-                        error=error
-                    )
+                        self._failed[item.url] = item
+                        logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
+
+                self.metrics.update(
+                    processing_time=item.processing_time,
+                    success=success,
+                    error=error
+                )
 
        except Exception as e:
            logger.error(f"Error processing {item.url}: {e}")
-            async with self._global_lock:
-                async with self._processing_lock:
-                    item.finish_processing(False, str(e))
-                    self._processing.pop(item.url, None)
-                    self._failed[item.url] = item
-                    self.metrics.update(
-                        processing_time=item.processing_time,
-                        success=False,
-                        error=str(e)
-                    )
+            async with self._lock:
+                item.finish_processing(False, str(e))
+                self._processing.pop(item.url, None)
+                self._failed[item.url] = item
+                self.metrics.update(
+                    processing_time=item.processing_time,
+                    success=False,
+                    error=str(e)
+                )
 
    async def _persist_state(self) -> None:
        """Persist current state to storage"""
@@ -268,7 +260,7 @@ class EnhancedVideoQueueManager:
            return
 
        try:
-            async with self._global_lock:
+            async with self._lock:
                await self.persistence.persist_queue_state(
                    self._queue,
                    self._processing,
@@ -292,44 +284,43 @@ class EnhancedVideoQueueManager:
        if self._shutdown:
            raise QueueError("Queue manager is shutting down")
 
-        # Wait for initialization using the correct event
+        # Wait for initialization
        await self._init_event.wait()
 
        try:
-            async with self._global_lock:
-                async with self._queue_lock:
-                    if len(self._queue) >= self.max_queue_size:
-                        raise QueueError("Queue is full")
+            async with self._lock:
+                if len(self._queue) >= self.max_queue_size:
+                    raise QueueError("Queue is full")
 
-                    item = QueueItem(
-                        url=url,
-                        message_id=message_id,
-                        channel_id=channel_id,
-                        guild_id=guild_id,
-                        author_id=author_id,
-                        added_at=datetime.utcnow(),
-                        priority=priority,
-                    )
+                item = QueueItem(
+                    url=url,
+                    message_id=message_id,
+                    channel_id=channel_id,
+                    guild_id=guild_id,
+                    author_id=author_id,
+                    added_at=datetime.utcnow(),
+                    priority=priority,
+                )
 
-                    if guild_id not in self._guild_queues:
-                        self._guild_queues[guild_id] = set()
-                    self._guild_queues[guild_id].add(url)
+                if guild_id not in self._guild_queues:
+                    self._guild_queues[guild_id] = set()
+                self._guild_queues[guild_id].add(url)
 
-                    if channel_id not in self._channel_queues:
-                        self._channel_queues[channel_id] = set()
-                    self._channel_queues[channel_id].add(url)
+                if channel_id not in self._channel_queues:
+                    self._channel_queues[channel_id] = set()
+                self._channel_queues[channel_id].add(url)
 
-                    self._queue.append(item)
-                    self._queue.sort(key=lambda x: (-x.priority, x.added_at))
+                self._queue.append(item)
+                self._queue.sort(key=lambda x: (-x.priority, x.added_at))
 
-                    self.metrics.last_activity_time = time.time()
-                    self.monitor.update_activity()  # Update activity timestamp
+                self.metrics.last_activity_time = time.time()
+                self.monitor.update_activity()
 
-                    if self.persistence:
-                        await self._persist_state()
+                if self.persistence:
+                    await self._persist_state()
 
-                    logger.info(f"Added to queue: {url} (priority: {priority})")
-                    return True
+                logger.info(f"Added to queue: {url} (priority: {priority})")
+                return True
 
        except Exception as e:
            logger.error(f"Error adding to queue: {e}")
@@ -400,18 +391,17 @@ class EnhancedVideoQueueManager:
        await asyncio.gather(*self._active_tasks, return_exceptions=True)
 
-        async with self._global_lock:
+        async with self._lock:
            # Move processing items back to queue
-            async with self._processing_lock:
-                for url, item in self._processing.items():
-                    if item.retry_count < self.max_retries:
-                        item.status = "pending"
-                        item.retry_count += 1
-                        self._queue.append(item)
-                    else:
-                        self._failed[url] = item
+            for url, item in self._processing.items():
+                if item.retry_count < self.max_retries:
+                    item.status = "pending"
+                    item.retry_count += 1
+                    self._queue.append(item)
+                else:
+                    self._failed[url] = item
 
-                self._processing.clear()
+            self._processing.clear()
 
        # Final state persistence
        if self.persistence:
diff --git a/videoarchiver/queue/monitoring.py b/videoarchiver/queue/monitoring.py
index 2e57474..c42d089 100644
--- a/videoarchiver/queue/monitoring.py
+++ b/videoarchiver/queue/monitoring.py
@@ -19,10 +19,10 @@
    def __init__(
        self,
-        deadlock_threshold: int = 120,  # Reduced to 2 minutes
+        deadlock_threshold: int = 60,  # Reduced to 1 minute
        memory_threshold: int = 512,  # 512MB
        max_retries: int = 3,
-        check_interval: int = 30  # Reduced to 30 seconds
+        check_interval: int = 15  # Reduced to 15 seconds
    ):
        self.deadlock_threshold = deadlock_threshold
        self.memory_threshold = memory_threshold
@@ -37,7 +37,7 @@
        queue: List[QueueItem],
        processing: Dict[str, QueueItem],
        metrics: QueueMetrics,
-        processing_lock: asyncio.Lock
+        queue_lock: asyncio.Lock
    ) -> None:
        """Start monitoring queue health
 
        Args:
            queue: Reference to the queue list
            processing: Reference to processing dict
            metrics: Reference to queue metrics
-            processing_lock: Lock for processing dict
+            queue_lock: Lock for queue operations
        """
        if self._monitoring_task is not None:
            logger.warning("Monitoring task already running")
@@ -53,7 +53,7 @@
        logger.info("Starting queue monitoring...")
        self._monitoring_task = asyncio.create_task(
-            self._monitor_loop(queue, processing, metrics, processing_lock)
+            self._monitor_loop(queue, processing, metrics, queue_lock)
        )
 
    async def _monitor_loop(
        self,
        queue: List[QueueItem],
        processing: Dict[str, QueueItem],
        metrics: QueueMetrics,
-        processing_lock: asyncio.Lock
+        queue_lock: asyncio.Lock
    ) -> None:
        """Main monitoring loop"""
        while not self._shutdown:
            try:
-                await self._check_health(queue, processing, metrics, processing_lock)
+                await self._check_health(queue, processing, metrics, queue_lock)
                await asyncio.sleep(self.check_interval)
            except asyncio.CancelledError:
                logger.info("Queue monitoring cancelled")
                break
            except Exception as e:
                logger.error(f"Error in health monitor: {str(e)}")
-                await asyncio.sleep(5)  # Short sleep on error
+                await asyncio.sleep(1)  # Reduced sleep on error
 
    def stop_monitoring(self) -> None:
        """Stop the monitoring process"""
        logger.info("Stopping queue monitoring...")
        self._shutdown = True
-        if self._monitoring_task:
+        if self._monitoring_task and not self._monitoring_task.done():
            self._monitoring_task.cancel()
-            self._monitoring_task = None
+        self._monitoring_task = None
 
    def update_activity(self) -> None:
        """Update the last active time"""
@@ -92,7 +92,7 @@
        queue: List[QueueItem],
        processing: Dict[str, QueueItem],
        metrics: QueueMetrics,
-        processing_lock: asyncio.Lock
+        queue_lock: asyncio.Lock
    ) -> None:
        """Check queue health and performance
 
        Args:
            queue: Reference to the queue list
            processing: Reference to processing dict
            metrics: Reference to queue metrics
-            processing_lock: Lock for processing dict
+            queue_lock: Lock for queue operations
        """
        try:
            current_time = time.time()
@@ -118,40 +118,37 @@
                logger.info(f"Memory after GC: {memory_after:.2f}MB")
 
            # Check for potential deadlocks
-            processing_times = []
            stuck_items = []
-            async with processing_lock:
+            async with queue_lock:
+                # Check processing items
                for url, item in processing.items():
                    if hasattr(item, 'start_time') and item.start_time:
                        processing_time = current_time - item.start_time
-                        processing_times.append(processing_time)
                        if processing_time > self.deadlock_threshold:
                            stuck_items.append((url, item))
                            logger.warning(f"Item stuck in processing: {url} for {processing_time:.1f}s")
 
-            if stuck_items:
-                logger.warning(
-                    f"Potential deadlock detected: {len(stuck_items)} items stuck"
-                )
-                await self._recover_stuck_items(
-                    stuck_items, queue, processing, processing_lock
-                )
+                # Handle stuck items if found
+                if stuck_items:
+                    logger.warning(f"Potential deadlock detected: {len(stuck_items)} items stuck")
+                    await self._recover_stuck_items(stuck_items, queue, processing)
 
-            # Check overall queue activity
-            if processing and current_time - self._last_active_time > self.deadlock_threshold:
-                logger.warning("Queue appears to be hung - no activity detected")
-                # Force recovery of all processing items
-                async with processing_lock:
+                # Check overall queue activity
+                if processing and current_time - self._last_active_time > self.deadlock_threshold:
+                    logger.warning("Queue appears to be hung - no activity detected")
+                    # Force recovery of all processing items
                    all_items = list(processing.items())
-                    await self._recover_stuck_items(
-                        all_items, queue, processing, processing_lock
-                    )
-                    self._last_active_time = current_time
+                    await self._recover_stuck_items(all_items, queue, processing)
+                    self._last_active_time = current_time
 
-            # Update metrics
-            metrics.last_activity_time = self._last_active_time
-            metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage)
+                # Update metrics
+                metrics.last_activity_time = self._last_active_time
+                metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage)
+
+                # Calculate current metrics
+                queue_size = len(queue)
+                processing_count = len(processing)
 
            # Log detailed metrics
            logger.info(
@@ -161,21 +158,20 @@
                f"- Memory Usage: {memory_usage:.2f}MB\n"
                f"- Peak Memory: {metrics.peak_memory_usage:.2f}MB\n"
                f"- Error Distribution: {metrics.errors_by_type}\n"
-                f"- Queue Size: {len(queue)}\n"
-                f"- Processing Items: {len(processing)}\n"
+                f"- Queue Size: {queue_size}\n"
+                f"- Processing Items: {processing_count}\n"
                f"- Last Activity: {(current_time - self._last_active_time):.1f}s ago"
            )
 
        except Exception as e:
            logger.error(f"Error checking queue health: {str(e)}")
-            raise
+            # Don't re-raise to keep monitoring alive
 
    async def _recover_stuck_items(
        self,
        stuck_items: List[tuple[str, QueueItem]],
        queue: List[QueueItem],
-        processing: Dict[str, QueueItem],
-        processing_lock: asyncio.Lock
+        processing: Dict[str, QueueItem]
    ) -> None:
        """Attempt to recover stuck items
 
        Args:
@@ -183,38 +179,36 @@
            stuck_items: List of (url, item) tuples for stuck items
            queue: Reference to the queue list
            processing: Reference to processing dict
-            processing_lock: Lock for processing dict
        """
        try:
            recovered = 0
            failed = 0
 
-            async with processing_lock:
-                for url, item in stuck_items:
-                    try:
-                        # Move to failed if max retries reached
-                        if item.retry_count >= self.max_retries:
-                            logger.warning(f"Moving stuck item to failed: {url}")
-                            item.status = "failed"
-                            item.error = "Exceeded maximum retries after being stuck"
-                            item.last_error = item.error
-                            item.last_error_time = datetime.utcnow()
-                            processing.pop(url)
-                            failed += 1
-                        else:
-                            # Reset for retry
-                            logger.info(f"Recovering stuck item for retry: {url}")
-                            item.retry_count += 1
-                            item.start_time = None
-                            item.processing_time = 0
-                            item.last_retry = datetime.utcnow()
-                            item.status = "pending"
-                            item.priority = max(0, item.priority - 2)  # Lower priority
-                            queue.append(item)
-                            processing.pop(url)
-                            recovered += 1
-                    except Exception as e:
-                        logger.error(f"Error recovering item {url}: {str(e)}")
+            for url, item in stuck_items:
+                try:
+                    # Move to failed if max retries reached
+                    if item.retry_count >= self.max_retries:
+                        logger.warning(f"Moving stuck item to failed: {url}")
+                        item.status = "failed"
+                        item.error = "Exceeded maximum retries after being stuck"
+                        item.last_error = item.error
+                        item.last_error_time = datetime.utcnow()
+                        processing.pop(url)
+                        failed += 1
+                    else:
+                        # Reset for retry
+                        logger.info(f"Recovering stuck item for retry: {url}")
+                        item.retry_count += 1
+                        item.start_time = None
+                        item.processing_time = 0
+                        item.last_retry = datetime.utcnow()
+                        item.status = "pending"
+                        item.priority = max(0, item.priority - 2)  # Lower priority
+                        queue.append(item)
+                        processing.pop(url)
+                        recovered += 1
+                except Exception as e:
+                    logger.error(f"Error recovering item {url}: {str(e)}")
 
            # Update activity timestamp after recovery
            self.update_activity()
@@ -222,7 +216,7 @@
        except Exception as e:
            logger.error(f"Error recovering stuck items: {str(e)}")
-            raise
+            # Don't re-raise to keep monitoring alive
 
 
 class MonitoringError(Exception):
errors""" diff --git a/videoarchiver/queue/persistence.py b/videoarchiver/queue/persistence.py index 6333fed..971e3b1 100644 --- a/videoarchiver/queue/persistence.py +++ b/videoarchiver/queue/persistence.py @@ -4,7 +4,9 @@ import json import logging import os import time -from datetime import datetime +import fcntl +import asyncio +from datetime import datetime, timedelta from typing import Dict, Any, Optional from .models import QueueItem, QueueMetrics @@ -17,13 +19,30 @@ logger = logging.getLogger("QueuePersistence") class QueuePersistenceManager: """Manages persistence of queue state to disk""" - def __init__(self, persistence_path: str): + def __init__( + self, + persistence_path: str, + max_retries: int = 3, + retry_delay: int = 1, + backup_interval: int = 3600, # 1 hour + max_backups: int = 24 # Keep last 24 backups + ): """Initialize the persistence manager Args: persistence_path: Path to the persistence file + max_retries: Maximum number of retries for file operations + retry_delay: Delay between retries in seconds + backup_interval: Interval between backups in seconds + max_backups: Maximum number of backup files to keep """ self.persistence_path = persistence_path + self.max_retries = max_retries + self.retry_delay = retry_delay + self.backup_interval = backup_interval + self.max_backups = max_backups + self._last_backup = 0 + self._lock_file = f"{persistence_path}.lock" async def persist_queue_state( self, @@ -45,7 +64,9 @@ class QueuePersistenceManager: Raises: QueueError: If persistence fails """ + lock_fd = None try: + # Create state object state = { "queue": [item.to_dict() for item in queue], "processing": {k: v.to_dict() for k, v in processing.items()}, @@ -66,27 +87,81 @@ class QueuePersistenceManager: "compression_failures": metrics.compression_failures, "hardware_accel_failures": metrics.hardware_accel_failures, }, + "timestamp": datetime.utcnow().isoformat() } # Ensure directory exists os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True) - # Write to temp file first - temp_path = f"{self.persistence_path}.tmp" - with open(temp_path, "w") as f: - json.dump(state, f, default=str) - f.flush() - os.fsync(f.fileno()) + # Acquire file lock + lock_fd = open(self._lock_file, 'w') + fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX) - # Atomic rename - os.rename(temp_path, self.persistence_path) + # Write with retries + for attempt in range(self.max_retries): + try: + # Write to temp file first + temp_path = f"{self.persistence_path}.tmp" + with open(temp_path, "w") as f: + json.dump(state, f, default=str, indent=2) + f.flush() + os.fsync(f.fileno()) + + # Atomic rename + os.rename(temp_path, self.persistence_path) + + # Create periodic backup if needed + current_time = time.time() + if current_time - self._last_backup >= self.backup_interval: + await self._create_backup() + self._last_backup = current_time + + break + except Exception as e: + if attempt == self.max_retries - 1: + raise + logger.warning(f"Retry {attempt + 1}/{self.max_retries} failed: {e}") + await asyncio.sleep(self.retry_delay) except Exception as e: logger.error(f"Error persisting queue state: {str(e)}") raise QueueError(f"Failed to persist queue state: {str(e)}") + finally: + if lock_fd: + fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN) + lock_fd.close() + + async def _create_backup(self) -> None: + """Create a backup of the current state file""" + try: + if not os.path.exists(self.persistence_path): + return + + # Create backup + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + backup_path = 
f"{self.persistence_path}.bak.{timestamp}" + with open(self.persistence_path, "rb") as src, open(backup_path, "wb") as dst: + dst.write(src.read()) + dst.flush() + os.fsync(dst.fileno()) + + # Clean old backups + backup_files = sorted([ + f for f in os.listdir(os.path.dirname(self.persistence_path)) + if f.startswith(os.path.basename(self.persistence_path) + ".bak.") + ]) + while len(backup_files) > self.max_backups: + old_backup = os.path.join(os.path.dirname(self.persistence_path), backup_files.pop(0)) + try: + os.remove(old_backup) + except Exception as e: + logger.warning(f"Failed to remove old backup {old_backup}: {e}") + + except Exception as e: + logger.error(f"Failed to create backup: {e}") def load_queue_state(self) -> Optional[Dict[str, Any]]: - """Load persisted queue state from disk + """Load persisted queue state from disk with retries Returns: Dict containing queue state if successful, None if file doesn't exist @@ -97,49 +172,66 @@ class QueuePersistenceManager: if not self.persistence_path or not os.path.exists(self.persistence_path): return None + lock_fd = None try: - with open(self.persistence_path, "r") as f: - state = json.load(f) + # Acquire file lock + lock_fd = open(self._lock_file, 'w') + fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX) + + # Try loading main file + state = None + last_error = None + for attempt in range(self.max_retries): + try: + with open(self.persistence_path, "r") as f: + state = json.load(f) + break + except Exception as e: + last_error = e + logger.warning(f"Retry {attempt + 1}/{self.max_retries} failed: {e}") + time.sleep(self.retry_delay) + + # If main file failed, try loading latest backup + if state is None: + backup_files = sorted([ + f for f in os.listdir(os.path.dirname(self.persistence_path)) + if f.startswith(os.path.basename(self.persistence_path) + ".bak.") + ], reverse=True) + + if backup_files: + latest_backup = os.path.join(os.path.dirname(self.persistence_path), backup_files[0]) + try: + with open(latest_backup, "r") as f: + state = json.load(f) + logger.info(f"Loaded state from backup: {latest_backup}") + except Exception as e: + logger.error(f"Failed to load backup: {e}") + if last_error: + raise QueueError(f"Failed to load queue state: {last_error}") + raise + + if state is None: + return None # Helper function to safely convert items def safe_convert_item(item_data: dict) -> Optional[QueueItem]: try: if isinstance(item_data, dict): # Ensure datetime fields are properly formatted - if 'added_at' in item_data and item_data['added_at']: - if isinstance(item_data['added_at'], str): - try: - item_data['added_at'] = datetime.fromisoformat(item_data['added_at']) - except ValueError: - item_data['added_at'] = datetime.utcnow() - elif not isinstance(item_data['added_at'], datetime): - item_data['added_at'] = datetime.utcnow() - - if 'last_retry' in item_data and item_data['last_retry']: - if isinstance(item_data['last_retry'], str): - try: - item_data['last_retry'] = datetime.fromisoformat(item_data['last_retry']) - except ValueError: - item_data['last_retry'] = None - elif not isinstance(item_data['last_retry'], datetime): - item_data['last_retry'] = None - - if 'last_error_time' in item_data and item_data['last_error_time']: - if isinstance(item_data['last_error_time'], str): - try: - item_data['last_error_time'] = datetime.fromisoformat(item_data['last_error_time']) - except ValueError: - item_data['last_error_time'] = None - elif not isinstance(item_data['last_error_time'], datetime): - item_data['last_error_time'] = None + for 
+                        for field in ['added_at', 'last_retry', 'last_error_time']:
+                            if field in item_data and item_data[field]:
+                                if isinstance(item_data[field], str):
+                                    try:
+                                        item_data[field] = datetime.fromisoformat(item_data[field])
+                                    except ValueError:
+                                        item_data[field] = datetime.utcnow() if field == 'added_at' else None
+                                elif not isinstance(item_data[field], datetime):
+                                    item_data[field] = datetime.utcnow() if field == 'added_at' else None
 
                        # Ensure processing_time is a float
                        if 'processing_time' in item_data:
                            try:
-                                if isinstance(item_data['processing_time'], str):
-                                    item_data['processing_time'] = float(item_data['processing_time'])
-                                elif not isinstance(item_data['processing_time'], (int, float)):
-                                    item_data['processing_time'] = 0.0
+                                item_data['processing_time'] = float(item_data['processing_time'])
                            except (ValueError, TypeError):
                                item_data['processing_time'] = 0.0
@@ -188,13 +280,17 @@ class QueuePersistenceManager:
            logger.error(f"Error loading persisted queue state: {str(e)}")
            # Create backup of corrupted state file
            if os.path.exists(self.persistence_path):
-                backup_path = f"{self.persistence_path}.bak.{int(time.time())}"
+                backup_path = f"{self.persistence_path}.corrupted.{int(time.time())}"
                try:
                    os.rename(self.persistence_path, backup_path)
                    logger.info(f"Created backup of corrupted state file: {backup_path}")
                except Exception as be:
                    logger.error(f"Failed to create backup of corrupted state file: {str(be)}")
            raise QueueError(f"Failed to load queue state: {str(e)}")
+        finally:
+            if lock_fd:
+                fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
+                lock_fd.close()
 
 
 class QueueError(Exception):
    """Base exception for queue-related errors"""
diff --git a/videoarchiver/update_checker.py b/videoarchiver/update_checker.py
index 4484336..05a2bd8 100644
--- a/videoarchiver/update_checker.py
+++ b/videoarchiver/update_checker.py
@@ -37,6 +37,7 @@ class UpdateChecker:
        self._rate_limit_reset = 0
        self._remaining_requests = 60
        self._last_version_check: Dict[int, datetime] = {}
+        self._shutdown = False
 
    async def _init_session(self) -> None:
        """Initialize aiohttp session with proper headers"""
@@ -45,25 +46,31 @@ class UpdateChecker:
            headers={
                'Accept': 'application/vnd.github.v3+json',
                'User-Agent': 'VideoArchiver-Bot'
-            }
+            },
+            timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        )
 
    async def start(self) -> None:
        """Start the update checker task"""
        if self._check_task is None:
            await self._init_session()
-            self._check_task = self.bot.loop.create_task(self._check_loop())
+            self._check_task = asyncio.create_task(self._check_loop())
            logger.info("Update checker task started")
 
    async def stop(self) -> None:
        """Stop the update checker task and cleanup"""
-        if self._check_task:
+        self._shutdown = True
+        if self._check_task and not self._check_task.done():
            self._check_task.cancel()
-            self._check_task = None
+            try:
+                await self._check_task
+            except asyncio.CancelledError:
+                pass
+            self._check_task = None
 
        if self._session and not self._session.closed:
            await self._session.close()
-            self._session = None
+        self._session = None
 
        logger.info("Update checker task stopped")
 
@@ -71,7 +78,7 @@ class UpdateChecker:
        """Periodic update check loop with improved error handling"""
        await self.bot.wait_until_ready()
 
-        while True:
+        while not self._shutdown:
            try:
                for guild in self.bot.guilds:
                    try:
@@ -101,6 +108,9 @@ class UpdateChecker:
                        logger.error(f"Error checking updates for guild {guild.id}: {str(e)}")
                        continue
 
+            except asyncio.CancelledError:
+                logger.info("Update check loop cancelled")
+                break
            except Exception as e:
                logger.error(f"Error in update check task: {str(e)}")
@@ -109,7 +119,7 @@ class UpdateChecker:
    async def _check_guild(self, guild: discord.Guild, settings: dict) -> None:
        """Check updates for a specific guild with improved error handling"""
        try:
-            current_version = self._get_current_version()
+            current_version = await self._get_current_version()
            if not current_version:
                await self._log_error(
                    guild,
@@ -136,7 +146,7 @@ class UpdateChecker:
        except Exception as e:
            await self._log_error(guild, e, "checking for updates")
 
-    def _get_current_version(self) -> Optional[str]:
+    async def _get_current_version(self) -> Optional[str]:
        """Get current yt-dlp version with error handling"""
        try:
            return get_package_version('yt-dlp')
@@ -150,10 +160,7 @@ class UpdateChecker:
        for attempt in range(self.MAX_RETRIES):
            try:
-                async with self._session.get(
-                    self.GITHUB_API_URL,
-                    timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
-                ) as response:
+                async with self._session.get(self.GITHUB_API_URL) as response:
                    # Update rate limit info
                    self._remaining_requests = int(response.headers.get('X-RateLimit-Remaining', 0))
                    self._rate_limit_reset = int(response.headers.get('X-RateLimit-Reset', 0))
@@ -272,7 +279,7 @@ class UpdateChecker:
                raise UpdateError("Update process timed out")
 
            if process.returncode == 0:
-                new_version = self._get_current_version()
+                new_version = await self._get_current_version()
                if new_version:
                    return True, f"Successfully updated to version {new_version}"
                return True, "Successfully updated (version unknown)"