From 2bf85b532bc3838be211ac7a08042d4f15c8ec02 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 17:35:14 +0000 Subject: [PATCH] Adding robust datetime handling in _periodic_cleanup: Added try-except blocks around individual item processing Added proper string to datetime conversion with fallbacks Added cleanup of problematic items that can't be processed Improved error handling: Added specific error logging for individual item failures Added graceful removal of problematic items Ensured cleanup continues even if some items fail Added fallback mechanisms: Uses current_time as fallback for invalid datetime values Removes items that can't be properly processed Prevents single item failures from breaking the entire cleanup process --- videoarchiver/enhanced_queue.py | 150 +++++++++++++++++++++++++------- 1 file changed, 119 insertions(+), 31 deletions(-) diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index 5ad04b2..48aac5e 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -515,31 +515,95 @@ class EnhancedVideoQueueManager: with open(self.persistence_path, "r") as f: state = json.load(f) - # Restore queue items with proper datetime conversion - self._queue = [QueueItem.from_dict(item) for item in state["queue"]] - self._processing = {k: QueueItem.from_dict(v) for k, v in state["processing"].items()} - self._completed = {k: QueueItem.from_dict(v) for k, v in state["completed"].items()} - self._failed = {k: QueueItem.from_dict(v) for k, v in state["failed"].items()} + # Helper function to safely convert items + def safe_convert_item(item_data): + try: + if isinstance(item_data, dict): + # Ensure datetime fields are properly formatted + if 'added_at' in item_data and item_data['added_at']: + if isinstance(item_data['added_at'], str): + try: + item_data['added_at'] = datetime.fromisoformat(item_data['added_at']) + except ValueError: + item_data['added_at'] = datetime.utcnow() + elif not isinstance(item_data['added_at'], datetime): + item_data['added_at'] = datetime.utcnow() - # Restore metrics - metrics_data = state["metrics"] - self.metrics.total_processed = metrics_data["total_processed"] - self.metrics.total_failed = metrics_data["total_failed"] - self.metrics.avg_processing_time = metrics_data["avg_processing_time"] - self.metrics.success_rate = metrics_data["success_rate"] - self.metrics.errors_by_type = metrics_data["errors_by_type"] - self.metrics.last_error = metrics_data["last_error"] - self.metrics.compression_failures = metrics_data.get( - "compression_failures", 0 - ) - self.metrics.hardware_accel_failures = metrics_data.get( - "hardware_accel_failures", 0 - ) + if 'last_retry' in item_data and item_data['last_retry']: + if isinstance(item_data['last_retry'], str): + try: + item_data['last_retry'] = datetime.fromisoformat(item_data['last_retry']) + except ValueError: + item_data['last_retry'] = None + elif not isinstance(item_data['last_retry'], datetime): + item_data['last_retry'] = None - if metrics_data["last_error_time"]: - self.metrics.last_error_time = datetime.fromisoformat( - metrics_data["last_error_time"] - ) + if 'last_error_time' in item_data and item_data['last_error_time']: + if isinstance(item_data['last_error_time'], str): + try: + item_data['last_error_time'] = datetime.fromisoformat(item_data['last_error_time']) + except ValueError: + item_data['last_error_time'] = None + elif not isinstance(item_data['last_error_time'], datetime): + item_data['last_error_time'] = None + + return QueueItem(**item_data) + return None + except Exception as e: + logger.error(f"Error converting queue item: {e}") + return None + + # Restore queue items with proper conversion + self._queue = [] + for item in state.get("queue", []): + converted_item = safe_convert_item(item) + if converted_item: + self._queue.append(converted_item) + + # Restore processing items + self._processing = {} + for k, v in state.get("processing", {}).items(): + converted_item = safe_convert_item(v) + if converted_item: + self._processing[k] = converted_item + + # Restore completed items + self._completed = {} + for k, v in state.get("completed", {}).items(): + converted_item = safe_convert_item(v) + if converted_item: + self._completed[k] = converted_item + + # Restore failed items + self._failed = {} + for k, v in state.get("failed", {}).items(): + converted_item = safe_convert_item(v) + if converted_item: + self._failed[k] = converted_item + + # Restore metrics with proper datetime handling + metrics_data = state.get("metrics", {}) + self.metrics.total_processed = metrics_data.get("total_processed", 0) + self.metrics.total_failed = metrics_data.get("total_failed", 0) + self.metrics.avg_processing_time = metrics_data.get("avg_processing_time", 0.0) + self.metrics.success_rate = metrics_data.get("success_rate", 0.0) + self.metrics.errors_by_type = metrics_data.get("errors_by_type", {}) + self.metrics.last_error = metrics_data.get("last_error") + self.metrics.compression_failures = metrics_data.get("compression_failures", 0) + self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0) + + # Handle metrics datetime fields + last_error_time = metrics_data.get("last_error_time") + if last_error_time: + try: + if isinstance(last_error_time, str): + self.metrics.last_error_time = datetime.fromisoformat(last_error_time) + elif isinstance(last_error_time, datetime): + self.metrics.last_error_time = last_error_time + else: + self.metrics.last_error_time = None + except ValueError: + self.metrics.last_error_time = None logger.info("Successfully loaded persisted queue state") @@ -655,19 +719,43 @@ class EnhancedVideoQueueManager: # Clean up completed items for url in list(self._completed.keys()): item = self._completed[url] - # Ensure added_at is a datetime object - if isinstance(item.added_at, str): - item.added_at = datetime.fromisoformat(item.added_at) - if item.added_at < cleanup_cutoff: + try: + # Ensure added_at is a datetime object + if isinstance(item.added_at, str): + try: + item.added_at = datetime.fromisoformat(item.added_at) + except ValueError: + # If conversion fails, use current time to ensure item gets cleaned up + item.added_at = current_time + elif not isinstance(item.added_at, datetime): + item.added_at = current_time + + if item.added_at < cleanup_cutoff: + self._completed.pop(url) + except Exception as e: + logger.error(f"Error processing completed item {url}: {e}") + # Remove problematic item self._completed.pop(url) # Clean up failed items for url in list(self._failed.keys()): item = self._failed[url] - # Ensure added_at is a datetime object - if isinstance(item.added_at, str): - item.added_at = datetime.fromisoformat(item.added_at) - if item.added_at < cleanup_cutoff: + try: + # Ensure added_at is a datetime object + if isinstance(item.added_at, str): + try: + item.added_at = datetime.fromisoformat(item.added_at) + except ValueError: + # If conversion fails, use current time to ensure item gets cleaned up + item.added_at = current_time + elif not isinstance(item.added_at, datetime): + item.added_at = current_time + + if item.added_at < cleanup_cutoff: + self._failed.pop(url) + except Exception as e: + logger.error(f"Error processing failed item {url}: {e}") + # Remove problematic item self._failed.pop(url) # Clean up guild and channel tracking