fix imports

author pacnpal
date 2024-11-15 00:02:09 +00:00
parent 2160be84bb
commit fc40e994fe


@@ -4,17 +4,12 @@ import logging
import json
import os
import time
import psutil
from typing import Dict, Optional, Set, Tuple, Callable, Any, List, Union
from datetime import datetime, timedelta
import traceback
from dataclasses import dataclass, asdict, field
import weakref
from pathlib import Path
import aiofiles
import aiofiles.os
import sys
import signal
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import tempfile
@@ -41,6 +36,11 @@ class QueueMetrics:
total_failed: int = 0
avg_processing_time: float = 0.0
success_rate: float = 0.0
errors_by_type: Dict[str, int] = field(default_factory=dict)
last_error: Optional[str] = None
last_error_time: Optional[datetime] = None
last_cleanup: datetime = field(default_factory=datetime.utcnow)
retries: int = 0
peak_memory_usage: float = 0.0
errors_by_type: Dict[str, int] = field(default_factory=dict)
last_error: Optional[str] = None
@@ -57,30 +57,39 @@ class QueueMetrics:
if error:
self.last_error = error
self.last_error_time = datetime.utcnow()
error_type = error.split(':')[0] if ':' in error else error
self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
error_type = error.split(":")[0] if ":" in error else error
self.errors_by_type[error_type] = (
self.errors_by_type.get(error_type, 0) + 1
)
# Update processing times with sliding window
self.processing_times.append(processing_time)
if len(self.processing_times) > 100: # Keep last 100 processing times
self.processing_times.pop(0)
# Update average processing time
self.avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0.0
self.avg_processing_time = (
sum(self.processing_times) / len(self.processing_times)
if self.processing_times
else 0.0
)
# Update success rate
self.success_rate = (
(self.total_processed - self.total_failed) / self.total_processed
if self.total_processed > 0 else 0.0
if self.total_processed > 0
else 0.0
)
# Update peak memory usage
current_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
self.peak_memory_usage = max(self.peak_memory_usage, current_memory)
@dataclass
class QueueItem:
"""Represents a video processing task in the queue"""
url: str
message_id: int
channel_id: int
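
For reference, a minimal standalone sketch of the metric bookkeeping in the hunk above: the 100-sample sliding window, the success-rate formula, and the split of error strings on the first ":" mirror the code shown, while the small dataclass and its names are purely illustrative.

from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MetricsSketch:
    processing_times: List[float] = field(default_factory=list)
    errors_by_type: Dict[str, int] = field(default_factory=dict)
    total_processed: int = 0
    total_failed: int = 0

    def update(self, processing_time: float, success: bool, error: Optional[str] = None) -> None:
        self.total_processed += 1
        if not success:
            self.total_failed += 1
        if error:
            # Bucket errors by the text before the first ":" (mirrors the hunk above)
            error_type = error.split(":")[0] if ":" in error else error
            self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
        # Sliding window: keep only the last 100 processing times
        self.processing_times.append(processing_time)
        if len(self.processing_times) > 100:
            self.processing_times.pop(0)

    @property
    def avg_processing_time(self) -> float:
        times = self.processing_times
        return sum(times) / len(times) if times else 0.0

    @property
    def success_rate(self) -> float:
        done = self.total_processed
        return (done - self.total_failed) / done if done else 0.0


m = MetricsSketch()
m.update(1.2, True)
m.update(3.4, False, "DownloadError: 404 Not Found")
print(m.success_rate, m.avg_processing_time, m.errors_by_type)
# 0.5, ~2.3, {'DownloadError': 1}
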
@@ -99,9 +108,10 @@ class QueueItem:
processing_times: List[float] = field(default_factory=list)
last_error_time: Optional[datetime] = None
class EnhancedVideoQueueManager:
"""Enhanced queue manager with improved memory management and performance"""
def __init__(
self,
max_retries: int = 3,
@@ -110,7 +120,7 @@ class EnhancedVideoQueueManager:
cleanup_interval: int = 3600, # 1 hour
max_history_age: int = 86400, # 24 hours
persistence_path: Optional[str] = None,
backup_interval: int = 300 # 5 minutes
backup_interval: int = 300, # 5 minutes
):
self.max_retries = max_retries
self.retry_delay = retry_delay
@@ -119,29 +129,29 @@ class EnhancedVideoQueueManager:
self.max_history_age = max_history_age
self.persistence_path = persistence_path
self.backup_interval = backup_interval
# Queue storage with priority
self._queue: List[QueueItem] = []
self._queue_lock = asyncio.Lock()
self._processing: Dict[str, QueueItem] = {}
self._completed: Dict[str, QueueItem] = {}
self._failed: Dict[str, QueueItem] = {}
# Track active tasks
self._active_tasks: Set[asyncio.Task] = set()
self._processing_lock = asyncio.Lock()
# Status tracking
self._guild_queues: Dict[int, Set[str]] = {}
self._channel_queues: Dict[int, Set[str]] = {}
# Metrics tracking
self.metrics = QueueMetrics()
# Recovery tracking
self._recovery_attempts: Dict[str, int] = {}
self._last_backup: Optional[datetime] = None
# Initialize tasks
self._init_tasks()
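
For orientation, a construction sketch under stated assumptions: the import path is a placeholder, and because _init_tasks() spawns asyncio tasks from __init__, the manager has to be created inside a running event loop.

import asyncio

# Placeholder import path; adjust to where EnhancedVideoQueueManager actually lives.
from enhanced_queue import EnhancedVideoQueueManager


async def main() -> None:
    manager = EnhancedVideoQueueManager(
        max_retries=3,
        cleanup_interval=3600,      # 1 hour
        max_history_age=86400,      # 24 hours
        persistence_path="/tmp/queue_state.json",
        backup_interval=300,        # 5 minutes
    )
    # enqueue items and start process_queue here (see the sketch further down)
    ...


asyncio.run(main())
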
@@ -150,16 +160,16 @@ class EnhancedVideoQueueManager:
# Cleanup and monitoring
self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
self._active_tasks.add(self._cleanup_task)
# Health monitoring
self._health_check_task = asyncio.create_task(self._monitor_health())
self._active_tasks.add(self._health_check_task)
# Backup task
if self.persistence_path:
self._backup_task = asyncio.create_task(self._periodic_backup())
self._active_tasks.add(self._backup_task)
# Load persisted queue
self._load_persisted_queue()
@@ -171,18 +181,18 @@ class EnhancedVideoQueueManager:
guild_id: int,
author_id: int,
callback: Callable[[str, bool, str], Any],
priority: int = 0
priority: int = 0,
) -> bool:
"""Add a video to the processing queue with priority support"""
try:
async with self._queue_lock:
if len(self._queue) >= self.max_queue_size:
raise QueueError("Queue is full")
# Check system resources
if psutil.virtual_memory().percent > 90:
raise ResourceExhaustedError("System memory is critically low")
# Create queue item
item = QueueItem(
url=url,
@@ -191,36 +201,38 @@ class EnhancedVideoQueueManager:
guild_id=guild_id,
author_id=author_id,
added_at=datetime.utcnow(),
priority=priority
priority=priority,
)
# Add to tracking collections
if guild_id not in self._guild_queues:
self._guild_queues[guild_id] = set()
self._guild_queues[guild_id].add(url)
if channel_id not in self._channel_queues:
self._channel_queues[channel_id] = set()
self._channel_queues[channel_id].add(url)
# Add to queue with priority
self._queue.append(item)
self._queue.sort(key=lambda x: (-x.priority, x.added_at))
# Persist queue state
if self.persistence_path:
await self._persist_queue()
logger.info(f"Added video to queue: {url} with priority {priority}")
return True
except Exception as e:
logger.error(f"Error adding video to queue: {traceback.format_exc()}")
raise QueueError(f"Failed to add to queue: {str(e)}")
async def process_queue(self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]):
async def process_queue(
self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
):
"""Process items in the queue with the provided processor function
Args:
processor: A callable that takes a QueueItem and returns a tuple of (success: bool, error: Optional[str])
"""
@@ -234,20 +246,20 @@ class EnhancedVideoQueueManager:
self._processing[item.url] = item
item.status = "processing"
item.processing_time = time.time()
if not item:
await asyncio.sleep(1)
continue
try:
# Process the item
start_time = time.time()
success, error = await processor(item)
processing_time = time.time() - start_time
# Update metrics
self.metrics.update(processing_time, success, error)
# Update item status
async with self._processing_lock:
if success:
@@ -259,22 +271,28 @@ class EnhancedVideoQueueManager:
item.error = error
item.last_error = error
item.last_error_time = datetime.utcnow()
# Handle retries
if item.retry_count < self.max_retries:
item.retry_count += 1
item.status = "pending"
item.last_retry = datetime.utcnow()
self._queue.append(item)
logger.warning(f"Retrying item: {item.url} (attempt {item.retry_count})")
logger.warning(
f"Retrying item: {item.url} (attempt {item.retry_count})"
)
else:
self._failed[item.url] = item
logger.error(f"Failed to process item after {self.max_retries} attempts: {item.url}")
logger.error(
f"Failed to process item after {self.max_retries} attempts: {item.url}"
)
self._processing.pop(item.url, None)
except Exception as e:
logger.error(f"Error processing item {item.url}: {traceback.format_exc()}")
logger.error(
f"Error processing item {item.url}: {traceback.format_exc()}"
)
async with self._processing_lock:
item.status = "failed"
item.error = str(e)
@@ -282,15 +300,15 @@ class EnhancedVideoQueueManager:
item.last_error_time = datetime.utcnow()
self._failed[item.url] = item
self._processing.pop(item.url, None)
# Persist state after processing
if self.persistence_path:
await self._persist_queue()
except Exception as e:
logger.error(f"Error in queue processor: {traceback.format_exc()}")
await asyncio.sleep(1)
# Small delay to prevent CPU overload
await asyncio.sleep(0.1)
@@ -300,7 +318,8 @@ class EnhancedVideoQueueManager:
try:
if self.persistence_path and (
not self._last_backup
or (datetime.utcnow() - self._last_backup).total_seconds() >= self.backup_interval
or (datetime.utcnow() - self._last_backup).total_seconds()
>= self.backup_interval
):
await self._persist_queue()
self._last_backup = datetime.utcnow()
@@ -313,7 +332,7 @@ class EnhancedVideoQueueManager:
"""Persist queue state to disk with improved error handling"""
if not self.persistence_path:
return
try:
state = {
"queue": [asdict(item) for item in self._queue],
@@ -327,23 +346,27 @@ class EnhancedVideoQueueManager:
"success_rate": self.metrics.success_rate,
"errors_by_type": self.metrics.errors_by_type,
"last_error": self.metrics.last_error,
"last_error_time": self.metrics.last_error_time.isoformat() if self.metrics.last_error_time else None
}
"last_error_time": (
self.metrics.last_error_time.isoformat()
if self.metrics.last_error_time
else None
),
},
}
# Ensure directory exists
os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True)
# Write to temp file first
temp_path = f"{self.persistence_path}.tmp"
async with aiofiles.open(temp_path, 'w') as f:
await f.write(json.dumps(state, default=str))
await f.flush()
with open(temp_path, "w") as f:
json.dump(state, f, default=str)
f.flush()
os.fsync(f.fileno())
# Atomic rename
await aiofiles.os.rename(temp_path, self.persistence_path)
os.rename(temp_path, self.persistence_path)
except Exception as e:
logger.error(f"Error persisting queue state: {traceback.format_exc()}")
raise QueueError(f"Failed to persist queue state: {str(e)}")
@@ -352,11 +375,11 @@ class EnhancedVideoQueueManager:
"""Load persisted queue state from disk with improved error handling"""
if not self.persistence_path or not os.path.exists(self.persistence_path):
return
try:
with open(self.persistence_path, 'r') as f:
with open(self.persistence_path, "r") as f:
state = json.load(f)
# Restore queue items with datetime conversion
self._queue = []
for item in state["queue"]:
@@ -364,11 +387,13 @@ class EnhancedVideoQueueManager:
if item.get("last_retry"):
item["last_retry"] = datetime.fromisoformat(item["last_retry"])
self._queue.append(QueueItem(**item))
self._processing = {k: QueueItem(**v) for k, v in state["processing"].items()}
self._processing = {
k: QueueItem(**v) for k, v in state["processing"].items()
}
self._completed = {k: QueueItem(**v) for k, v in state["completed"].items()}
self._failed = {k: QueueItem(**v) for k, v in state["failed"].items()}
# Restore metrics
self.metrics.total_processed = state["metrics"]["total_processed"]
self.metrics.total_failed = state["metrics"]["total_failed"]
@@ -377,20 +402,28 @@ class EnhancedVideoQueueManager:
self.metrics.errors_by_type = state["metrics"]["errors_by_type"]
self.metrics.last_error = state["metrics"]["last_error"]
if state["metrics"]["last_error_time"]:
self.metrics.last_error_time = datetime.fromisoformat(state["metrics"]["last_error_time"])
self.metrics.last_error_time = datetime.fromisoformat(
state["metrics"]["last_error_time"]
)
logger.info("Successfully loaded persisted queue state")
except Exception as e:
logger.error(f"Error loading persisted queue state: {traceback.format_exc()}")
logger.error(
f"Error loading persisted queue state: {traceback.format_exc()}"
)
# Create backup of corrupted state file
if os.path.exists(self.persistence_path):
backup_path = f"{self.persistence_path}.bak.{int(time.time())}"
try:
os.rename(self.persistence_path, backup_path)
logger.info(f"Created backup of corrupted state file: {backup_path}")
logger.info(
f"Created backup of corrupted state file: {backup_path}"
)
except Exception as be:
logger.error(f"Failed to create backup of corrupted state file: {str(be)}")
logger.error(
f"Failed to create backup of corrupted state file: {str(be)}"
)
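
A small sketch of the load-or-quarantine behaviour above: parse the persisted JSON, convert ISO timestamps back into datetime objects, and, if parsing fails, move the corrupt file aside under a timestamped .bak name. Field names mirror the hunk; everything else is illustrative.

import json
import os
import time
from datetime import datetime
from typing import Optional


def load_state(path: str) -> Optional[dict]:
    """Load persisted state, quarantining the file if it cannot be parsed."""
    if not os.path.exists(path):
        return None
    try:
        with open(path, "r") as f:
            state = json.load(f)
        # Timestamps were serialized via isoformat()/str(); convert them back.
        for item in state.get("queue", []):
            if item.get("added_at"):
                item["added_at"] = datetime.fromisoformat(item["added_at"])
        return state
    except (ValueError, KeyError):
        backup_path = f"{path}.bak.{int(time.time())}"
        os.rename(path, backup_path)  # keep the corrupt file around for inspection
        return None
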
async def _monitor_health(self):
"""Monitor queue health and performance with improved metrics"""
@@ -399,32 +432,35 @@ class EnhancedVideoQueueManager:
# Check memory usage
process = psutil.Process()
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
if memory_usage > 1024: # 1GB
logger.warning(f"High memory usage detected: {memory_usage:.2f}MB")
# Force garbage collection
import gc
gc.collect()
# Check for potential deadlocks
processing_times = [
time.time() - item.processing_time
for item in self._processing.values()
if item.processing_time > 0
]
if processing_times:
max_time = max(processing_times)
if max_time > 3600: # 1 hour
logger.warning(f"Potential deadlock detected: Item processing for {max_time:.2f}s")
logger.warning(
f"Potential deadlock detected: Item processing for {max_time:.2f}s"
)
# Attempt recovery
await self._recover_stuck_items()
# Calculate and log detailed metrics
success_rate = self.metrics.success_rate
error_distribution = self.metrics.errors_by_type
avg_processing_time = self.metrics.avg_processing_time
logger.info(
f"Queue Health Metrics:\n"
f"- Success Rate: {success_rate:.2%}\n"
@@ -434,9 +470,9 @@ class EnhancedVideoQueueManager:
f"- Queue Size: {len(self._queue)}\n"
f"- Processing Items: {len(self._processing)}"
)
await asyncio.sleep(300) # Check every 5 minutes
except Exception as e:
logger.error(f"Error in health monitor: {traceback.format_exc()}")
await asyncio.sleep(60)
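
For reference, a standalone sketch of the two health checks in this hunk: resident memory via psutil with a garbage-collection fallback, and a wall-clock threshold for items stuck in processing. The 1 GB and 1 hour thresholds follow the hunk; the data structures are stand-ins.

import gc
import time
from typing import Dict, List

import psutil


def check_health(processing_started: Dict[str, float],
                 memory_limit_mb: float = 1024.0,
                 stuck_after_s: float = 3600.0) -> List[str]:
    """Return URLs that look stuck; trigger a GC pass if memory is high."""
    rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
    if rss_mb > memory_limit_mb:
        gc.collect()  # same last-resort response as in the hunk above

    now = time.time()
    return [
        url
        for url, started in processing_started.items()
        if started > 0 and now - started > stuck_after_s
    ]


stuck = check_health({"https://example.com/a.mp4": time.time() - 7200})
print(stuck)  # ['https://example.com/a.mp4']
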
@@ -447,7 +483,10 @@ class EnhancedVideoQueueManager:
async with self._processing_lock:
current_time = time.time()
for url, item in list(self._processing.items()):
if item.processing_time > 0 and (current_time - item.processing_time) > 3600:
if (
item.processing_time > 0
and (current_time - item.processing_time) > 3600
):
# Move to failed queue if max retries reached
if item.retry_count >= self.max_retries:
self._failed[url] = item
@@ -462,7 +501,7 @@ class EnhancedVideoQueueManager:
self._queue.append(item)
self._processing.pop(url)
logger.info(f"Recovered stuck item for retry: {url}")
except Exception as e:
logger.error(f"Error recovering stuck items: {str(e)}")
@@ -473,13 +512,13 @@ class EnhancedVideoQueueManager:
for task in self._active_tasks:
if not task.done():
task.cancel()
await asyncio.gather(*self._active_tasks, return_exceptions=True)
# Persist final state
if self.persistence_path:
await self._persist_queue()
# Clear all collections
self._queue.clear()
self._processing.clear()
@@ -487,9 +526,9 @@ class EnhancedVideoQueueManager:
self._failed.clear()
self._guild_queues.clear()
self._channel_queues.clear()
logger.info("Queue manager cleanup completed")
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")
raise CleanupError(f"Failed to clean up queue manager: {str(e)}")
@@ -501,36 +540,44 @@ class EnhancedVideoQueueManager:
guild_urls = self._guild_queues.get(guild_id, set())
status = {
"pending": sum(1 for item in self._queue if item.url in guild_urls),
"processing": sum(1 for url in self._processing if url in guild_urls),
"processing": sum(
1 for url in self._processing if url in guild_urls
),
"completed": sum(1 for url in self._completed if url in guild_urls),
"failed": sum(1 for url in self._failed if url in guild_urls)
"failed": sum(1 for url in self._failed if url in guild_urls),
}
else:
status = {
"pending": len(self._queue),
"processing": len(self._processing),
"completed": len(self._completed),
"failed": len(self._failed)
"failed": len(self._failed),
}
# Add detailed metrics
status.update({
"metrics": {
"total_processed": self.metrics.total_processed,
"total_failed": self.metrics.total_failed,
"success_rate": self.metrics.success_rate,
"avg_processing_time": self.metrics.avg_processing_time,
"peak_memory_usage": self.metrics.peak_memory_usage,
"last_cleanup": self.metrics.last_cleanup.isoformat(),
"errors_by_type": self.metrics.errors_by_type,
"last_error": self.metrics.last_error,
"last_error_time": self.metrics.last_error_time.isoformat() if self.metrics.last_error_time else None,
"retries": self.metrics.retries
status.update(
{
"metrics": {
"total_processed": self.metrics.total_processed,
"total_failed": self.metrics.total_failed,
"success_rate": self.metrics.success_rate,
"avg_processing_time": self.metrics.avg_processing_time,
"peak_memory_usage": self.metrics.peak_memory_usage,
"last_cleanup": self.metrics.last_cleanup.isoformat(),
"errors_by_type": self.metrics.errors_by_type,
"last_error": self.metrics.last_error,
"last_error_time": (
self.metrics.last_error_time.isoformat()
if self.metrics.last_error_time
else None
),
"retries": self.metrics.retries,
}
}
})
)
return status
except Exception as e:
logger.error(f"Error getting queue status: {str(e)}")
raise QueueError(f"Failed to get queue status: {str(e)}")
@@ -541,38 +588,40 @@ class EnhancedVideoQueueManager:
try:
current_time = datetime.utcnow()
cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age)
async with self._queue_lock:
# Clean up completed items
for url in list(self._completed.keys()):
item = self._completed[url]
if item.added_at < cleanup_cutoff:
self._completed.pop(url)
# Clean up failed items
for url in list(self._failed.keys()):
item = self._failed[url]
if item.added_at < cleanup_cutoff:
self._failed.pop(url)
# Clean up guild and channel tracking
for guild_id in list(self._guild_queues.keys()):
self._guild_queues[guild_id] = {
url for url in self._guild_queues[guild_id]
url
for url in self._guild_queues[guild_id]
if url in self._queue or url in self._processing
}
for channel_id in list(self._channel_queues.keys()):
self._channel_queues[channel_id] = {
url for url in self._channel_queues[channel_id]
url
for url in self._channel_queues[channel_id]
if url in self._queue or url in self._processing
}
self.metrics.last_cleanup = current_time
logger.info("Completed periodic queue cleanup")
await asyncio.sleep(self.cleanup_interval)
except Exception as e:
logger.error(f"Error in periodic cleanup: {traceback.format_exc()}")
await asyncio.sleep(60)
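
Finally, the age-based cleanup reduces to a single cutoff comparison; a minimal sketch with placeholder data:

from datetime import datetime, timedelta
from typing import Dict


def prune_history(history: Dict[str, datetime], max_history_age: int = 86400) -> Dict[str, datetime]:
    """Drop entries older than max_history_age seconds (24 h in the hunk above)."""
    cutoff = datetime.utcnow() - timedelta(seconds=max_history_age)
    return {url: added_at for url, added_at in history.items() if added_at >= cutoff}


history = {
    "old": datetime.utcnow() - timedelta(days=2),
    "fresh": datetime.utcnow(),
}
print(list(prune_history(history)))  # ['fresh']
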