Fixing race conditions and deadlocks in the queue system by:

- Using a single lock instead of multiple locks
- Properly handling task cancellation
- Adding timeouts and retries (a minimal single-lock sketch follows this list)
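
A minimal, self-contained sketch of the single-lock pattern described above (class and helper names here are illustrative, not code from this commit): one `asyncio.Lock` guards all shared state, failed or timed-out items are retried up to a budget, and cancellation exits the loop cleanly.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger("QueueSketch")

class SingleLockQueue:
    """All shared state (pending queue, retry counts) lives behind one lock."""

    def __init__(self, max_retries: int = 3, timeout: float = 30.0):
        self._lock = asyncio.Lock()        # single lock: no nested acquisition, no lock ordering
        self._queue: list[str] = []
        self._retries: dict[str, int] = {}
        self.max_retries = max_retries
        self.timeout = timeout

    async def add(self, url: str) -> None:
        async with self._lock:
            self._queue.append(url)

    async def worker(self, process: Callable[[str], Awaitable[None]]) -> None:
        """Process items until cancelled; timeouts and errors are retried."""
        while True:
            try:
                async with self._lock:                 # one short critical section per step
                    item = self._queue.pop(0) if self._queue else None
                if item is None:
                    await asyncio.sleep(0.1)
                    continue
                try:
                    await asyncio.wait_for(process(item), timeout=self.timeout)
                except Exception as exc:               # timeout or processing failure
                    async with self._lock:
                        count = self._retries.get(item, 0) + 1
                        self._retries[item] = count
                        if count <= self.max_retries:
                            self._queue.append(item)   # put it back for another attempt
                        else:
                            logger.error("Dropping %s after %d attempts: %s", item, count, exc)
            except asyncio.CancelledError:
                logger.info("Worker cancelled, exiting cleanly")
                break
```

Because every critical section takes the same lock, there is no lock-ordering to get wrong, which is what removes the deadlock risk the old global/queue/processing lock trio introduced.
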
Improving error handling and recovery across all components:

- Queue manager now properly handles initialization failures
- Monitoring system has shorter timeouts and better activity tracking
- Cleanup system has proper task tracking and error recovery
- Persistence system has file locking and backup mechanisms (see the file-locking sketch after this list)
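
The persistence changes combine an fcntl-based lock file, an atomic temp-file rename, and timestamped backups. A condensed sketch of that pattern (POSIX-only; the helper name `save_state` is illustrative):

```python
import fcntl
import json
import os
from datetime import datetime

def save_state(path: str, state: dict) -> None:
    """Write state atomically while holding an exclusive lock on <path>.lock."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(f"{path}.lock", "w") as lock_fd:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)   # block concurrent writers
        try:
            tmp = f"{path}.tmp"
            with open(tmp, "w") as f:
                json.dump(state, f, default=str, indent=2)
                f.flush()
                os.fsync(f.fileno())                   # force data to disk before the rename
            os.rename(tmp, path)                       # atomic replace of the live file
            # Keep a timestamped backup alongside the live file
            stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            with open(path, "rb") as src, open(f"{path}.bak.{stamp}", "wb") as dst:
                dst.write(src.read())
        finally:
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
```

In the diff below, loads take the same lock before reading and fall back to the newest backup if the main file fails to parse.
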
Removing deprecated pkg_resources usage and improving the update checker:

- Using importlib.metadata for version checking (see the sketch after this list)
- Adding proper shutdown handling
- Improving error handling and retries
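
A sketch of the pkg_resources replacement and the shutdown-aware check loop; `importlib.metadata.version` is the stdlib call the commit switches to, while the `UpdateLoop` class and its interval are illustrative assumptions.

```python
import asyncio
import importlib.metadata
import logging
from typing import Optional

logger = logging.getLogger("UpdateSketch")

def get_installed_version(package: str) -> Optional[str]:
    """Look up an installed package's version without pkg_resources."""
    try:
        return importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        return None

class UpdateLoop:
    """Periodic check that exits promptly on shutdown or cancellation."""

    def __init__(self, interval: float = 3600.0):
        self.interval = interval
        self._shutdown = False
        self._task: Optional[asyncio.Task] = None

    async def _run(self) -> None:
        while not self._shutdown:
            try:
                version = get_installed_version("yt-dlp")
                logger.info("Installed yt-dlp version: %s", version)
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                logger.info("Update loop cancelled")
                break
            except Exception as exc:
                logger.error("Update check failed: %s", exc)
                await asyncio.sleep(60)   # retry sooner after an error

    def start(self) -> None:
        if self._task is None:
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        self._shutdown = True
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        self._task = None
```
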
pacnpal — 2024-11-16 00:36:46 +00:00
parent 51a4e8f48c, commit 3520111cec
5 changed files with 438 additions and 277 deletions

File 1 of 5

@@ -3,7 +3,7 @@
import asyncio import asyncio
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, List, Set from typing import Dict, List, Set, Optional
from .models import QueueItem, QueueMetrics from .models import QueueItem, QueueMetrics
# Configure logging # Configure logging
@@ -17,12 +17,14 @@ class QueueCleaner:
def __init__( def __init__(
self, self,
cleanup_interval: int = 3600, # 1 hour cleanup_interval: int = 1800, # 30 minutes
max_history_age: int = 86400, # 24 hours max_history_age: int = 43200, # 12 hours
): ):
self.cleanup_interval = cleanup_interval self.cleanup_interval = cleanup_interval
self.max_history_age = max_history_age self.max_history_age = max_history_age
self._shutdown = False self._shutdown = False
self._cleanup_task: Optional[asyncio.Task] = None
self._last_cleanup_time = datetime.utcnow()
async def start_cleanup( async def start_cleanup(
self, self,
@@ -47,6 +49,36 @@ class QueueCleaner:
metrics: Reference to queue metrics metrics: Reference to queue metrics
queue_lock: Lock for queue operations queue_lock: Lock for queue operations
""" """
if self._cleanup_task is not None:
logger.warning("Cleanup task already running")
return
logger.info("Starting queue cleanup task...")
self._cleanup_task = asyncio.create_task(
self._cleanup_loop(
queue,
completed,
failed,
guild_queues,
channel_queues,
processing,
metrics,
queue_lock
)
)
async def _cleanup_loop(
self,
queue: List[QueueItem],
completed: Dict[str, QueueItem],
failed: Dict[str, QueueItem],
guild_queues: Dict[int, Set[str]],
channel_queues: Dict[int, Set[str]],
processing: Dict[str, QueueItem],
metrics: QueueMetrics,
queue_lock: asyncio.Lock
) -> None:
"""Main cleanup loop"""
while not self._shutdown: while not self._shutdown:
try: try:
await self._perform_cleanup( await self._perform_cleanup(
@@ -59,17 +91,24 @@ class QueueCleaner:
metrics, metrics,
queue_lock queue_lock
) )
self._last_cleanup_time = datetime.utcnow()
await asyncio.sleep(self.cleanup_interval) await asyncio.sleep(self.cleanup_interval)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("Queue cleanup cancelled")
break break
except Exception as e: except Exception as e:
logger.error(f"Error in periodic cleanup: {str(e)}") logger.error(f"Error in cleanup loop: {str(e)}")
await asyncio.sleep(60) # Shorter sleep on error to retry sooner
await asyncio.sleep(30)
def stop_cleanup(self) -> None: def stop_cleanup(self) -> None:
"""Stop the cleanup process""" """Stop the cleanup process"""
logger.info("Stopping queue cleanup...")
self._shutdown = True self._shutdown = True
if self._cleanup_task and not self._cleanup_task.done():
self._cleanup_task.cancel()
self._cleanup_task = None
async def _perform_cleanup( async def _perform_cleanup(
self, self,
@@ -97,13 +136,14 @@ class QueueCleaner:
try: try:
current_time = datetime.utcnow() current_time = datetime.utcnow()
cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age) cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age)
items_cleaned = 0
async with queue_lock: async with queue_lock:
# Clean up completed items # Clean up completed items
completed_count = len(completed)
for url in list(completed.keys()): for url in list(completed.keys()):
try: try:
item = completed[url] item = completed[url]
# Ensure added_at is a datetime object
if not isinstance(item.added_at, datetime): if not isinstance(item.added_at, datetime):
try: try:
if isinstance(item.added_at, str): if isinstance(item.added_at, str):
@@ -115,15 +155,17 @@ class QueueCleaner:
if item.added_at < cleanup_cutoff: if item.added_at < cleanup_cutoff:
completed.pop(url) completed.pop(url)
items_cleaned += 1
except Exception as e: except Exception as e:
logger.error(f"Error processing completed item {url}: {e}") logger.error(f"Error cleaning completed item {url}: {e}")
completed.pop(url) completed.pop(url)
items_cleaned += 1
# Clean up failed items # Clean up failed items
failed_count = len(failed)
for url in list(failed.keys()): for url in list(failed.keys()):
try: try:
item = failed[url] item = failed[url]
# Ensure added_at is a datetime object
if not isinstance(item.added_at, datetime): if not isinstance(item.added_at, datetime):
try: try:
if isinstance(item.added_at, str): if isinstance(item.added_at, str):
@@ -135,34 +177,53 @@ class QueueCleaner:
if item.added_at < cleanup_cutoff: if item.added_at < cleanup_cutoff:
failed.pop(url) failed.pop(url)
items_cleaned += 1
except Exception as e: except Exception as e:
logger.error(f"Error processing failed item {url}: {e}") logger.error(f"Error cleaning failed item {url}: {e}")
failed.pop(url) failed.pop(url)
items_cleaned += 1
# Clean up guild tracking # Clean up guild tracking
guild_count = len(guild_queues)
for guild_id in list(guild_queues.keys()): for guild_id in list(guild_queues.keys()):
original_size = len(guild_queues[guild_id])
guild_queues[guild_id] = { guild_queues[guild_id] = {
url for url in guild_queues[guild_id] url for url in guild_queues[guild_id]
if url in queue or url in processing if url in queue or url in processing
} }
items_cleaned += original_size - len(guild_queues[guild_id])
if not guild_queues[guild_id]: if not guild_queues[guild_id]:
guild_queues.pop(guild_id) guild_queues.pop(guild_id)
# Clean up channel tracking # Clean up channel tracking
channel_count = len(channel_queues)
for channel_id in list(channel_queues.keys()): for channel_id in list(channel_queues.keys()):
original_size = len(channel_queues[channel_id])
channel_queues[channel_id] = { channel_queues[channel_id] = {
url for url in channel_queues[channel_id] url for url in channel_queues[channel_id]
if url in queue or url in processing if url in queue or url in processing
} }
items_cleaned += original_size - len(channel_queues[channel_id])
if not channel_queues[channel_id]: if not channel_queues[channel_id]:
channel_queues.pop(channel_id) channel_queues.pop(channel_id)
metrics.last_cleanup = current_time # Update metrics
logger.info("Completed periodic queue cleanup") metrics.last_cleanup = current_time
logger.info(
f"Queue cleanup completed:\n"
f"- Items cleaned: {items_cleaned}\n"
f"- Completed items: {completed_count} -> {len(completed)}\n"
f"- Failed items: {failed_count} -> {len(failed)}\n"
f"- Guild queues: {guild_count} -> {len(guild_queues)}\n"
f"- Channel queues: {channel_count} -> {len(channel_queues)}\n"
f"- Current queue size: {len(queue)}\n"
f"- Processing items: {len(processing)}"
)
except Exception as e: except Exception as e:
logger.error(f"Error during cleanup: {str(e)}") logger.error(f"Error during cleanup: {str(e)}")
raise # Don't re-raise to keep cleanup running
async def clear_guild_queue( async def clear_guild_queue(
self, self,
@@ -195,6 +256,12 @@ class QueueCleaner:
async with queue_lock: async with queue_lock:
# Get URLs for this guild # Get URLs for this guild
guild_urls = guild_queues.get(guild_id, set()) guild_urls = guild_queues.get(guild_id, set())
initial_counts = {
'queue': len([item for item in queue if item.guild_id == guild_id]),
'processing': len([item for item in processing.values() if item.guild_id == guild_id]),
'completed': len([item for item in completed.values() if item.guild_id == guild_id]),
'failed': len([item for item in failed.values() if item.guild_id == guild_id])
}
# Clear from pending queue # Clear from pending queue
queue[:] = [item for item in queue if item.guild_id != guild_id] queue[:] = [item for item in queue if item.guild_id != guild_id]
@@ -231,12 +298,19 @@ class QueueCleaner:
if not channel_queues[channel_id]: if not channel_queues[channel_id]:
channel_queues.pop(channel_id) channel_queues.pop(channel_id)
logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue") logger.info(
return cleared_count f"Cleared guild {guild_id} queue:\n"
f"- Queue: {initial_counts['queue']} items\n"
f"- Processing: {initial_counts['processing']} items\n"
f"- Completed: {initial_counts['completed']} items\n"
f"- Failed: {initial_counts['failed']} items\n"
f"Total cleared: {cleared_count} items"
)
return cleared_count
except Exception as e: except Exception as e:
logger.error(f"Error clearing guild queue: {str(e)}") logger.error(f"Error clearing guild queue: {str(e)}")
raise raise CleanupError(f"Failed to clear guild queue: {str(e)}")
class CleanupError(Exception): class CleanupError(Exception):
"""Base exception for cleanup-related errors""" """Base exception for cleanup-related errors"""

File 2 of 5

@@ -49,10 +49,8 @@ class EnhancedVideoQueueManager:
self._channel_queues: Dict[int, Set[str]] = {} self._channel_queues: Dict[int, Set[str]] = {}
self._active_tasks: Set[asyncio.Task] = set() self._active_tasks: Set[asyncio.Task] = set()
# Locks # Single lock for all operations to prevent deadlocks
self._global_lock = asyncio.Lock() self._lock = asyncio.Lock()
self._queue_lock = asyncio.Lock()
self._processing_lock = asyncio.Lock()
# State # State
self._shutdown = False self._shutdown = False
@@ -81,45 +79,43 @@ class EnhancedVideoQueueManager:
try: try:
logger.info("Starting queue manager initialization...") logger.info("Starting queue manager initialization...")
# Load persisted state first if available async with self._lock:
if self.persistence: # Load persisted state first if available
await self._load_persisted_state() if self.persistence:
await self._load_persisted_state()
# Start monitoring task # Start monitoring task
monitor_task = asyncio.create_task( monitor_task = asyncio.create_task(
self.monitor.start_monitoring( self.monitor.start_monitoring(
self._queue, self._queue,
self._processing, self._processing,
self.metrics, self.metrics,
self._processing_lock self._lock
)
) )
) self._active_tasks.add(monitor_task)
self._active_tasks.add(monitor_task) logger.info("Queue monitoring started")
logger.info("Queue monitoring started")
# Brief pause to allow monitor to initialize # Start cleanup task
await asyncio.sleep(0.1) cleanup_task = asyncio.create_task(
self.cleaner.start_cleanup(
# Start cleanup task self._queue,
cleanup_task = asyncio.create_task( self._completed,
self.cleaner.start_cleanup( self._failed,
self._queue, self._guild_queues,
self._completed, self._channel_queues,
self._failed, self._processing,
self._guild_queues, self.metrics,
self._channel_queues, self._lock
self._processing, )
self.metrics,
self._queue_lock
) )
) self._active_tasks.add(cleanup_task)
self._active_tasks.add(cleanup_task) logger.info("Queue cleanup started")
logger.info("Queue cleanup started")
# Signal initialization complete # Signal initialization complete
self._initialized = True self._initialized = True
self._init_event.set() self._init_event.set()
logger.info("Queue manager initialization completed") logger.info("Queue manager initialization completed")
except Exception as e: except Exception as e:
logger.error(f"Failed to initialize queue manager: {e}") logger.error(f"Failed to initialize queue manager: {e}")
@@ -131,13 +127,10 @@ class EnhancedVideoQueueManager:
try: try:
state = self.persistence.load_queue_state() state = self.persistence.load_queue_state()
if state: if state:
async with self._queue_lock: self._queue = state["queue"]
self._queue = state["queue"] self._completed = state["completed"]
self._completed = state["completed"] self._failed = state["failed"]
self._failed = state["failed"] self._processing = state["processing"]
async with self._processing_lock:
self._processing = state["processing"]
# Update metrics # Update metrics
metrics_data = state.get("metrics", {}) metrics_data = state.get("metrics", {})
@@ -168,21 +161,14 @@ class EnhancedVideoQueueManager:
while not self._shutdown: while not self._shutdown:
try: try:
items = [] items = []
# Use global lock for coordination async with self._lock:
async with self._global_lock: # Get up to 5 items from queue
# Then acquire specific locks in order while len(items) < 5 and self._queue:
async with self._queue_lock: item = self._queue.pop(0)
# Get up to 5 items from queue items.append(item)
while len(items) < 5 and self._queue: self._processing[item.url] = item
item = self._queue.pop(0) # Update activity timestamp
items.append(item) self.monitor.update_activity()
if items:
async with self._processing_lock:
for item in items:
self._processing[item.url] = item
# Update activity timestamp
self.monitor.update_activity()
if not items: if not items:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@@ -194,7 +180,13 @@ class EnhancedVideoQueueManager:
task = asyncio.create_task(self._process_item(processor, item)) task = asyncio.create_task(self._process_item(processor, item))
tasks.append(task) tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True) try:
await asyncio.gather(*tasks, return_exceptions=True)
except asyncio.CancelledError:
logger.info("Queue processing cancelled")
break
except Exception as e:
logger.error(f"Error in queue processing: {e}")
# Persist state if interval has passed # Persist state if interval has passed
current_time = time.time() current_time = time.time()
@@ -202,6 +194,9 @@ class EnhancedVideoQueueManager:
await self._persist_state() await self._persist_state()
last_persist_time = current_time last_persist_time = current_time
except asyncio.CancelledError:
logger.info("Queue processing cancelled")
break
except Exception as e: except Exception as e:
logger.error(f"Critical error in queue processor: {e}") logger.error(f"Critical error in queue processor: {e}")
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@@ -218,49 +213,46 @@ class EnhancedVideoQueueManager:
logger.info(f"Processing queue item: {item.url}") logger.info(f"Processing queue item: {item.url}")
item.start_processing() item.start_processing()
self.metrics.last_activity_time = time.time() self.metrics.last_activity_time = time.time()
self.monitor.update_activity() # Update activity timestamp self.monitor.update_activity()
success, error = await processor(item) success, error = await processor(item)
async with self._global_lock: async with self._lock:
async with self._processing_lock: item.finish_processing(success, error)
item.finish_processing(success, error) self._processing.pop(item.url, None)
self._processing.pop(item.url, None)
if success: if success:
self._completed[item.url] = item self._completed[item.url] = item
logger.info(f"Successfully processed: {item.url}") logger.info(f"Successfully processed: {item.url}")
else:
if item.retry_count < self.max_retries:
item.retry_count += 1
item.status = "pending"
item.last_retry = datetime.utcnow()
item.priority = max(0, item.priority - 1)
self._queue.append(item)
logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
else: else:
async with self._queue_lock: self._failed[item.url] = item
if item.retry_count < self.max_retries: logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
item.retry_count += 1
item.status = "pending"
item.last_retry = datetime.utcnow()
item.priority = max(0, item.priority - 1)
self._queue.append(item)
logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
else:
self._failed[item.url] = item
logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
self.metrics.update( self.metrics.update(
processing_time=item.processing_time, processing_time=item.processing_time,
success=success, success=success,
error=error error=error
) )
except Exception as e: except Exception as e:
logger.error(f"Error processing {item.url}: {e}") logger.error(f"Error processing {item.url}: {e}")
async with self._global_lock: async with self._lock:
async with self._processing_lock: item.finish_processing(False, str(e))
item.finish_processing(False, str(e)) self._processing.pop(item.url, None)
self._processing.pop(item.url, None) self._failed[item.url] = item
self._failed[item.url] = item self.metrics.update(
self.metrics.update( processing_time=item.processing_time,
processing_time=item.processing_time, success=False,
success=False, error=str(e)
error=str(e) )
)
async def _persist_state(self) -> None: async def _persist_state(self) -> None:
"""Persist current state to storage""" """Persist current state to storage"""
@@ -268,7 +260,7 @@ class EnhancedVideoQueueManager:
return return
try: try:
async with self._global_lock: async with self._lock:
await self.persistence.persist_queue_state( await self.persistence.persist_queue_state(
self._queue, self._queue,
self._processing, self._processing,
@@ -292,44 +284,43 @@ class EnhancedVideoQueueManager:
if self._shutdown: if self._shutdown:
raise QueueError("Queue manager is shutting down") raise QueueError("Queue manager is shutting down")
# Wait for initialization using the correct event # Wait for initialization
await self._init_event.wait() await self._init_event.wait()
try: try:
async with self._global_lock: async with self._lock:
async with self._queue_lock: if len(self._queue) >= self.max_queue_size:
if len(self._queue) >= self.max_queue_size: raise QueueError("Queue is full")
raise QueueError("Queue is full")
item = QueueItem( item = QueueItem(
url=url, url=url,
message_id=message_id, message_id=message_id,
channel_id=channel_id, channel_id=channel_id,
guild_id=guild_id, guild_id=guild_id,
author_id=author_id, author_id=author_id,
added_at=datetime.utcnow(), added_at=datetime.utcnow(),
priority=priority, priority=priority,
) )
if guild_id not in self._guild_queues: if guild_id not in self._guild_queues:
self._guild_queues[guild_id] = set() self._guild_queues[guild_id] = set()
self._guild_queues[guild_id].add(url) self._guild_queues[guild_id].add(url)
if channel_id not in self._channel_queues: if channel_id not in self._channel_queues:
self._channel_queues[channel_id] = set() self._channel_queues[channel_id] = set()
self._channel_queues[channel_id].add(url) self._channel_queues[channel_id].add(url)
self._queue.append(item) self._queue.append(item)
self._queue.sort(key=lambda x: (-x.priority, x.added_at)) self._queue.sort(key=lambda x: (-x.priority, x.added_at))
self.metrics.last_activity_time = time.time() self.metrics.last_activity_time = time.time()
self.monitor.update_activity() # Update activity timestamp self.monitor.update_activity()
if self.persistence: if self.persistence:
await self._persist_state() await self._persist_state()
logger.info(f"Added to queue: {url} (priority: {priority})") logger.info(f"Added to queue: {url} (priority: {priority})")
return True return True
except Exception as e: except Exception as e:
logger.error(f"Error adding to queue: {e}") logger.error(f"Error adding to queue: {e}")
@@ -400,18 +391,17 @@ class EnhancedVideoQueueManager:
await asyncio.gather(*self._active_tasks, return_exceptions=True) await asyncio.gather(*self._active_tasks, return_exceptions=True)
async with self._global_lock: async with self._lock:
# Move processing items back to queue # Move processing items back to queue
async with self._processing_lock: for url, item in self._processing.items():
for url, item in self._processing.items(): if item.retry_count < self.max_retries:
if item.retry_count < self.max_retries: item.status = "pending"
item.status = "pending" item.retry_count += 1
item.retry_count += 1 self._queue.append(item)
self._queue.append(item) else:
else: self._failed[url] = item
self._failed[url] = item
self._processing.clear() self._processing.clear()
# Final state persistence # Final state persistence
if self.persistence: if self.persistence:

File 3 of 5

@@ -19,10 +19,10 @@ class QueueMonitor:
def __init__( def __init__(
self, self,
deadlock_threshold: int = 120, # Reduced to 2 minutes deadlock_threshold: int = 60, # Reduced to 1 minute
memory_threshold: int = 512, # 512MB memory_threshold: int = 512, # 512MB
max_retries: int = 3, max_retries: int = 3,
check_interval: int = 30 # Reduced to 30 seconds check_interval: int = 15 # Reduced to 15 seconds
): ):
self.deadlock_threshold = deadlock_threshold self.deadlock_threshold = deadlock_threshold
self.memory_threshold = memory_threshold self.memory_threshold = memory_threshold
@@ -37,7 +37,7 @@ class QueueMonitor:
queue: List[QueueItem], queue: List[QueueItem],
processing: Dict[str, QueueItem], processing: Dict[str, QueueItem],
metrics: QueueMetrics, metrics: QueueMetrics,
processing_lock: asyncio.Lock queue_lock: asyncio.Lock
) -> None: ) -> None:
"""Start monitoring queue health """Start monitoring queue health
@@ -45,7 +45,7 @@ class QueueMonitor:
queue: Reference to the queue list queue: Reference to the queue list
processing: Reference to processing dict processing: Reference to processing dict
metrics: Reference to queue metrics metrics: Reference to queue metrics
processing_lock: Lock for processing dict queue_lock: Lock for queue operations
""" """
if self._monitoring_task is not None: if self._monitoring_task is not None:
logger.warning("Monitoring task already running") logger.warning("Monitoring task already running")
@@ -53,7 +53,7 @@ class QueueMonitor:
logger.info("Starting queue monitoring...") logger.info("Starting queue monitoring...")
self._monitoring_task = asyncio.create_task( self._monitoring_task = asyncio.create_task(
self._monitor_loop(queue, processing, metrics, processing_lock) self._monitor_loop(queue, processing, metrics, queue_lock)
) )
async def _monitor_loop( async def _monitor_loop(
@@ -61,27 +61,27 @@ class QueueMonitor:
queue: List[QueueItem], queue: List[QueueItem],
processing: Dict[str, QueueItem], processing: Dict[str, QueueItem],
metrics: QueueMetrics, metrics: QueueMetrics,
processing_lock: asyncio.Lock queue_lock: asyncio.Lock
) -> None: ) -> None:
"""Main monitoring loop""" """Main monitoring loop"""
while not self._shutdown: while not self._shutdown:
try: try:
await self._check_health(queue, processing, metrics, processing_lock) await self._check_health(queue, processing, metrics, queue_lock)
await asyncio.sleep(self.check_interval) await asyncio.sleep(self.check_interval)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("Queue monitoring cancelled") logger.info("Queue monitoring cancelled")
break break
except Exception as e: except Exception as e:
logger.error(f"Error in health monitor: {str(e)}") logger.error(f"Error in health monitor: {str(e)}")
await asyncio.sleep(5) # Short sleep on error await asyncio.sleep(1) # Reduced sleep on error
def stop_monitoring(self) -> None: def stop_monitoring(self) -> None:
"""Stop the monitoring process""" """Stop the monitoring process"""
logger.info("Stopping queue monitoring...") logger.info("Stopping queue monitoring...")
self._shutdown = True self._shutdown = True
if self._monitoring_task: if self._monitoring_task and not self._monitoring_task.done():
self._monitoring_task.cancel() self._monitoring_task.cancel()
self._monitoring_task = None self._monitoring_task = None
def update_activity(self) -> None: def update_activity(self) -> None:
"""Update the last active time""" """Update the last active time"""
@@ -92,7 +92,7 @@ class QueueMonitor:
queue: List[QueueItem], queue: List[QueueItem],
processing: Dict[str, QueueItem], processing: Dict[str, QueueItem],
metrics: QueueMetrics, metrics: QueueMetrics,
processing_lock: asyncio.Lock queue_lock: asyncio.Lock
) -> None: ) -> None:
"""Check queue health and performance """Check queue health and performance
@@ -100,7 +100,7 @@ class QueueMonitor:
queue: Reference to the queue list queue: Reference to the queue list
processing: Reference to processing dict processing: Reference to processing dict
metrics: Reference to queue metrics metrics: Reference to queue metrics
processing_lock: Lock for processing dict queue_lock: Lock for queue operations
""" """
try: try:
current_time = time.time() current_time = time.time()
@@ -118,40 +118,37 @@ class QueueMonitor:
logger.info(f"Memory after GC: {memory_after:.2f}MB") logger.info(f"Memory after GC: {memory_after:.2f}MB")
# Check for potential deadlocks # Check for potential deadlocks
processing_times = []
stuck_items = [] stuck_items = []
async with processing_lock: async with queue_lock:
# Check processing items
for url, item in processing.items(): for url, item in processing.items():
if hasattr(item, 'start_time') and item.start_time: if hasattr(item, 'start_time') and item.start_time:
processing_time = current_time - item.start_time processing_time = current_time - item.start_time
processing_times.append(processing_time)
if processing_time > self.deadlock_threshold: if processing_time > self.deadlock_threshold:
stuck_items.append((url, item)) stuck_items.append((url, item))
logger.warning(f"Item stuck in processing: {url} for {processing_time:.1f}s") logger.warning(f"Item stuck in processing: {url} for {processing_time:.1f}s")
if stuck_items: # Handle stuck items if found
logger.warning( if stuck_items:
f"Potential deadlock detected: {len(stuck_items)} items stuck" logger.warning(f"Potential deadlock detected: {len(stuck_items)} items stuck")
) await self._recover_stuck_items(stuck_items, queue, processing)
await self._recover_stuck_items(
stuck_items, queue, processing, processing_lock
)
# Check overall queue activity # Check overall queue activity
if processing and current_time - self._last_active_time > self.deadlock_threshold: if processing and current_time - self._last_active_time > self.deadlock_threshold:
logger.warning("Queue appears to be hung - no activity detected") logger.warning("Queue appears to be hung - no activity detected")
# Force recovery of all processing items # Force recovery of all processing items
async with processing_lock:
all_items = list(processing.items()) all_items = list(processing.items())
await self._recover_stuck_items( await self._recover_stuck_items(all_items, queue, processing)
all_items, queue, processing, processing_lock self._last_active_time = current_time
)
self._last_active_time = current_time
# Update metrics # Update metrics
metrics.last_activity_time = self._last_active_time metrics.last_activity_time = self._last_active_time
metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage) metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage)
# Calculate current metrics
queue_size = len(queue)
processing_count = len(processing)
# Log detailed metrics # Log detailed metrics
logger.info( logger.info(
@@ -161,21 +158,20 @@ class QueueMonitor:
f"- Memory Usage: {memory_usage:.2f}MB\n" f"- Memory Usage: {memory_usage:.2f}MB\n"
f"- Peak Memory: {metrics.peak_memory_usage:.2f}MB\n" f"- Peak Memory: {metrics.peak_memory_usage:.2f}MB\n"
f"- Error Distribution: {metrics.errors_by_type}\n" f"- Error Distribution: {metrics.errors_by_type}\n"
f"- Queue Size: {len(queue)}\n" f"- Queue Size: {queue_size}\n"
f"- Processing Items: {len(processing)}\n" f"- Processing Items: {processing_count}\n"
f"- Last Activity: {(current_time - self._last_active_time):.1f}s ago" f"- Last Activity: {(current_time - self._last_active_time):.1f}s ago"
) )
except Exception as e: except Exception as e:
logger.error(f"Error checking queue health: {str(e)}") logger.error(f"Error checking queue health: {str(e)}")
raise # Don't re-raise to keep monitoring alive
async def _recover_stuck_items( async def _recover_stuck_items(
self, self,
stuck_items: List[tuple[str, QueueItem]], stuck_items: List[tuple[str, QueueItem]],
queue: List[QueueItem], queue: List[QueueItem],
processing: Dict[str, QueueItem], processing: Dict[str, QueueItem]
processing_lock: asyncio.Lock
) -> None: ) -> None:
"""Attempt to recover stuck items """Attempt to recover stuck items
@@ -183,38 +179,36 @@ class QueueMonitor:
stuck_items: List of (url, item) tuples for stuck items stuck_items: List of (url, item) tuples for stuck items
queue: Reference to the queue list queue: Reference to the queue list
processing: Reference to processing dict processing: Reference to processing dict
processing_lock: Lock for processing dict
""" """
try: try:
recovered = 0 recovered = 0
failed = 0 failed = 0
async with processing_lock: for url, item in stuck_items:
for url, item in stuck_items: try:
try: # Move to failed if max retries reached
# Move to failed if max retries reached if item.retry_count >= self.max_retries:
if item.retry_count >= self.max_retries: logger.warning(f"Moving stuck item to failed: {url}")
logger.warning(f"Moving stuck item to failed: {url}") item.status = "failed"
item.status = "failed" item.error = "Exceeded maximum retries after being stuck"
item.error = "Exceeded maximum retries after being stuck" item.last_error = item.error
item.last_error = item.error item.last_error_time = datetime.utcnow()
item.last_error_time = datetime.utcnow() processing.pop(url)
processing.pop(url) failed += 1
failed += 1 else:
else: # Reset for retry
# Reset for retry logger.info(f"Recovering stuck item for retry: {url}")
logger.info(f"Recovering stuck item for retry: {url}") item.retry_count += 1
item.retry_count += 1 item.start_time = None
item.start_time = None item.processing_time = 0
item.processing_time = 0 item.last_retry = datetime.utcnow()
item.last_retry = datetime.utcnow() item.status = "pending"
item.status = "pending" item.priority = max(0, item.priority - 2) # Lower priority
item.priority = max(0, item.priority - 2) # Lower priority queue.append(item)
queue.append(item) processing.pop(url)
processing.pop(url) recovered += 1
recovered += 1 except Exception as e:
except Exception as e: logger.error(f"Error recovering item {url}: {str(e)}")
logger.error(f"Error recovering item {url}: {str(e)}")
# Update activity timestamp after recovery # Update activity timestamp after recovery
self.update_activity() self.update_activity()
@@ -222,7 +216,7 @@ class QueueMonitor:
except Exception as e: except Exception as e:
logger.error(f"Error recovering stuck items: {str(e)}") logger.error(f"Error recovering stuck items: {str(e)}")
raise # Don't re-raise to keep monitoring alive
class MonitoringError(Exception): class MonitoringError(Exception):
"""Base exception for monitoring-related errors""" """Base exception for monitoring-related errors"""

File 4 of 5

@@ -4,7 +4,9 @@ import json
import logging import logging
import os import os
import time import time
from datetime import datetime import fcntl
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from .models import QueueItem, QueueMetrics from .models import QueueItem, QueueMetrics
@@ -17,13 +19,30 @@ logger = logging.getLogger("QueuePersistence")
class QueuePersistenceManager: class QueuePersistenceManager:
"""Manages persistence of queue state to disk""" """Manages persistence of queue state to disk"""
def __init__(self, persistence_path: str): def __init__(
self,
persistence_path: str,
max_retries: int = 3,
retry_delay: int = 1,
backup_interval: int = 3600, # 1 hour
max_backups: int = 24 # Keep last 24 backups
):
"""Initialize the persistence manager """Initialize the persistence manager
Args: Args:
persistence_path: Path to the persistence file persistence_path: Path to the persistence file
max_retries: Maximum number of retries for file operations
retry_delay: Delay between retries in seconds
backup_interval: Interval between backups in seconds
max_backups: Maximum number of backup files to keep
""" """
self.persistence_path = persistence_path self.persistence_path = persistence_path
self.max_retries = max_retries
self.retry_delay = retry_delay
self.backup_interval = backup_interval
self.max_backups = max_backups
self._last_backup = 0
self._lock_file = f"{persistence_path}.lock"
async def persist_queue_state( async def persist_queue_state(
self, self,
@@ -45,7 +64,9 @@ class QueuePersistenceManager:
Raises: Raises:
QueueError: If persistence fails QueueError: If persistence fails
""" """
lock_fd = None
try: try:
# Create state object
state = { state = {
"queue": [item.to_dict() for item in queue], "queue": [item.to_dict() for item in queue],
"processing": {k: v.to_dict() for k, v in processing.items()}, "processing": {k: v.to_dict() for k, v in processing.items()},
@@ -66,27 +87,81 @@ class QueuePersistenceManager:
"compression_failures": metrics.compression_failures, "compression_failures": metrics.compression_failures,
"hardware_accel_failures": metrics.hardware_accel_failures, "hardware_accel_failures": metrics.hardware_accel_failures,
}, },
"timestamp": datetime.utcnow().isoformat()
} }
# Ensure directory exists # Ensure directory exists
os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True) os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True)
# Write to temp file first # Acquire file lock
temp_path = f"{self.persistence_path}.tmp" lock_fd = open(self._lock_file, 'w')
with open(temp_path, "w") as f: fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
json.dump(state, f, default=str)
f.flush()
os.fsync(f.fileno())
# Atomic rename # Write with retries
os.rename(temp_path, self.persistence_path) for attempt in range(self.max_retries):
try:
# Write to temp file first
temp_path = f"{self.persistence_path}.tmp"
with open(temp_path, "w") as f:
json.dump(state, f, default=str, indent=2)
f.flush()
os.fsync(f.fileno())
# Atomic rename
os.rename(temp_path, self.persistence_path)
# Create periodic backup if needed
current_time = time.time()
if current_time - self._last_backup >= self.backup_interval:
await self._create_backup()
self._last_backup = current_time
break
except Exception as e:
if attempt == self.max_retries - 1:
raise
logger.warning(f"Retry {attempt + 1}/{self.max_retries} failed: {e}")
await asyncio.sleep(self.retry_delay)
except Exception as e: except Exception as e:
logger.error(f"Error persisting queue state: {str(e)}") logger.error(f"Error persisting queue state: {str(e)}")
raise QueueError(f"Failed to persist queue state: {str(e)}") raise QueueError(f"Failed to persist queue state: {str(e)}")
finally:
if lock_fd:
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
lock_fd.close()
async def _create_backup(self) -> None:
"""Create a backup of the current state file"""
try:
if not os.path.exists(self.persistence_path):
return
# Create backup
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
backup_path = f"{self.persistence_path}.bak.{timestamp}"
with open(self.persistence_path, "rb") as src, open(backup_path, "wb") as dst:
dst.write(src.read())
dst.flush()
os.fsync(dst.fileno())
# Clean old backups
backup_files = sorted([
f for f in os.listdir(os.path.dirname(self.persistence_path))
if f.startswith(os.path.basename(self.persistence_path) + ".bak.")
])
while len(backup_files) > self.max_backups:
old_backup = os.path.join(os.path.dirname(self.persistence_path), backup_files.pop(0))
try:
os.remove(old_backup)
except Exception as e:
logger.warning(f"Failed to remove old backup {old_backup}: {e}")
except Exception as e:
logger.error(f"Failed to create backup: {e}")
def load_queue_state(self) -> Optional[Dict[str, Any]]: def load_queue_state(self) -> Optional[Dict[str, Any]]:
"""Load persisted queue state from disk """Load persisted queue state from disk with retries
Returns: Returns:
Dict containing queue state if successful, None if file doesn't exist Dict containing queue state if successful, None if file doesn't exist
@@ -97,49 +172,66 @@ class QueuePersistenceManager:
if not self.persistence_path or not os.path.exists(self.persistence_path): if not self.persistence_path or not os.path.exists(self.persistence_path):
return None return None
lock_fd = None
try: try:
with open(self.persistence_path, "r") as f: # Acquire file lock
state = json.load(f) lock_fd = open(self._lock_file, 'w')
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
# Try loading main file
state = None
last_error = None
for attempt in range(self.max_retries):
try:
with open(self.persistence_path, "r") as f:
state = json.load(f)
break
except Exception as e:
last_error = e
logger.warning(f"Retry {attempt + 1}/{self.max_retries} failed: {e}")
time.sleep(self.retry_delay)
# If main file failed, try loading latest backup
if state is None:
backup_files = sorted([
f for f in os.listdir(os.path.dirname(self.persistence_path))
if f.startswith(os.path.basename(self.persistence_path) + ".bak.")
], reverse=True)
if backup_files:
latest_backup = os.path.join(os.path.dirname(self.persistence_path), backup_files[0])
try:
with open(latest_backup, "r") as f:
state = json.load(f)
logger.info(f"Loaded state from backup: {latest_backup}")
except Exception as e:
logger.error(f"Failed to load backup: {e}")
if last_error:
raise QueueError(f"Failed to load queue state: {last_error}")
raise
if state is None:
return None
# Helper function to safely convert items # Helper function to safely convert items
def safe_convert_item(item_data: dict) -> Optional[QueueItem]: def safe_convert_item(item_data: dict) -> Optional[QueueItem]:
try: try:
if isinstance(item_data, dict): if isinstance(item_data, dict):
# Ensure datetime fields are properly formatted # Ensure datetime fields are properly formatted
if 'added_at' in item_data and item_data['added_at']: for field in ['added_at', 'last_retry', 'last_error_time']:
if isinstance(item_data['added_at'], str): if field in item_data and item_data[field]:
try: if isinstance(item_data[field], str):
item_data['added_at'] = datetime.fromisoformat(item_data['added_at']) try:
except ValueError: item_data[field] = datetime.fromisoformat(item_data[field])
item_data['added_at'] = datetime.utcnow() except ValueError:
elif not isinstance(item_data['added_at'], datetime): item_data[field] = datetime.utcnow() if field == 'added_at' else None
item_data['added_at'] = datetime.utcnow() elif not isinstance(item_data[field], datetime):
item_data[field] = datetime.utcnow() if field == 'added_at' else None
if 'last_retry' in item_data and item_data['last_retry']:
if isinstance(item_data['last_retry'], str):
try:
item_data['last_retry'] = datetime.fromisoformat(item_data['last_retry'])
except ValueError:
item_data['last_retry'] = None
elif not isinstance(item_data['last_retry'], datetime):
item_data['last_retry'] = None
if 'last_error_time' in item_data and item_data['last_error_time']:
if isinstance(item_data['last_error_time'], str):
try:
item_data['last_error_time'] = datetime.fromisoformat(item_data['last_error_time'])
except ValueError:
item_data['last_error_time'] = None
elif not isinstance(item_data['last_error_time'], datetime):
item_data['last_error_time'] = None
# Ensure processing_time is a float # Ensure processing_time is a float
if 'processing_time' in item_data: if 'processing_time' in item_data:
try: try:
if isinstance(item_data['processing_time'], str): item_data['processing_time'] = float(item_data['processing_time'])
item_data['processing_time'] = float(item_data['processing_time'])
elif not isinstance(item_data['processing_time'], (int, float)):
item_data['processing_time'] = 0.0
except (ValueError, TypeError): except (ValueError, TypeError):
item_data['processing_time'] = 0.0 item_data['processing_time'] = 0.0
@@ -188,13 +280,17 @@ class QueuePersistenceManager:
logger.error(f"Error loading persisted queue state: {str(e)}") logger.error(f"Error loading persisted queue state: {str(e)}")
# Create backup of corrupted state file # Create backup of corrupted state file
if os.path.exists(self.persistence_path): if os.path.exists(self.persistence_path):
backup_path = f"{self.persistence_path}.bak.{int(time.time())}" backup_path = f"{self.persistence_path}.corrupted.{int(time.time())}"
try: try:
os.rename(self.persistence_path, backup_path) os.rename(self.persistence_path, backup_path)
logger.info(f"Created backup of corrupted state file: {backup_path}") logger.info(f"Created backup of corrupted state file: {backup_path}")
except Exception as be: except Exception as be:
logger.error(f"Failed to create backup of corrupted state file: {str(be)}") logger.error(f"Failed to create backup of corrupted state file: {str(be)}")
raise QueueError(f"Failed to load queue state: {str(e)}") raise QueueError(f"Failed to load queue state: {str(e)}")
finally:
if lock_fd:
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
lock_fd.close()
class QueueError(Exception): class QueueError(Exception):
"""Base exception for queue-related errors""" """Base exception for queue-related errors"""

File 5 of 5

@@ -37,6 +37,7 @@ class UpdateChecker:
self._rate_limit_reset = 0 self._rate_limit_reset = 0
self._remaining_requests = 60 self._remaining_requests = 60
self._last_version_check: Dict[int, datetime] = {} self._last_version_check: Dict[int, datetime] = {}
self._shutdown = False
async def _init_session(self) -> None: async def _init_session(self) -> None:
"""Initialize aiohttp session with proper headers""" """Initialize aiohttp session with proper headers"""
@@ -45,25 +46,31 @@ class UpdateChecker:
headers={ headers={
'Accept': 'application/vnd.github.v3+json', 'Accept': 'application/vnd.github.v3+json',
'User-Agent': 'VideoArchiver-Bot' 'User-Agent': 'VideoArchiver-Bot'
} },
timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
) )
async def start(self) -> None: async def start(self) -> None:
"""Start the update checker task""" """Start the update checker task"""
if self._check_task is None: if self._check_task is None:
await self._init_session() await self._init_session()
self._check_task = self.bot.loop.create_task(self._check_loop()) self._check_task = asyncio.create_task(self._check_loop())
logger.info("Update checker task started") logger.info("Update checker task started")
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the update checker task and cleanup""" """Stop the update checker task and cleanup"""
if self._check_task: self._shutdown = True
if self._check_task and not self._check_task.done():
self._check_task.cancel() self._check_task.cancel()
self._check_task = None try:
await self._check_task
except asyncio.CancelledError:
pass
self._check_task = None
if self._session and not self._session.closed: if self._session and not self._session.closed:
await self._session.close() await self._session.close()
self._session = None self._session = None
logger.info("Update checker task stopped") logger.info("Update checker task stopped")
@@ -71,7 +78,7 @@ class UpdateChecker:
"""Periodic update check loop with improved error handling""" """Periodic update check loop with improved error handling"""
await self.bot.wait_until_ready() await self.bot.wait_until_ready()
while True: while not self._shutdown:
try: try:
for guild in self.bot.guilds: for guild in self.bot.guilds:
try: try:
@@ -101,6 +108,9 @@ class UpdateChecker:
logger.error(f"Error checking updates for guild {guild.id}: {str(e)}") logger.error(f"Error checking updates for guild {guild.id}: {str(e)}")
continue continue
except asyncio.CancelledError:
logger.info("Update check loop cancelled")
break
except Exception as e: except Exception as e:
logger.error(f"Error in update check task: {str(e)}") logger.error(f"Error in update check task: {str(e)}")
@@ -109,7 +119,7 @@ class UpdateChecker:
async def _check_guild(self, guild: discord.Guild, settings: dict) -> None: async def _check_guild(self, guild: discord.Guild, settings: dict) -> None:
"""Check updates for a specific guild with improved error handling""" """Check updates for a specific guild with improved error handling"""
try: try:
current_version = self._get_current_version() current_version = await self._get_current_version()
if not current_version: if not current_version:
await self._log_error( await self._log_error(
guild, guild,
@@ -136,7 +146,7 @@ class UpdateChecker:
except Exception as e: except Exception as e:
await self._log_error(guild, e, "checking for updates") await self._log_error(guild, e, "checking for updates")
def _get_current_version(self) -> Optional[str]: async def _get_current_version(self) -> Optional[str]:
"""Get current yt-dlp version with error handling""" """Get current yt-dlp version with error handling"""
try: try:
return get_package_version('yt-dlp') return get_package_version('yt-dlp')
@@ -150,10 +160,7 @@ class UpdateChecker:
for attempt in range(self.MAX_RETRIES): for attempt in range(self.MAX_RETRIES):
try: try:
async with self._session.get( async with self._session.get(self.GITHUB_API_URL) as response:
self.GITHUB_API_URL,
timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
) as response:
# Update rate limit info # Update rate limit info
self._remaining_requests = int(response.headers.get('X-RateLimit-Remaining', 0)) self._remaining_requests = int(response.headers.get('X-RateLimit-Remaining', 0))
self._rate_limit_reset = int(response.headers.get('X-RateLimit-Reset', 0)) self._rate_limit_reset = int(response.headers.get('X-RateLimit-Reset', 0))
@@ -272,7 +279,7 @@ class UpdateChecker:
raise UpdateError("Update process timed out") raise UpdateError("Update process timed out")
if process.returncode == 0: if process.returncode == 0:
new_version = self._get_current_version() new_version = await self._get_current_version()
if new_version: if new_version:
return True, f"Successfully updated to version {new_version}" return True, f"Successfully updated to version {new_version}"
return True, "Successfully updated (version unknown)" return True, "Successfully updated (version unknown)"