Adding robust datetime handling in _periodic_cleanup:

Added try-except blocks around individual item processing
Added proper string to datetime conversion with fallbacks
Added cleanup of problematic items that can't be processed
Improved error handling:

Added specific error logging for individual item failures
Added graceful removal of problematic items
Ensured cleanup continues even if some items fail
Added fallback mechanisms:

Uses current_time as fallback for invalid datetime values
Removes items that can't be properly processed
Prevents single item failures from breaking the entire cleanup process
This commit is contained in:
pacnpal
2024-11-15 17:35:14 +00:00
parent 39169b3edb
commit 2bf85b532b

View File

@@ -515,31 +515,95 @@ class EnhancedVideoQueueManager:
with open(self.persistence_path, "r") as f:
state = json.load(f)
# Restore queue items with proper datetime conversion
self._queue = [QueueItem.from_dict(item) for item in state["queue"]]
self._processing = {k: QueueItem.from_dict(v) for k, v in state["processing"].items()}
self._completed = {k: QueueItem.from_dict(v) for k, v in state["completed"].items()}
self._failed = {k: QueueItem.from_dict(v) for k, v in state["failed"].items()}
# Helper function to safely convert items
def safe_convert_item(item_data):
try:
if isinstance(item_data, dict):
# Ensure datetime fields are properly formatted
if 'added_at' in item_data and item_data['added_at']:
if isinstance(item_data['added_at'], str):
try:
item_data['added_at'] = datetime.fromisoformat(item_data['added_at'])
except ValueError:
item_data['added_at'] = datetime.utcnow()
elif not isinstance(item_data['added_at'], datetime):
item_data['added_at'] = datetime.utcnow()
# Restore metrics
metrics_data = state["metrics"]
self.metrics.total_processed = metrics_data["total_processed"]
self.metrics.total_failed = metrics_data["total_failed"]
self.metrics.avg_processing_time = metrics_data["avg_processing_time"]
self.metrics.success_rate = metrics_data["success_rate"]
self.metrics.errors_by_type = metrics_data["errors_by_type"]
self.metrics.last_error = metrics_data["last_error"]
self.metrics.compression_failures = metrics_data.get(
"compression_failures", 0
)
self.metrics.hardware_accel_failures = metrics_data.get(
"hardware_accel_failures", 0
)
if 'last_retry' in item_data and item_data['last_retry']:
if isinstance(item_data['last_retry'], str):
try:
item_data['last_retry'] = datetime.fromisoformat(item_data['last_retry'])
except ValueError:
item_data['last_retry'] = None
elif not isinstance(item_data['last_retry'], datetime):
item_data['last_retry'] = None
if metrics_data["last_error_time"]:
self.metrics.last_error_time = datetime.fromisoformat(
metrics_data["last_error_time"]
)
if 'last_error_time' in item_data and item_data['last_error_time']:
if isinstance(item_data['last_error_time'], str):
try:
item_data['last_error_time'] = datetime.fromisoformat(item_data['last_error_time'])
except ValueError:
item_data['last_error_time'] = None
elif not isinstance(item_data['last_error_time'], datetime):
item_data['last_error_time'] = None
return QueueItem(**item_data)
return None
except Exception as e:
logger.error(f"Error converting queue item: {e}")
return None
# Restore queue items with proper conversion
self._queue = []
for item in state.get("queue", []):
converted_item = safe_convert_item(item)
if converted_item:
self._queue.append(converted_item)
# Restore processing items
self._processing = {}
for k, v in state.get("processing", {}).items():
converted_item = safe_convert_item(v)
if converted_item:
self._processing[k] = converted_item
# Restore completed items
self._completed = {}
for k, v in state.get("completed", {}).items():
converted_item = safe_convert_item(v)
if converted_item:
self._completed[k] = converted_item
# Restore failed items
self._failed = {}
for k, v in state.get("failed", {}).items():
converted_item = safe_convert_item(v)
if converted_item:
self._failed[k] = converted_item
# Restore metrics with proper datetime handling
metrics_data = state.get("metrics", {})
self.metrics.total_processed = metrics_data.get("total_processed", 0)
self.metrics.total_failed = metrics_data.get("total_failed", 0)
self.metrics.avg_processing_time = metrics_data.get("avg_processing_time", 0.0)
self.metrics.success_rate = metrics_data.get("success_rate", 0.0)
self.metrics.errors_by_type = metrics_data.get("errors_by_type", {})
self.metrics.last_error = metrics_data.get("last_error")
self.metrics.compression_failures = metrics_data.get("compression_failures", 0)
self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0)
# Handle metrics datetime fields
last_error_time = metrics_data.get("last_error_time")
if last_error_time:
try:
if isinstance(last_error_time, str):
self.metrics.last_error_time = datetime.fromisoformat(last_error_time)
elif isinstance(last_error_time, datetime):
self.metrics.last_error_time = last_error_time
else:
self.metrics.last_error_time = None
except ValueError:
self.metrics.last_error_time = None
logger.info("Successfully loaded persisted queue state")
@@ -655,19 +719,43 @@ class EnhancedVideoQueueManager:
# Clean up completed items
for url in list(self._completed.keys()):
item = self._completed[url]
# Ensure added_at is a datetime object
if isinstance(item.added_at, str):
item.added_at = datetime.fromisoformat(item.added_at)
if item.added_at < cleanup_cutoff:
try:
# Ensure added_at is a datetime object
if isinstance(item.added_at, str):
try:
item.added_at = datetime.fromisoformat(item.added_at)
except ValueError:
# If conversion fails, use current time to ensure item gets cleaned up
item.added_at = current_time
elif not isinstance(item.added_at, datetime):
item.added_at = current_time
if item.added_at < cleanup_cutoff:
self._completed.pop(url)
except Exception as e:
logger.error(f"Error processing completed item {url}: {e}")
# Remove problematic item
self._completed.pop(url)
# Clean up failed items
for url in list(self._failed.keys()):
item = self._failed[url]
# Ensure added_at is a datetime object
if isinstance(item.added_at, str):
item.added_at = datetime.fromisoformat(item.added_at)
if item.added_at < cleanup_cutoff:
try:
# Ensure added_at is a datetime object
if isinstance(item.added_at, str):
try:
item.added_at = datetime.fromisoformat(item.added_at)
except ValueError:
# If conversion fails, use current time to ensure item gets cleaned up
item.added_at = current_time
elif not isinstance(item.added_at, datetime):
item.added_at = current_time
if item.added_at < cleanup_cutoff:
self._failed.pop(url)
except Exception as e:
logger.error(f"Error processing failed item {url}: {e}")
# Remove problematic item
self._failed.pop(url)
# Clean up guild and channel tracking