diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py
index 40c1354..7b43a4c 100644
--- a/videoarchiver/core/base.py
+++ b/videoarchiver/core/base.py
@@ -40,11 +40,11 @@ from .events import setup_events
 
 logger = logging.getLogger("VideoArchiver")
 
-# Constants for timeouts - extremely aggressive timeouts
-UNLOAD_TIMEOUT = 2  # seconds
-CLEANUP_TIMEOUT = 1  # seconds
-INIT_TIMEOUT = 5  # seconds
-COMPONENT_INIT_TIMEOUT = 2  # seconds
+# Constants for timeouts - more reasonable timeouts
+UNLOAD_TIMEOUT = 30  # seconds
+CLEANUP_TIMEOUT = 15  # seconds
+INIT_TIMEOUT = 60  # seconds
+COMPONENT_INIT_TIMEOUT = 30  # seconds
 
 class VideoArchiver(GroupCog):
     """Archive videos from Discord channels"""
@@ -73,6 +73,9 @@ class VideoArchiver(GroupCog):
         self._cleanup_task: Optional[asyncio.Task] = None
         self._unloading = False
         self.db = None
+        self.queue_manager = None
+        self.processor = None
+        self.components = {}
 
         # Start initialization
         self._init_task = asyncio.create_task(self._initialize())
@@ -335,10 +338,12 @@ class VideoArchiver(GroupCog):
         """Handle initialization task completion"""
         try:
             task.result()
+            logger.info("Initialization completed successfully")
         except asyncio.CancelledError:
-            pass
+            logger.warning("Initialization was cancelled")
+            asyncio.create_task(self._cleanup())
         except Exception as e:
-            logger.error(f"Initialization failed: {str(e)}")
+            logger.error(f"Initialization failed: {str(e)}\n{traceback.format_exc()}")
             asyncio.create_task(self._cleanup())
 
     async def _initialize(self) -> None:
@@ -348,11 +353,13 @@
             config = Config.get_conf(self, identifier=855847, force_registration=True)
             config.register_guild(**self.default_guild_settings)
             self.config_manager = ConfigManager(config)
+            logger.info("Config manager initialized")
 
             # Set up paths
             self.data_path = Path(data_manager.cog_data_path(self))
             self.download_path = self.data_path / "downloads"
             self.download_path.mkdir(parents=True, exist_ok=True)
+            logger.info("Paths initialized")
 
             # Clean existing downloads with timeout
             try:
@@ -360,31 +367,15 @@
                 await asyncio.wait_for(
                     cleanup_downloads(str(self.download_path)),
                     timeout=CLEANUP_TIMEOUT
                 )
+                logger.info("Downloads cleaned up")
             except asyncio.TimeoutError:
                 logger.warning("Download cleanup timed out, continuing initialization")
 
             # Initialize shared FFmpeg manager
             self.ffmpeg_mgr = FFmpegManager()
-            logger.info("Initialized shared FFmpeg manager")
+            logger.info("FFmpeg manager initialized")
 
-            # Initialize components dict first
-            self.components: Dict[int, Dict[str, Any]] = {}
-
-            # Initialize components for existing guilds with timeout
-            for guild in self.bot.guilds:
-                try:
-                    await asyncio.wait_for(
-                        initialize_guild_components(self, guild.id),
-                        timeout=COMPONENT_INIT_TIMEOUT
-                    )
-                except asyncio.TimeoutError:
-                    logger.error(f"Guild {guild.id} initialization timed out")
-                    continue
-                except Exception as e:
-                    logger.error(f"Failed to initialize guild {guild.id}: {str(e)}")
-                    continue
-
-            # Initialize queue manager after components are ready
+            # Initialize queue manager before components
             queue_path = self.data_path / "queue_state.json"
             queue_path.parent.mkdir(parents=True, exist_ok=True)
             self.queue_manager = EnhancedVideoQueueManager(
@@ -395,9 +386,7 @@
                 max_retries=3,
                 retry_delay=5,
                 max_queue_size=1000,
                 cleanup_interval=3600,
                 max_history_age=86400,
                 persistence_path=str(queue_path),
             )
