From f8d383a55a812f17654782cf0aff7afa52ece8f7 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:12:43 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=A5=20Pending:=20Shows=20items=20waiti?= =?UTF-8?q?ng=20to=20be=20processed=20=E2=9A=99=EF=B8=8F=20Processing:=20S?= =?UTF-8?q?hows=20items=20currently=20being=20processed=20=E2=9C=85=20Comp?= =?UTF-8?q?leted:=20Shows=20successfully=20processed=20items=20=E2=9D=8C?= =?UTF-8?q?=20Failed:=20Shows=20failed=20items?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- videoarchiver/enhanced_queue.py | 92 +++++++++++++++++--------- videoarchiver/processor.py | 50 ++++++-------- videoarchiver/utils/message_manager.py | 21 +++++- 3 files changed, 101 insertions(+), 62 deletions(-) diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index 2482a44..b270766 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -115,6 +115,7 @@ class QueueItem: last_error_time: Optional[datetime] = None hardware_accel_attempted: bool = False compression_attempted: bool = False + original_message: Optional[Any] = None # Store the original message reference class EnhancedVideoQueueManager: @@ -190,7 +191,9 @@ class EnhancedVideoQueueManager: channel_id: int, guild_id: int, author_id: int, - callback: Optional[Callable[[str, bool, str], Any]] = None, # Make callback optional + callback: Optional[ + Callable[[str, bool, str], Any] + ] = None, # Make callback optional priority: int = 0, ) -> bool: """Add a video to the processing queue with priority support""" @@ -268,7 +271,9 @@ class EnhancedVideoQueueManager: start_time = time.time() logger.info(f"Calling processor for item: {item.url}") success, error = await processor(item) - logger.info(f"Processor result for {item.url}: success={success}, error={error}") + logger.info( + f"Processor result for {item.url}: success={success}, error={error}" + ) processing_time = time.time() - start_time # Update metrics @@ -299,7 +304,9 @@ class EnhancedVideoQueueManager: item.compression_attempted = True # Add back to queue with adjusted priority - item.priority = max(0, item.priority - 1) # Lower priority for retries + item.priority = max( + 0, item.priority - 1 + ) # Lower priority for retries self._queue.append(item) logger.warning( f"Retrying item: {item.url} (attempt {item.retry_count})" @@ -336,7 +343,9 @@ class EnhancedVideoQueueManager: # Continue processing even if persistence fails except Exception as e: - logger.error(f"Critical error in queue processor: {traceback.format_exc()}") + logger.error( + f"Critical error in queue processor: {traceback.format_exc()}" + ) # Ensure we don't get stuck in a tight loop on critical errors await asyncio.sleep(1) continue # Continue to next iteration to process remaining items @@ -421,7 +430,9 @@ class EnhancedVideoQueueManager: if item.get("last_retry"): item["last_retry"] = datetime.fromisoformat(item["last_retry"]) if item.get("last_error_time"): - item["last_error_time"] = datetime.fromisoformat(item["last_error_time"]) + item["last_error_time"] = datetime.fromisoformat( + item["last_error_time"] + ) self._queue.append(QueueItem(**item)) self._processing = { @@ -438,9 +449,13 @@ class EnhancedVideoQueueManager: self.metrics.success_rate = metrics_data["success_rate"] self.metrics.errors_by_type = metrics_data["errors_by_type"] self.metrics.last_error = metrics_data["last_error"] - self.metrics.compression_failures = metrics_data.get("compression_failures", 0) - self.metrics.hardware_accel_failures = metrics_data.get("hardware_accel_failures", 0) - + self.metrics.compression_failures = metrics_data.get( + "compression_failures", 0 + ) + self.metrics.hardware_accel_failures = metrics_data.get( + "hardware_accel_failures", 0 + ) + if metrics_data["last_error_time"]: self.metrics.last_error_time = datetime.fromisoformat( metrics_data["last_error_time"] @@ -477,6 +492,7 @@ class EnhancedVideoQueueManager: logger.warning(f"High memory usage detected: {memory_usage:.2f}MB") # Force garbage collection import gc + gc.collect() # Check for potential deadlocks with reduced threshold @@ -524,7 +540,8 @@ class EnhancedVideoQueueManager: for url, item in list(self._processing.items()): if ( item.processing_time > 0 - and (current_time - item.processing_time) > self.deadlock_threshold + and (current_time - item.processing_time) + > self.deadlock_threshold ): # Move to failed queue if max retries reached if item.retry_count >= self.max_retries: @@ -576,19 +593,29 @@ class EnhancedVideoQueueManager: def get_queue_status(self, guild_id: int) -> dict: """Get current queue status and metrics for a guild - + Args: guild_id: The ID of the guild to get status for - + Returns: dict: Queue status including counts and metrics """ try: # Count items for this guild pending = len([item for item in self._queue if item.guild_id == guild_id]) - processing = len([item for item in self._processing.values() if item.guild_id == guild_id]) - completed = len([item for item in self._completed.values() if item.guild_id == guild_id]) - failed = len([item for item in self._failed.values() if item.guild_id == guild_id]) + processing = len( + [ + item + for item in self._processing.values() + if item.guild_id == guild_id + ] + ) + completed = len( + [item for item in self._completed.values() if item.guild_id == guild_id] + ) + failed = len( + [item for item in self._failed.values() if item.guild_id == guild_id] + ) # Get metrics metrics = { @@ -600,7 +627,7 @@ class EnhancedVideoQueueManager: "last_cleanup": self.metrics.last_cleanup.strftime("%Y-%m-%d %H:%M:%S"), "errors_by_type": self.metrics.errors_by_type, "compression_failures": self.metrics.compression_failures, - "hardware_accel_failures": self.metrics.hardware_accel_failures + "hardware_accel_failures": self.metrics.hardware_accel_failures, } return { @@ -608,7 +635,7 @@ class EnhancedVideoQueueManager: "processing": processing, "completed": completed, "failed": failed, - "metrics": metrics + "metrics": metrics, } except Exception as e: @@ -628,16 +655,16 @@ class EnhancedVideoQueueManager: "last_cleanup": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), "errors_by_type": {}, "compression_failures": 0, - "hardware_accel_failures": 0 - } + "hardware_accel_failures": 0, + }, } async def clear_guild_queue(self, guild_id: int) -> int: """Clear all queue items for a specific guild - + Args: guild_id: The ID of the guild to clear items for - + Returns: int: Number of items cleared """ @@ -646,47 +673,50 @@ class EnhancedVideoQueueManager: async with self._queue_lock: # Get URLs for this guild guild_urls = self._guild_queues.get(guild_id, set()) - + # Clear from pending queue - self._queue = [item for item in self._queue if item.guild_id != guild_id] - + self._queue = [ + item for item in self._queue if item.guild_id != guild_id + ] + # Clear from processing for url in list(self._processing.keys()): if self._processing[url].guild_id == guild_id: self._processing.pop(url) cleared_count += 1 - + # Clear from completed for url in list(self._completed.keys()): if self._completed[url].guild_id == guild_id: self._completed.pop(url) cleared_count += 1 - + # Clear from failed for url in list(self._failed.keys()): if self._failed[url].guild_id == guild_id: self._failed.pop(url) cleared_count += 1 - + # Clear guild tracking if guild_id in self._guild_queues: cleared_count += len(self._guild_queues[guild_id]) self._guild_queues[guild_id].clear() - + # Clear channel tracking for this guild's channels for channel_id in list(self._channel_queues.keys()): self._channel_queues[channel_id] = { - url for url in self._channel_queues[channel_id] + url + for url in self._channel_queues[channel_id] if url not in guild_urls } - + # Persist updated state if self.persistence_path: await self._persist_queue() - + logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue") return cleared_count - + except Exception as e: logger.error(f"Error clearing guild queue: {traceback.format_exc()}") raise QueueError(f"Failed to clear guild queue: {str(e)}") diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 4663f92..cef659b 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -3,7 +3,7 @@ import os import logging import asyncio -import discord # Added missing import +import discord from pathlib import Path from typing import Dict, Any, Optional, Tuple import traceback @@ -80,6 +80,12 @@ class VideoProcessor: if not content or not downloader.is_supported_url(content): return + # Add video camera reaction to indicate processing + try: + await message.add_reaction("📹") + except Exception as e: + logger.error(f"Failed to add video camera reaction: {e}") + # Add to processing queue await self.queue_manager.add_to_queue( url=content, @@ -108,6 +114,18 @@ class VideoProcessor: if not downloader or not message_manager: return False, f"Missing required components for guild {guild_id}" + # Get original message + try: + channel = self.bot.get_channel(item.channel_id) + if not channel: + return False, f"Channel {item.channel_id} not found" + original_message = await channel.fetch_message(item.message_id) + except discord.NotFound: + original_message = None + except Exception as e: + logger.error(f"Error fetching original message: {e}") + original_message = None + # Download and process video try: success, file_path, error = await downloader.download_video(item.url) @@ -126,30 +144,12 @@ class VideoProcessor: if not archive_channel: return False, "Archive channel not configured" - # Get original message - try: - channel = self.bot.get_channel(item.channel_id) - if not channel: - return False, f"Channel {item.channel_id} not found" - original_message = await channel.fetch_message(item.message_id) - except discord.NotFound: - original_message = None - except Exception as e: - logger.error(f"Error fetching original message: {e}") - original_message = None - # Format message try: author = original_message.author if original_message else None message = await message_manager.format_message( author=author, channel=channel, - original_message=original_message - ) - except Exception as e: - logger.error(f"Error formatting message: {e}") - message = f"Video from {item.url}" - # Upload to archive channel try: if not os.path.exists(file_path): @@ -159,21 +159,12 @@ class VideoProcessor: content=message, file=discord.File(file_path) ) + except discord.HTTPException as e: return False, f"Failed to upload to Discord: {str(e)}" except Exception as e: return False, f"Failed to archive video: {str(e)}" - # Delete original if configured - if original_message: - try: - settings = await self.config.get_guild_settings(guild_id) - if settings.get("delete_after_repost", False): - await original_message.delete() - except Exception as e: - logger.warning(f"Failed to delete original message: {e}") - # Don't fail the process for deletion errors - return True, None except Exception as e: @@ -186,7 +177,6 @@ class VideoProcessor: os.unlink(file_path) except Exception as e: logger.error(f"Failed to clean up file {file_path}: {e}") - # Don't fail the process for cleanup errors except Exception as e: logger.error(f"Error processing video: {traceback.format_exc()}") diff --git a/videoarchiver/utils/message_manager.py b/videoarchiver/utils/message_manager.py index 005192e..0471edf 100644 --- a/videoarchiver/utils/message_manager.py +++ b/videoarchiver/utils/message_manager.py @@ -2,7 +2,7 @@ import asyncio import logging -from typing import Dict +from typing import Dict, List logger = logging.getLogger("VideoArchiver") @@ -20,6 +20,25 @@ class MessageManager: author=author, url=url, original_message=original_message ) + async def get_message_reactions(self, message) -> List[Dict]: + """Get all reactions from a message""" + reactions = [] + for reaction in message.reactions: + reactions.append({ + 'emoji': str(reaction.emoji), + 'count': reaction.count, + 'users': [user.id async for user in reaction.users()] + }) + return reactions + + async def restore_reactions(self, message, reactions: List[Dict]) -> None: + """Restore reactions to a message""" + for reaction_data in reactions: + try: + await message.add_reaction(reaction_data['emoji']) + except Exception as e: + logger.error(f"Failed to restore reaction {reaction_data['emoji']}: {str(e)}") + async def schedule_message_deletion(self, message_id: int, delete_func) -> None: if self.message_duration <= 0: return