"""Enhanced queue system for VideoArchiver with improved memory management and performance"""

import asyncio
import logging
import json
import os
import time
import psutil
from typing import Dict, Optional, Set, Tuple, Callable, Any, Awaitable, List, Union
from datetime import datetime, timedelta
import traceback
from dataclasses import dataclass, asdict, field
from pathlib import Path
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import tempfile
import shutil

from .exceptions import (
    QueueError,
    ResourceExhaustedError,
    ProcessingError,
    CleanupError,
    FileOperationError,
)

# Configure logging with proper format
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("EnhancedQueueManager")


@dataclass
class QueueMetrics:
    """Metrics tracking for queue performance and health"""

    total_processed: int = 0
    total_failed: int = 0
    avg_processing_time: float = 0.0
    success_rate: float = 0.0
    errors_by_type: Dict[str, int] = field(default_factory=dict)
    last_error: Optional[str] = None
    last_error_time: Optional[datetime] = None
    last_cleanup: datetime = field(default_factory=datetime.utcnow)
    retries: int = 0
    peak_memory_usage: float = 0.0
    processing_times: List[float] = field(default_factory=list)
    compression_failures: int = 0
    hardware_accel_failures: int = 0

    def update(self, processing_time: float, success: bool, error: Optional[str] = None):
        """Update metrics with new processing information"""
        self.total_processed += 1
        if not success:
            self.total_failed += 1
            if error:
                self.last_error = error
                self.last_error_time = datetime.utcnow()
                error_type = error.split(":")[0] if ":" in error else error
                self.errors_by_type[error_type] = (
                    self.errors_by_type.get(error_type, 0) + 1
                )

                # Track specific error types
                if "compression error" in error.lower():
                    self.compression_failures += 1
                elif "hardware acceleration failed" in error.lower():
                    self.hardware_accel_failures += 1

        # Update processing times with sliding window
        self.processing_times.append(processing_time)
        if len(self.processing_times) > 100:  # Keep last 100 processing times
            self.processing_times.pop(0)

        # Update average processing time
        self.avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times
            else 0.0
        )

        # Update success rate
        self.success_rate = (
            (self.total_processed - self.total_failed) / self.total_processed
            if self.total_processed > 0
            else 0.0
        )

        # Update peak memory usage
        current_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        self.peak_memory_usage = max(self.peak_memory_usage, current_memory)


@dataclass
class QueueItem:
    """Represents a video processing task in the queue"""

    url: str
    message_id: int
    channel_id: int
    guild_id: int
    author_id: int
    added_at: datetime
    priority: int = 0  # Higher number = higher priority
    status: str = "pending"  # pending, processing, completed, failed
    error: Optional[str] = None
    attempt: int = 0
    processing_time: float = 0.0
    size_bytes: int = 0
    last_error: Optional[str] = None
    retry_count: int = 0
    last_retry: Optional[datetime] = None
    processing_times: List[float] = field(default_factory=list)
    last_error_time: Optional[datetime] = None
    hardware_accel_attempted: bool = False
    compression_attempted: bool = False


class EnhancedVideoQueueManager:
    """Enhanced queue manager with improved memory management and performance"""

    def __init__(
        self,
        max_retries: int = 3,
        retry_delay: int = 5,
        max_queue_size: int = 1000,
        cleanup_interval: int = 3600,  # 1 hour
        max_history_age: int = 86400,  # 24 hours
        persistence_path: Optional[str] = None,
        backup_interval: int = 300,  # 5 minutes
        deadlock_threshold: int = 900,  # 15 minutes (reduced from 1 hour)
    ):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.max_queue_size = max_queue_size
        self.cleanup_interval = cleanup_interval
        self.max_history_age = max_history_age
        self.persistence_path = persistence_path
        self.backup_interval = backup_interval
        self.deadlock_threshold = deadlock_threshold

        # Queue storage with priority
        self._queue: List[QueueItem] = []
        self._queue_lock = asyncio.Lock()
        self._processing: Dict[str, QueueItem] = {}
        self._completed: Dict[str, QueueItem] = {}
        self._failed: Dict[str, QueueItem] = {}

        # Track active tasks
        self._active_tasks: Set[asyncio.Task] = set()
        self._processing_lock = asyncio.Lock()

        # Status tracking
        self._guild_queues: Dict[int, Set[str]] = {}
        self._channel_queues: Dict[int, Set[str]] = {}

        # Metrics tracking
        self.metrics = QueueMetrics()

        # Recovery tracking
        self._recovery_attempts: Dict[str, int] = {}
        self._last_backup: Optional[datetime] = None

        # Initialize tasks
        self._init_tasks()

    def _init_tasks(self):
        """Initialize background tasks"""
        # Cleanup and monitoring
        self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
        self._active_tasks.add(self._cleanup_task)

        # Health monitoring
        self._health_check_task = asyncio.create_task(self._monitor_health())
        self._active_tasks.add(self._health_check_task)

        # Backup task
        if self.persistence_path:
            self._backup_task = asyncio.create_task(self._periodic_backup())
            self._active_tasks.add(self._backup_task)

        # Load persisted queue
        self._load_persisted_queue()
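
    # Construction sketch (illustrative, not part of the original file): _init_tasks()
    # schedules background work with asyncio.create_task(), so the manager must be
    # created while an event loop is already running (e.g. from an async setup hook),
    # not at import time. The setup_queue name and data_path value are hypothetical.
    #
    #     async def setup_queue(data_path: str) -> "EnhancedVideoQueueManager":
    #         return EnhancedVideoQueueManager(
    #             max_retries=3,
    #             persistence_path=os.path.join(data_path, "queue_state.json"),
    #         )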

    async def add_to_queue(
        self,
        url: str,
        message_id: int,
        channel_id: int,
        guild_id: int,
        author_id: int,
        callback: Optional[Callable[[str, bool, str], Any]] = None,
        priority: int = 0,
    ) -> bool:
        """Add a video to the processing queue with priority support"""
        try:
            async with self._queue_lock:
                if len(self._queue) >= self.max_queue_size:
                    raise QueueError("Queue is full")

                # Check system resources
                if psutil.virtual_memory().percent > 90:
                    raise ResourceExhaustedError("System memory is critically low")

                # Create queue item
                item = QueueItem(
                    url=url,
                    message_id=message_id,
                    channel_id=channel_id,
                    guild_id=guild_id,
                    author_id=author_id,
                    added_at=datetime.utcnow(),
                    priority=priority,
                )

                # Add to tracking collections
                if guild_id not in self._guild_queues:
                    self._guild_queues[guild_id] = set()
                self._guild_queues[guild_id].add(url)

                if channel_id not in self._channel_queues:
                    self._channel_queues[channel_id] = set()
                self._channel_queues[channel_id].add(url)

                # Add to queue with priority
                self._queue.append(item)
                self._queue.sort(key=lambda x: (-x.priority, x.added_at))

                # Persist queue state
                if self.persistence_path:
                    await self._persist_queue()

            logger.info(f"Added video to queue: {url} with priority {priority}")
            return True

        except Exception as e:
            logger.error(f"Error adding video to queue: {traceback.format_exc()}")
            raise QueueError(f"Failed to add to queue: {str(e)}")
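
    # Usage sketch (illustrative, not part of the original file): enqueueing a video
    # from a message handler. The message/channel/guild/author IDs would normally come
    # from the triggering discord.Message; the values below are placeholders. Note that
    # the callback parameter is accepted but not referenced in the code shown here.
    #
    #     await queue_manager.add_to_queue(
    #         url="https://example.com/video.mp4",
    #         message_id=message.id,
    #         channel_id=message.channel.id,
    #         guild_id=message.guild.id,
    #         author_id=message.author.id,
    #         priority=1,
    #     )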

    async def process_queue(
        self, processor: Callable[[QueueItem], Awaitable[Tuple[bool, Optional[str]]]]
    ):
        """Process items in the queue with the provided processor function

        Args:
            processor: An async callable that takes a QueueItem and is awaited to
                produce a tuple of (success: bool, error: Optional[str])
        """
        logger.info("Queue processor started and waiting for items...")
        while True:
            try:
                # Get next item from queue
                item = None
                async with self._queue_lock:
                    if self._queue:
                        item = self._queue.pop(0)
                        self._processing[item.url] = item
                        item.status = "processing"
                        item.processing_time = time.time()  # start timestamp, used for deadlock detection
                        logger.info(f"Processing queue item: {item.url}")

                if not item:
                    await asyncio.sleep(1)
                    continue

                try:
                    # Process the item
                    start_time = time.time()
                    logger.info(f"Calling processor for item: {item.url}")
                    success, error = await processor(item)
                    logger.info(f"Processor result for {item.url}: success={success}, error={error}")
                    processing_time = time.time() - start_time

                    # Update metrics
                    self.metrics.update(processing_time, success, error)

                    # Update item status
                    async with self._processing_lock:
                        if success:
                            item.status = "completed"
                            self._completed[item.url] = item
                            logger.info(f"Successfully processed item: {item.url}")
                        else:
                            item.status = "failed"
                            item.error = error
                            item.last_error = error
                            item.last_error_time = datetime.utcnow()

                            # Handle retries with improved logic
                            if item.retry_count < self.max_retries:
                                item.retry_count += 1
                                item.status = "pending"
                                item.last_retry = datetime.utcnow()

                                # Adjust processing strategy based on error type
                                if "hardware acceleration failed" in str(error).lower():
                                    item.hardware_accel_attempted = True
                                elif "compression error" in str(error).lower():
                                    item.compression_attempted = True

                                # Add back to queue with adjusted priority
                                item.priority = max(0, item.priority - 1)  # Lower priority for retries
                                self._queue.append(item)
                                logger.warning(
                                    f"Retrying item: {item.url} (attempt {item.retry_count})"
                                )
                            else:
                                self._failed[item.url] = item
                                logger.error(
                                    f"Failed to process item after {self.max_retries} attempts: {item.url}"
                                )

                        # Always remove from processing, regardless of outcome
                        self._processing.pop(item.url, None)

                except Exception as e:
                    logger.error(
                        f"Error processing item {item.url}: {traceback.format_exc()}"
                    )
                    # Ensure item is properly handled even on unexpected errors
                    async with self._processing_lock:
                        item.status = "failed"
                        item.error = str(e)
                        item.last_error = str(e)
                        item.last_error_time = datetime.utcnow()
                        self._failed[item.url] = item
                        # Always remove from processing
                        self._processing.pop(item.url, None)

                # Persist state after processing
                if self.persistence_path:
                    try:
                        await self._persist_queue()
                    except Exception as e:
                        logger.error(f"Failed to persist queue state: {e}")
                        # Continue processing even if persistence fails

            except Exception as e:
                logger.error(f"Critical error in queue processor: {traceback.format_exc()}")
                # Ensure we don't get stuck in a tight loop on critical errors
                await asyncio.sleep(1)
                continue  # Continue to next iteration to process remaining items

            # Small delay to prevent CPU overload
            await asyncio.sleep(0.1)
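
    # Processor sketch (illustrative, not part of the original file): process_queue()
    # awaits the processor for each item, so it must be a coroutine function returning
    # (success, error). The archive_video helper below is a hypothetical placeholder.
    #
    #     async def example_processor(item: QueueItem) -> Tuple[bool, Optional[str]]:
    #         try:
    #             await archive_video(item.url)  # hypothetical download/repost step
    #             return True, None
    #         except Exception as e:
    #             return False, str(e)
    #
    #     asyncio.create_task(queue_manager.process_queue(example_processor))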

    async def _periodic_backup(self):
        """Periodically back up queue state"""
        while True:
            try:
                if self.persistence_path and (
                    not self._last_backup
                    or (datetime.utcnow() - self._last_backup).total_seconds()
                    >= self.backup_interval
                ):
                    await self._persist_queue()
                    self._last_backup = datetime.utcnow()
                await asyncio.sleep(self.backup_interval)
            except Exception as e:
                logger.error(f"Error in periodic backup: {str(e)}")
                await asyncio.sleep(60)

    async def _persist_queue(self):
        """Persist queue state to disk with improved error handling"""
        if not self.persistence_path:
            return

        try:
            state = {
                "queue": [asdict(item) for item in self._queue],
                "processing": {k: asdict(v) for k, v in self._processing.items()},
                "completed": {k: asdict(v) for k, v in self._completed.items()},
                "failed": {k: asdict(v) for k, v in self._failed.items()},
                "metrics": {
                    "total_processed": self.metrics.total_processed,
                    "total_failed": self.metrics.total_failed,
                    "avg_processing_time": self.metrics.avg_processing_time,
                    "success_rate": self.metrics.success_rate,
                    "errors_by_type": self.metrics.errors_by_type,
                    "last_error": self.metrics.last_error,
                    "last_error_time": (
                        self.metrics.last_error_time.isoformat()
                        if self.metrics.last_error_time
                        else None
                    ),
                    "compression_failures": self.metrics.compression_failures,
                    "hardware_accel_failures": self.metrics.hardware_accel_failures,
                },
            }

            # Ensure directory exists
            os.makedirs(os.path.dirname(self.persistence_path), exist_ok=True)

            # Write to temp file first
            temp_path = f"{self.persistence_path}.tmp"
            with open(temp_path, "w") as f:
                json.dump(state, f, default=str)
                f.flush()
                os.fsync(f.fileno())

            # Atomic rename
            os.rename(temp_path, self.persistence_path)

        except Exception as e:
            logger.error(f"Error persisting queue state: {traceback.format_exc()}")
            raise QueueError(f"Failed to persist queue state: {str(e)}")
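
    # Persisted state layout (sketch derived from _persist_queue above; actual files
    # may differ between versions). Datetimes are serialized via json.dump(default=str):
    #
    #     {
    #       "queue":      [ {QueueItem fields...}, ... ],
    #       "processing": { "<url>": {QueueItem fields...}, ... },
    #       "completed":  { "<url>": {QueueItem fields...}, ... },
    #       "failed":     { "<url>": {QueueItem fields...}, ... },
    #       "metrics":    { "total_processed": ..., "errors_by_type": {...}, ... }
    #     }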

    def _load_persisted_queue(self):
        """Load persisted queue state from disk with improved error handling"""
        if not self.persistence_path or not os.path.exists(self.persistence_path):
            return

        def restore_item(data: dict) -> QueueItem:
            # Convert persisted datetime strings back into datetime objects before
            # rebuilding the QueueItem
            for key in ("added_at", "last_retry", "last_error_time"):
                if data.get(key):
                    data[key] = datetime.fromisoformat(data[key])
            return QueueItem(**data)

        try:
            with open(self.persistence_path, "r") as f:
                state = json.load(f)

            # Restore queue items with datetime conversion
            self._queue = [restore_item(item) for item in state["queue"]]
            self._processing = {
                k: restore_item(v) for k, v in state["processing"].items()
            }
            self._completed = {k: restore_item(v) for k, v in state["completed"].items()}
            self._failed = {k: restore_item(v) for k, v in state["failed"].items()}

            # Restore metrics
            metrics_data = state["metrics"]
            self.metrics.total_processed = metrics_data["total_processed"]
            self.metrics.total_failed = metrics_data["total_failed"]
            self.metrics.avg_processing_time = metrics_data["avg_processing_time"]
            self.metrics.success_rate = metrics_data["success_rate"]
            self.metrics.errors_by_type = metrics_data["errors_by_type"]
            self.metrics.last_error = metrics_data["last_error"]
            self.metrics.compression_failures = metrics_data.get("compression_failures", 0)
            self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0)

            if metrics_data["last_error_time"]:
                self.metrics.last_error_time = datetime.fromisoformat(
                    metrics_data["last_error_time"]
                )

            logger.info("Successfully loaded persisted queue state")

        except Exception as e:
            logger.error(
                f"Error loading persisted queue state: {traceback.format_exc()}"
            )
            # Create backup of corrupted state file
            if os.path.exists(self.persistence_path):
                backup_path = f"{self.persistence_path}.bak.{int(time.time())}"
                try:
                    os.rename(self.persistence_path, backup_path)
                    logger.info(
                        f"Created backup of corrupted state file: {backup_path}"
                    )
                except Exception as be:
                    logger.error(
                        f"Failed to create backup of corrupted state file: {str(be)}"
                    )

    async def _monitor_health(self):
        """Monitor queue health and performance with improved metrics"""
        while True:
            try:
                # Check memory usage
                process = psutil.Process()
                memory_usage = process.memory_info().rss / 1024 / 1024  # MB

                if memory_usage > 1024:  # 1GB
                    logger.warning(f"High memory usage detected: {memory_usage:.2f}MB")
                    # Force garbage collection
                    import gc

                    gc.collect()

                # Check for potential deadlocks with reduced threshold
                processing_times = [
                    time.time() - item.processing_time
                    for item in self._processing.values()
                    if item.processing_time > 0
                ]

                if processing_times:
                    max_time = max(processing_times)
                    if max_time > self.deadlock_threshold:  # Reduced from 3600s to 900s
                        logger.warning(
                            f"Potential deadlock detected: Item processing for {max_time:.2f}s"
                        )
                        # Attempt recovery
                        await self._recover_stuck_items()

                # Calculate and log detailed metrics
                success_rate = self.metrics.success_rate
                error_distribution = self.metrics.errors_by_type
                avg_processing_time = self.metrics.avg_processing_time

                logger.info(
                    f"Queue Health Metrics:\n"
                    f"- Success Rate: {success_rate:.2%}\n"
                    f"- Avg Processing Time: {avg_processing_time:.2f}s\n"
                    f"- Memory Usage: {memory_usage:.2f}MB\n"
                    f"- Error Distribution: {error_distribution}\n"
                    f"- Queue Size: {len(self._queue)}\n"
                    f"- Processing Items: {len(self._processing)}"
                )

                await asyncio.sleep(300)  # Check every 5 minutes

            except Exception as e:
                logger.error(f"Error in health monitor: {traceback.format_exc()}")
                await asyncio.sleep(60)

    async def _recover_stuck_items(self):
        """Attempt to recover stuck items in the processing queue"""
        try:
            async with self._processing_lock:
                current_time = time.time()
                for url, item in list(self._processing.items()):
                    if (
                        item.processing_time > 0
                        and (current_time - item.processing_time) > self.deadlock_threshold
                    ):
                        # Move to failed queue if max retries reached
                        if item.retry_count >= self.max_retries:
                            self._failed[url] = item
                            self._processing.pop(url)
                            logger.warning(f"Moved stuck item to failed queue: {url}")
                        else:
                            # Increment retry count and reset for reprocessing
                            item.retry_count += 1
                            item.processing_time = 0
                            item.last_retry = datetime.utcnow()
                            item.status = "pending"
                            # Lower priority for stuck items
                            item.priority = max(0, item.priority - 2)
                            self._queue.append(item)
                            self._processing.pop(url)
                            logger.info(f"Recovered stuck item for retry: {url}")

        except Exception as e:
            logger.error(f"Error recovering stuck items: {str(e)}")

    async def cleanup(self):
        """Clean up resources and stop queue processing"""
        try:
            # Cancel all monitoring tasks
            for task in self._active_tasks:
                if not task.done():
                    task.cancel()

            await asyncio.gather(*self._active_tasks, return_exceptions=True)

            # Persist final state
            if self.persistence_path:
                await self._persist_queue()

            # Clear all collections
            self._queue.clear()
            self._processing.clear()
            self._completed.clear()
            self._failed.clear()
            self._guild_queues.clear()
            self._channel_queues.clear()

            logger.info("Queue manager cleanup completed")

        except Exception as e:
            logger.error(f"Error during cleanup: {str(e)}")
            raise CleanupError(f"Failed to clean up queue manager: {str(e)}")
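
    # Teardown sketch (illustrative, not part of the original file): cleanup() cancels
    # the background tasks and persists final state, so it belongs in whatever shutdown
    # hook owns the manager; the hook name below is a hypothetical example.
    #
    #     async def on_shutdown_example():
    #         await queue_manager.cleanup()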

    def get_queue_status(self, guild_id: int) -> dict:
        """Get current queue status and metrics for a guild

        Args:
            guild_id: The ID of the guild to get status for

        Returns:
            dict: Queue status including counts and metrics
        """
        try:
            # Count items for this guild
            pending = len([item for item in self._queue if item.guild_id == guild_id])
            processing = len([item for item in self._processing.values() if item.guild_id == guild_id])
            completed = len([item for item in self._completed.values() if item.guild_id == guild_id])
            failed = len([item for item in self._failed.values() if item.guild_id == guild_id])

            # Get metrics
            metrics = {
                "total_processed": self.metrics.total_processed,
                "total_failed": self.metrics.total_failed,
                "success_rate": self.metrics.success_rate,
                "avg_processing_time": self.metrics.avg_processing_time,
                "peak_memory_usage": self.metrics.peak_memory_usage,
                "last_cleanup": self.metrics.last_cleanup.strftime("%Y-%m-%d %H:%M:%S"),
                "errors_by_type": self.metrics.errors_by_type,
                "compression_failures": self.metrics.compression_failures,
                "hardware_accel_failures": self.metrics.hardware_accel_failures,
            }

            return {
                "pending": pending,
                "processing": processing,
                "completed": completed,
                "failed": failed,
                "metrics": metrics,
            }

        except Exception as e:
            logger.error(f"Error getting queue status: {str(e)}")
            # Return empty status on error
            return {
                "pending": 0,
                "processing": 0,
                "completed": 0,
                "failed": 0,
                "metrics": {
                    "total_processed": 0,
                    "total_failed": 0,
                    "success_rate": 0.0,
                    "avg_processing_time": 0.0,
                    "peak_memory_usage": 0.0,
                    "last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                    "errors_by_type": {},
                    "compression_failures": 0,
                    "hardware_accel_failures": 0,
                },
            }
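
    # Consumption sketch (illustrative, not part of the original file): the returned
    # dict can feed a status command directly; the keys match those built in
    # get_queue_status() above.
    #
    #     status = queue_manager.get_queue_status(guild.id)
    #     summary = (
    #         f"Pending: {status['pending']} | Processing: {status['processing']} | "
    #         f"Success rate: {status['metrics']['success_rate']:.2%}"
    #     )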

    async def clear_guild_queue(self, guild_id: int) -> int:
        """Clear all queue items for a specific guild

        Args:
            guild_id: The ID of the guild to clear items for

        Returns:
            int: Number of items cleared
        """
        try:
            cleared_count = 0
            async with self._queue_lock:
                # Get URLs for this guild (copy so clearing the tracking set below
                # does not empty this reference)
                guild_urls = set(self._guild_queues.get(guild_id, set()))

                # Clear from pending queue, counting the items actually removed
                pending_before = len(self._queue)
                self._queue = [item for item in self._queue if item.guild_id != guild_id]
                cleared_count += pending_before - len(self._queue)

                # Clear from processing
                for url in list(self._processing.keys()):
                    if self._processing[url].guild_id == guild_id:
                        self._processing.pop(url)
                        cleared_count += 1

                # Clear from completed
                for url in list(self._completed.keys()):
                    if self._completed[url].guild_id == guild_id:
                        self._completed.pop(url)
                        cleared_count += 1

                # Clear from failed
                for url in list(self._failed.keys()):
                    if self._failed[url].guild_id == guild_id:
                        self._failed.pop(url)
                        cleared_count += 1

                # Clear guild tracking (these URLs are already counted above)
                if guild_id in self._guild_queues:
                    self._guild_queues[guild_id].clear()

                # Clear channel tracking for this guild's channels
                for channel_id in list(self._channel_queues.keys()):
                    self._channel_queues[channel_id] = {
                        url for url in self._channel_queues[channel_id]
                        if url not in guild_urls
                    }

                # Persist updated state
                if self.persistence_path:
                    await self._persist_queue()

            logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue")
            return cleared_count

        except Exception as e:
            logger.error(f"Error clearing guild queue: {traceback.format_exc()}")
            raise QueueError(f"Failed to clear guild queue: {str(e)}")

    async def _periodic_cleanup(self):
        """Periodically clean up old completed/failed items"""
        while True:
            try:
                current_time = datetime.utcnow()
                cleanup_cutoff = current_time - timedelta(seconds=self.max_history_age)

                async with self._queue_lock:
                    # Clean up completed items
                    for url in list(self._completed.keys()):
                        item = self._completed[url]
                        if item.added_at < cleanup_cutoff:
                            self._completed.pop(url)

                    # Clean up failed items
                    for url in list(self._failed.keys()):
                        item = self._failed[url]
                        if item.added_at < cleanup_cutoff:
                            self._failed.pop(url)

                    # Clean up guild and channel tracking, keeping only URLs that are
                    # still pending or processing
                    active_urls = {item.url for item in self._queue} | set(
                        self._processing.keys()
                    )
                    for guild_id in list(self._guild_queues.keys()):
                        self._guild_queues[guild_id] = {
                            url
                            for url in self._guild_queues[guild_id]
                            if url in active_urls
                        }

                    for channel_id in list(self._channel_queues.keys()):
                        self._channel_queues[channel_id] = {
                            url
                            for url in self._channel_queues[channel_id]
                            if url in active_urls
                        }

                self.metrics.last_cleanup = current_time
                logger.info("Completed periodic queue cleanup")

                await asyncio.sleep(self.cleanup_interval)

            except Exception as e:
                logger.error(f"Error in periodic cleanup: {traceback.format_exc()}")
                await asyncio.sleep(60)