-
-            # Initialize update checker
-            self.update_checker = UpdateChecker(self.bot, self.config_manager)
+            logger.info("Queue manager initialized")
 
             # Initialize processor with queue manager and shared FFmpeg manager
             self.processor = VideoProcessor(
@@ -408,6 +397,26 @@
                 ffmpeg_mgr=self.ffmpeg_mgr,
                 db=self.db,  # Pass database to processor (None by default)
             )
+            logger.info("Video processor initialized")
+
+            # Initialize components for existing guilds with timeout
+            for guild in self.bot.guilds:
+                try:
+                    await asyncio.wait_for(
+                        initialize_guild_components(self, guild.id),
+                        timeout=COMPONENT_INIT_TIMEOUT
+                    )
+                    logger.info(f"Guild {guild.id} components initialized")
+                except asyncio.TimeoutError:
+                    logger.error(f"Guild {guild.id} initialization timed out")
+                    continue
+                except Exception as e:
+                    logger.error(f"Failed to initialize guild {guild.id}: {str(e)}")
+                    continue
+
+            # Initialize update checker
+            self.update_checker = UpdateChecker(self.bot, self.config_manager)
+            logger.info("Update checker initialized")
 
             # Start update checker with timeout
             try:
@@ -415,15 +424,20 @@
                 await asyncio.wait_for(
                     self.update_checker.start(),
                     timeout=INIT_TIMEOUT
                 )
+                logger.info("Update checker started")
             except asyncio.TimeoutError:
                 logger.warning("Update checker start timed out")
 
+            # Start queue processing
+            await self.queue_manager.process_queue(self.processor.process_video)
+            logger.info("Queue processing started")
+
             # Set ready flag
             self.ready.set()
             logger.info("VideoArchiver initialization completed successfully")
 
         except Exception as e:
-            logger.error(f"Critical error during initialization: {str(e)}")
+            logger.error(f"Critical error during initialization: {str(e)}\n{traceback.format_exc()}")
             # Force cleanup on initialization error
             try:
                 await asyncio.wait_for(
@@ -435,22 +449,24 @@
             raise
 
     async def cog_load(self) -> None:
-        """Handle cog loading with aggressive timeout"""
+        """Handle cog loading with proper timeout"""
         try:
             # Create initialization task
             init_task = asyncio.create_task(self._initialize())
 
             try:
                 # Wait for initialization with timeout
                 await asyncio.wait_for(init_task, timeout=INIT_TIMEOUT)
+                logger.info("Initialization completed within timeout")
             except asyncio.TimeoutError:
                 logger.error("Initialization timed out, forcing cleanup")
                 init_task.cancel()
                 await force_cleanup_resources(self)
                 raise ProcessingError("Cog initialization timed out")
 
-            # Wait for ready flag with short timeout
+            # Wait for ready flag with timeout
             try:
                 await asyncio.wait_for(self.ready.wait(), timeout=INIT_TIMEOUT)
+                logger.info("Ready flag set within timeout")
             except asyncio.TimeoutError:
                 await force_cleanup_resources(self)
                 raise ProcessingError("Ready flag wait timed out")
@@ -467,34 +483,36 @@
             raise ProcessingError(f"Error during cog load: {str(e)}")
 
     async def cog_unload(self) -> None:
-        """Clean up when cog is unloaded with extremely aggressive timeout handling"""
+        """Clean up when cog is unloaded with proper timeout handling"""
         self._unloading = True
         try:
-            # Cancel any pending tasks immediately
+            # Cancel any pending tasks
            if self._init_task and not self._init_task.done():
                 self._init_task.cancel()
             if self._cleanup_task and not self._cleanup_task.done():
                 self._cleanup_task.cancel()
 
-            # Try normal cleanup first with very short timeout
+            # Try normal cleanup first
             cleanup_task = asyncio.create_task(cleanup_resources(self))
             try:
-                await asyncio.wait_for(cleanup_task, timeout=CLEANUP_TIMEOUT)
+                await asyncio.wait_for(cleanup_task, timeout=UNLOAD_TIMEOUT)
+                logger.info("Normal cleanup completed")
             except (asyncio.TimeoutError, Exception) as e:
                 if isinstance(e, asyncio.TimeoutError):
                     logger.warning("Normal cleanup timed out, forcing cleanup")
                 else:
                     logger.error(f"Error during normal cleanup: {str(e)}")
 
-                # Cancel normal cleanup and force cleanup immediately
+                # Cancel normal cleanup and force cleanup
                 cleanup_task.cancel()
                 try:
-                    # Force cleanup with very short timeout
+                    # Force cleanup with timeout
                     await asyncio.wait_for(
                         force_cleanup_resources(self),
                         timeout=CLEANUP_TIMEOUT
                     )
+                    logger.info("Force cleanup completed")
                 except asyncio.TimeoutError:
                     logger.error("Force cleanup timed out")
 
         except Exception as e:
@@ -506,16 +524,14 @@
             self._unloading = False
             # Ensure ready flag is cleared
             self.ready.clear()
-            # Aggressively clear all references
+            # Clear all references
             self.bot = None
             self.processor = None
             self.queue_manager = None
             self.update_checker = None
             self.ffmpeg_mgr = None
-            if hasattr(self, 'components'):
-                self.components.clear()
+            self.components.clear()
             self.db = None
-            # Clear any other potential references
             self._init_task = None
             self._cleanup_task = None
@@ -526,6 +542,7 @@
                 cleanup_resources(self),
                 timeout=CLEANUP_TIMEOUT
             )
