📥 Pending: Shows items waiting to be processed

⚙️ Processing: Shows items currently being processed
 Completed: Shows successfully processed items
 Failed: Shows failed items
This commit is contained in:
pacnpal
2024-11-15 14:12:43 +00:00
parent 0d7b1d449f
commit f8d383a55a
3 changed files with 101 additions and 62 deletions

View File

@@ -115,6 +115,7 @@ class QueueItem:
last_error_time: Optional[datetime] = None
hardware_accel_attempted: bool = False
compression_attempted: bool = False
original_message: Optional[Any] = None # Store the original message reference
class EnhancedVideoQueueManager:
@@ -190,7 +191,9 @@ class EnhancedVideoQueueManager:
channel_id: int,
guild_id: int,
author_id: int,
callback: Optional[Callable[[str, bool, str], Any]] = None, # Make callback optional
callback: Optional[
Callable[[str, bool, str], Any]
] = None, # Make callback optional
priority: int = 0,
) -> bool:
"""Add a video to the processing queue with priority support"""
@@ -268,7 +271,9 @@ class EnhancedVideoQueueManager:
start_time = time.time()
logger.info(f"Calling processor for item: {item.url}")
success, error = await processor(item)
logger.info(f"Processor result for {item.url}: success={success}, error={error}")
logger.info(
f"Processor result for {item.url}: success={success}, error={error}"
)
processing_time = time.time() - start_time
# Update metrics
@@ -299,7 +304,9 @@ class EnhancedVideoQueueManager:
item.compression_attempted = True
# Add back to queue with adjusted priority
item.priority = max(0, item.priority - 1) # Lower priority for retries
item.priority = max(
0, item.priority - 1
) # Lower priority for retries
self._queue.append(item)
logger.warning(
f"Retrying item: {item.url} (attempt {item.retry_count})"
@@ -336,7 +343,9 @@ class EnhancedVideoQueueManager:
# Continue processing even if persistence fails
except Exception as e:
logger.error(f"Critical error in queue processor: {traceback.format_exc()}")
logger.error(
f"Critical error in queue processor: {traceback.format_exc()}"
)
# Ensure we don't get stuck in a tight loop on critical errors
await asyncio.sleep(1)
continue # Continue to next iteration to process remaining items
@@ -421,7 +430,9 @@ class EnhancedVideoQueueManager:
if item.get("last_retry"):
item["last_retry"] = datetime.fromisoformat(item["last_retry"])
if item.get("last_error_time"):
item["last_error_time"] = datetime.fromisoformat(item["last_error_time"])
item["last_error_time"] = datetime.fromisoformat(
item["last_error_time"]
)
self._queue.append(QueueItem(**item))
self._processing = {
@@ -438,9 +449,13 @@ class EnhancedVideoQueueManager:
self.metrics.success_rate = metrics_data["success_rate"]
self.metrics.errors_by_type = metrics_data["errors_by_type"]
self.metrics.last_error = metrics_data["last_error"]
self.metrics.compression_failures = metrics_data.get("compression_failures", 0)
self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0)
self.metrics.compression_failures = metrics_data.get(
"compression_failures", 0
)
self.metrics.hardware_accel_failures = metrics_data.get(
"hardware_accel_failures", 0
)
if metrics_data["last_error_time"]:
self.metrics.last_error_time = datetime.fromisoformat(
metrics_data["last_error_time"]
@@ -477,6 +492,7 @@ class EnhancedVideoQueueManager:
logger.warning(f"High memory usage detected: {memory_usage:.2f}MB")
# Force garbage collection
import gc
gc.collect()
# Check for potential deadlocks with reduced threshold
@@ -524,7 +540,8 @@ class EnhancedVideoQueueManager:
for url, item in list(self._processing.items()):
if (
item.processing_time > 0
and (current_time - item.processing_time) > self.deadlock_threshold
and (current_time - item.processing_time)
> self.deadlock_threshold
):
# Move to failed queue if max retries reached
if item.retry_count >= self.max_retries:
@@ -576,19 +593,29 @@ class EnhancedVideoQueueManager:
def get_queue_status(self, guild_id: int) -> dict:
"""Get current queue status and metrics for a guild
Args:
guild_id: The ID of the guild to get status for
Returns:
dict: Queue status including counts and metrics
"""
try:
# Count items for this guild
pending = len([item for item in self._queue if item.guild_id == guild_id])
processing = len([item for item in self._processing.values() if item.guild_id == guild_id])
completed = len([item for item in self._completed.values() if item.guild_id == guild_id])
failed = len([item for item in self._failed.values() if item.guild_id == guild_id])
processing = len(
[
item
for item in self._processing.values()
if item.guild_id == guild_id
]
)
completed = len(
[item for item in self._completed.values() if item.guild_id == guild_id]
)
failed = len(
[item for item in self._failed.values() if item.guild_id == guild_id]
)
# Get metrics
metrics = {
@@ -600,7 +627,7 @@ class EnhancedVideoQueueManager:
"last_cleanup": self.metrics.last_cleanup.strftime("%Y-%m-%d %H:%M:%S"),
"errors_by_type": self.metrics.errors_by_type,
"compression_failures": self.metrics.compression_failures,
"hardware_accel_failures": self.metrics.hardware_accel_failures
"hardware_accel_failures": self.metrics.hardware_accel_failures,
}
return {
@@ -608,7 +635,7 @@ class EnhancedVideoQueueManager:
"processing": processing,
"completed": completed,
"failed": failed,
"metrics": metrics
"metrics": metrics,
}
except Exception as e:
@@ -628,16 +655,16 @@ class EnhancedVideoQueueManager:
"last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
"errors_by_type": {},
"compression_failures": 0,
"hardware_accel_failures": 0
}
"hardware_accel_failures": 0,
},
}
async def clear_guild_queue(self, guild_id: int) -> int:
"""Clear all queue items for a specific guild
Args:
guild_id: The ID of the guild to clear items for
Returns:
int: Number of items cleared
"""
@@ -646,47 +673,50 @@ class EnhancedVideoQueueManager:
async with self._queue_lock:
# Get URLs for this guild
guild_urls = self._guild_queues.get(guild_id, set())
# Clear from pending queue
self._queue = [item for item in self._queue if item.guild_id != guild_id]
self._queue = [
item for item in self._queue if item.guild_id != guild_id
]
# Clear from processing
for url in list(self._processing.keys()):
if self._processing[url].guild_id == guild_id:
self._processing.pop(url)
cleared_count += 1
# Clear from completed
for url in list(self._completed.keys()):
if self._completed[url].guild_id == guild_id:
self._completed.pop(url)
cleared_count += 1
# Clear from failed
for url in list(self._failed.keys()):
if self._failed[url].guild_id == guild_id:
self._failed.pop(url)
cleared_count += 1
# Clear guild tracking
if guild_id in self._guild_queues:
cleared_count += len(self._guild_queues[guild_id])
self._guild_queues[guild_id].clear()
# Clear channel tracking for this guild's channels
for channel_id in list(self._channel_queues.keys()):
self._channel_queues[channel_id] = {
url for url in self._channel_queues[channel_id]
url
for url in self._channel_queues[channel_id]
if url not in guild_urls
}
# Persist updated state
if self.persistence_path:
await self._persist_queue()
logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue")
return cleared_count
except Exception as e:
logger.error(f"Error clearing guild queue: {traceback.format_exc()}")
raise QueueError(f"Failed to clear guild queue: {str(e)}")