+            logger.info("Cleanup completed successfully")
         except asyncio.TimeoutError:
             logger.warning("Cleanup timed out, forcing cleanup")
             try:
@@ -533,5 +550,6 @@
                     force_cleanup_resources(self),
                     timeout=CLEANUP_TIMEOUT
                 )
+                logger.info("Force cleanup completed")
             except asyncio.TimeoutError:
                 logger.error("Force cleanup timed out")
diff --git a/videoarchiver/processor/core.py b/videoarchiver/processor/core.py
index 0484e83..948ad58 100644
--- a/videoarchiver/processor/core.py
+++ b/videoarchiver/processor/core.py
@@ -43,14 +43,10 @@ class VideoProcessor:
         if self.db:
             self.queue_handler.db = self.db
 
-        # Start queue processing
-        logger.info("Starting video processing queue...")
+        # Store queue task reference but don't start processing here
+        # Queue processing is managed by VideoArchiver class
         self._queue_task = None
-        if queue_manager:
-            self._queue_task = self.bot.loop.create_task(
-                queue_manager.process_queue(self.queue_handler.process_video)
-            )
-            logger.info("Video processing queue started successfully")
+        logger.info("VideoProcessor initialized successfully")
 
     async def process_message(self, message: discord.Message) -> None:
         """Process a message for video content"""
@@ -74,7 +70,7 @@ class VideoProcessor:
         except Exception as e:
             logger.error(f"Error cleaning up FFmpeg manager: {e}")
 
-        # Cancel queue processing task
+        # Cancel queue processing task if we have one
         if self._queue_task and not self._queue_task.done():
             self._queue_task.cancel()
             try:
diff --git a/videoarchiver/processor/queue_handler.py b/videoarchiver/processor/queue_handler.py
index c9abe73..7c9a445 100644
--- a/videoarchiver/processor/queue_handler.py
+++ b/videoarchiver/processor/queue_handler.py
@@ -35,6 +35,10 @@ class QueueHandler:
         download_task = None
 
         try:
+            # Start processing
+            item.start_processing()
+            logger.info(f"Started processing video: {item.url}")
+
             # Check if video is already archived
             if self.db and self.db.is_url_archived(item.url):
                 logger.info(f"Video already archived: {item.url}")
@@ -43,18 +47,23 @@
                 archived_info = self.db.get_archived_video(item.url)
                 if archived_info:
                     await original_message.reply(f"This video was already archived. You can find it here: {archived_info[0]}")
+                    item.finish_processing(True)
                 return True, None
 
             guild_id = item.guild_id
             if guild_id not in self.components:
-                return False, f"No components found for guild {guild_id}"
+                error = f"No components found for guild {guild_id}"
+                item.finish_processing(False, error)
+                return False, error
 
             components = self.components[guild_id]
             downloader = components.get("downloader")
             message_manager = components.get("message_manager")
 
             if not downloader or not message_manager:
-                return False, f"Missing required components for guild {guild_id}"
+                error = f"Missing required components for guild {guild_id}"
+                item.finish_processing(False, error)
+                return False, error
 
             # Get original message and update reactions
             original_message = await self._get_original_message(item)
@@ -74,19 +83,21 @@
                 if original_message:
                     await original_message.add_reaction(REACTIONS["error"])
                 logger.error(f"Download failed for message {item.message_id}: {error}")
+                item.finish_processing(False, f"Failed to download video: {error}")
                 return False, f"Failed to download video: {error}"
 
             # Archive video
             success, error = await self._archive_video(
                 guild_id, original_message, message_manager, item.url, file_path
             )
-            if not success:
-                return False, error
-
-            return True, None
+
+            # Finish processing
+            item.finish_processing(success, error if not success else None)
+            return success, error
 
         except Exception as e:
             logger.error(f"Error processing video: {str(e)}", exc_info=True)
+            item.finish_processing(False, str(e))
             return False, str(e)
         finally:
             # Clean up downloaded file
diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py
index e4f4fb1..f3bb7ad 100644
--- a/videoarchiver/queue/manager.py
+++ b/videoarchiver/queue/manager.py
@@ -2,6 +2,7 @@
 
 import asyncio
 import logging
+import time
 from typing import Dict, Optional, Set, Tuple, Callable, Any, List
 from datetime import datetime
 
@@ -28,7 +29,8 @@ class EnhancedVideoQueueManager:
         max_history_age: int = 86400,  # 24 hours
         persistence_path: Optional[str] = None,
         backup_interval: int = 300,  # 5 minutes
-        deadlock_threshold: int = 900,  # 15 minutes
+        deadlock_threshold: int = 300,  # 5 minutes
+        check_interval: int = 60,  # 1 minute
     ):
         # Configuration
         self.max_retries = max_retries
@@ -58,7 +60,8 @@ class EnhancedVideoQueueManager:
         self.persistence = QueuePersistenceManager(persistence_path) if persistence_path else None
         self.monitor = QueueMonitor(
             deadlock_threshold=deadlock_threshold,
-            max_retries=max_retries
+            max_retries=max_retries,
+            check_interval=check_interval
         )
         self.cleaner = QueueCleaner(
             cleanup_interval=cleanup_interval,
@@ -80,6 +83,7 @@ class EnhancedVideoQueueManager:
                 )
             )
             self._active_tasks.add(monitor_task)
+            logger.info("Queue monitoring started")
 
             # Start cleanup
             cleanup_task = asyncio.create_task(
@@ -95,6 +99,7 @@ class EnhancedVideoQueueManager:
                 )
             )
             self._active_tasks.add(cleanup_task)
+            logger.info("Queue cleanup started")
 
             # Load persisted state if available
             if self.persistence:
@@ -120,6 +125,7 @@ class EnhancedVideoQueueManager:
                     self.metrics.compression_failures = metrics_data.get("compression_failures", 0)
                     self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0)
+                    logger.info("Loaded persisted queue state")
             except Exception as e:
                 logger.error(f"Failed to load persisted state: {e}")
 
@@ -141,8 +147,6 @@ class EnhancedVideoQueueManager:
                     if self._queue:
                         item = self._queue.pop(0)
                         self._processing[item.url] = item
-                        item.status = "processing"
-                        item.processing_time = 0.0
 
                 if not item:
                     await asyncio.sleep(1)
@@ -151,20 +155,19 @@
             try:
                 # Process the item
                 logger.info(f"Processing queue item: {item.url}")
+                item.start_processing()  # Start processing tracking
+                self.metrics.last_activity_time = time.time()  # Update activity time
+
                 success, error = await processor(item)
 
                 # Update metrics and status
                 async with self._processing_lock:
+                    item.finish_processing(success, error)  # Update item status
+
                     if success:
-                        item.status = "completed"
                         self._completed[item.url] = item
                         logger.info(f"Successfully processed: {item.url}")
                     else:
-                        item.status = "failed"
-                        item.error = error
-                        item.last_error = error
-                        item.last_error_time = datetime.utcnow()
-
                         if item.retry_count < self.max_retries:
                             item.retry_count += 1
                             item.status = "pending"
@@ -177,16 +180,25 @@
                             logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
                             self._processing.pop(item.url, None)
+
+                    # Update metrics
+                    self.metrics.update(
+                        processing_time=item.processing_time,
+                        success=success,
+                        error=error
+                    )
 
             except Exception as e:
                 logger.error(f"Error processing {item.url}: {e}")
                 async with self._processing_lock:
-                    item.status = "failed"
-                    item.error = str(e)
-                    item.last_error = str(e)
-                    item.last_error_time = datetime.utcnow()
+                    item.finish_processing(False, str(e))
                     self._failed[item.url] = item
                     self._processing.pop(item.url, None)
+                    self.metrics.update(
+                        processing_time=item.processing_time,
+                        success=False,
+                        error=str(e)
+                    )
 
                 # Persist state if enabled
                 if self.persistence:
@@ -215,22 +227,7 @@
         author_id: int,
         priority: int = 0,
     ) -> bool:
-        """Add a video to the processing queue
-
-        Args:
-            url: Video URL
-            message_id: Discord message ID
-            channel_id: Discord channel ID
-            guild_id: Discord guild ID
-            author_id: Discord author ID
-            priority: Queue priority (higher = higher priority)
-
-        Returns:
-            True if added successfully
-
-        Raises:
-            QueueError: If queue is full or shutting down
-        """
+        """Add a video to the processing queue"""
         if self._shutdown:
             raise QueueError("Queue manager is shutting down")
 
@@ -262,6 +259,9 @@
                 self._queue.append(item)
                 self._queue.sort(key=lambda x: (-x.priority, x.added_at))
 
+                # Update activity time
+                self.metrics.last_activity_time = time.time()
+
                 if self.persistence:
                     await self.persistence.persist_queue_state(
                         self._queue,
@@ -279,14 +279,7 @@
             raise QueueError(f"Failed to add to queue: {str(e)}")
 
     def get_queue_status(self, guild_id: int) -> dict:
-        """Get current queue status for a guild
-
-        Args:
-            guild_id: Discord guild ID
-
-        Returns:
-            Dict containing queue status and metrics
-        """
+        """Get current queue status for a guild"""
         try:
             pending = len([item for item in self._queue if item.guild_id == guild_id])
             processing = len([item for item in self._processing.values() if item.guild_id == guild_id])
@@ -308,6 +301,7 @@
                     "errors_by_type": self.metrics.errors_by_type,
                     "compression_failures": self.metrics.compression_failures,
                     "hardware_accel_failures": self.metrics.hardware_accel_failures,
+                    "last_activity": time.time() - self.metrics.last_activity_time,
                 },
             }
 
@@ -328,21 +322,12 @@
                     "errors_by_type": {},
                     "compression_failures": 0,
                     "hardware_accel_failures": 0,
+                    "last_activity": 0,
                 },
             }
 
     async def clear_guild_queue(self, guild_id: int) -> int:
-        """Clear all queue items for a guild
-
-        Args:
-            guild_id: Discord guild ID
-
-        Returns:
-            Number of items cleared
-
-        Raises:
-            QueueError: If queue is shutting down
-        """
+        """Clear all queue items for a guild"""
         if self._shutdown:
             raise QueueError("Queue manager is shutting down")
 
@@ -377,6 +362,7 @@
         """Clean up resources and stop queue processing"""
         try:
             self._shutdown = True
+            logger.info("Starting queue manager cleanup...")
 
             # Stop monitoring and cleanup tasks
             self.monitor.stop_monitoring()
@@ -428,6 +414,7 @@
     def force_stop(self) -> None:
         """Force stop all queue operations immediately"""
         self._shutdown = True
+        logger.info("Force stopping queue manager...")
 
         # Stop monitoring and cleanup
         self.monitor.stop_monitoring()
diff --git a/videoarchiver/queue/models.py b/videoarchiver/queue/models.py
index 95d5463..3d64349 100644
--- a/videoarchiver/queue/models.py
+++ b/videoarchiver/queue/models.py
@@ -1,6 +1,7 @@
 """Data models for the queue system"""
 
 import logging
+import time
 from dataclasses import dataclass, field, asdict
 from datetime import datetime
 from typing import Dict, Optional, List, Any
@@ -22,14 +23,16 @@ class QueueItem:
     guild_id: int  # Discord ID
     added_at: datetime = field(default_factory=datetime.utcnow)
     status: str = "pending"
-    retry_count: int = 0  # Changed from retries to retry_count
-    priority: int = 0  # Added priority field with default value 0
+    retry_count: int = 0
+    priority: int = 0
     last_retry: Optional[datetime] = None
     last_error: Optional[str] = None
    last_error_time: Optional[datetime] = None
+    start_time: Optional[float] = None  # Added start_time for processing tracking
     processing_time: float = 0.0
     output_path: Optional[str] = None
     metadata: Dict[str, Any] = field(default_factory=dict)
+    error: Optional[str] = None  # Added error field for current error
 
     def __post_init__(self):
         """Convert string dates to datetime objects after initialization"""
@@ -57,6 +60,29 @@ class QueueItem:
         elif not isinstance(self.last_error_time, datetime):
             self.last_error_time = None
 
+    def start_processing(self) -> None:
+        """Mark item as started processing"""
+        self.status = "processing"
+        self.start_time = time.time()
+        self.processing_time = 0.0
+        self.error = None
+
+    def finish_processing(self, success: bool, error: Optional[str] = None) -> None:
+        """Mark item as finished processing"""
+        end_time = time.time()
+        if self.start_time:
+            self.processing_time = end_time - self.start_time
+
+        if success:
+            self.status = "completed"
+        else:
+            self.status = "failed"
+            self.error = error
+            self.last_error = error
+            self.last_error_time = datetime.utcnow()
+
+        self.start_time = None
+
     def to_dict(self) -> dict:
         """Convert to dictionary with datetime handling"""
         data = asdict(self)
@@ -91,6 +117,7 @@ class QueueMetrics:
     processing_times: List[float] = field(default_factory=list)
     compression_failures: int = 0
     hardware_accel_failures: int = 0
+    last_activity_time: float = field(default_factory=time.time)  # Added activity tracking
 
     def __post_init__(self):
         """Convert string dates to datetime objects after initialization"""
@@ -115,6 +142,8 @@ class QueueMetrics:
     def update(self, processing_time: float, success: bool, error: str = None):
         """Update metrics with new processing information"""
         self.total_processed += 1
+        self.last_activity_time = time.time()  # Update activity timestamp
+
         if not success:
             self.total_failed += 1
             if error:
diff --git a/videoarchiver/queue/monitoring.py b/videoarchiver/queue/monitoring.py
index 13660cd..c67fdbe 100644
--- a/videoarchiver/queue/monitoring.py
+++ b/videoarchiver/queue/monitoring.py
@@ -19,14 +19,17 @@
 class QueueMonitor:
     def __init__(
         self,
-        deadlock_threshold: int = 900,  # 15 minutes
-        memory_threshold: int = 1024,  # 1GB
-        max_retries: int = 3
+        deadlock_threshold: int = 300,  # 5 minutes
+        memory_threshold: int = 512,  # 512MB
+        max_retries: int = 3,
+        check_interval: int = 60  # Check every minute
     ):
         self.deadlock_threshold = deadlock_threshold
         self.memory_threshold = memory_threshold
         self.max_retries = max_retries
+        self.check_interval = check_interval
         self._shutdown = False
+        self._last_active_time = time.time()
 
     async def start_monitoring(
         self,
@@ -43,21 +46,28 @@
             metrics: Reference to queue metrics
             processing_lock: Lock for processing dict
         """
+        logger.info("Starting queue monitoring...")
         while not self._shutdown:
             try:
                 await self._check_health(queue, processing, metrics, processing_lock)
-                await asyncio.sleep(300)  # Check every 5 minutes
+                await asyncio.sleep(self.check_interval)
             except asyncio.CancelledError:
+                logger.info("Queue monitoring cancelled")
                 break
             except Exception as e:
                 logger.error(f"Error in health monitor: {str(e)}")
-                await asyncio.sleep(60)
+                await asyncio.sleep(30)  # Shorter sleep on error
 
     def stop_monitoring(self) -> None:
         """Stop the monitoring process"""
+        logger.info("Stopping queue monitoring...")
         self._shutdown = True
 
+    def update_activity(self) -> None:
+        """Update the last active time"""
+        self._last_active_time = time.time()
+
     async def _check_health(
         self,
         queue: List[QueueItem],
@@ -74,6 +84,8 @@
             processing_lock: Lock for processing dict
         """
         try:
+            current_time = time.time()
+
             # Check memory usage
             process = psutil.Process()
             memory_usage = process.memory_info().rss / 1024 / 1024  # MB
@@ -83,18 +95,22 @@
                 # Force garbage collection
                 import gc
                 gc.collect()
+                memory_after = process.memory_info().rss / 1024 / 1024
+                logger.info(f"Memory after GC: {memory_after:.2f}MB")
 
             # Check for potential deadlocks
-            current_time = time.time()
             processing_times = []
             stuck_items = []
 
-            for url, item in processing.items():
-                if isinstance(item.processing_time, (int, float)) and item.processing_time > 0:
-                    processing_time = current_time - item.processing_time
-                    processing_times.append(processing_time)
-                    if processing_time > self.deadlock_threshold:
-                        stuck_items.append((url, item))
+            async with processing_lock:
+                for url, item in processing.items():
+                    # Check if item has started processing
+                    if hasattr(item, 'start_time') and item.start_time:
+                        processing_time = current_time - item.start_time
+                        processing_times.append(processing_time)
+                        if processing_time > self.deadlock_threshold:
+                            stuck_items.append((url, item))
+                            logger.warning(f"Item stuck in processing: {url} for {processing_time:.1f}s")
 
             if stuck_items:
                 logger.warning(
@@ -104,6 +120,17 @@
                     stuck_items, queue, processing, processing_lock
                 )
 
+            # Check overall queue activity
+            if processing and current_time - self._last_active_time > self.deadlock_threshold:
+                logger.warning("Queue appears to be hung - no activity detected")
+                # Force recovery of all processing items
+                async with processing_lock:
+                    all_items = list(processing.items())
+                await self._recover_stuck_items(
+                    all_items, queue, processing, processing_lock
+                )
+                self._last_active_time = current_time
+
             # Calculate and log metrics
             success_rate = metrics.success_rate
             error_distribution = metrics.errors_by_type
@@ -112,14 +139,17 @@
             # Update peak memory usage
             metrics.peak_memory_usage = max(metrics.peak_memory_usage, memory_usage)
 
+            # Log detailed metrics
             logger.info(
f"Queue Health Metrics:\n" f"- Success Rate: {success_rate:.2%}\n" f"- Avg Processing Time: {avg_processing_time:.2f}s\n" f"- Memory Usage: {memory_usage:.2f}MB\n" + f"- Peak Memory: {metrics.peak_memory_usage:.2f}MB\n" f"- Error Distribution: {error_distribution}\n" f"- Queue Size: {len(queue)}\n" - f"- Processing Items: {len(processing)}" + f"- Processing Items: {len(processing)}\n" + f"- Last Activity: {(current_time - self._last_active_time):.1f}s ago" ) except Exception as e: @@ -142,26 +172,37 @@ class QueueMonitor: processing_lock: Lock for processing dict """ try: + recovered = 0 + failed = 0 + async with processing_lock: for url, item in stuck_items: - # Move to failed if max retries reached - if item.retry_count >= self.max_retries: - logger.warning(f"Moving stuck item to failed: {url}") - item.status = "failed" - item.error = "Exceeded maximum retries after being stuck" - item.last_error = item.error - item.last_error_time = datetime.utcnow() - processing.pop(url) - else: - # Reset for retry - logger.info(f"Recovering stuck item for retry: {url}") - item.retry_count += 1 - item.processing_time = 0 - item.last_retry = datetime.utcnow() - item.status = "pending" - item.priority = max(0, item.priority - 2) # Lower priority - queue.append(item) - processing.pop(url) + try: + # Move to failed if max retries reached + if item.retry_count >= self.max_retries: + logger.warning(f"Moving stuck item to failed: {url}") + item.status = "failed" + item.error = "Exceeded maximum retries after being stuck" + item.last_error = item.error + item.last_error_time = datetime.utcnow() + processing.pop(url) + failed += 1 + else: + # Reset for retry + logger.info(f"Recovering stuck item for retry: {url}") + item.retry_count += 1 + item.start_time = None + item.processing_time = 0 + item.last_retry = datetime.utcnow() + item.status = "pending" + item.priority = max(0, item.priority - 2) # Lower priority + queue.append(item) + processing.pop(url) + recovered += 1 + except Exception as e: + logger.error(f"Error recovering item {url}: {str(e)}") + + logger.info(f"Recovery complete - Recovered: {recovered}, Failed: {failed}") except Exception as e: logger.error(f"Error recovering stuck items: {str(e)}")