diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 077f25d..cd39393 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -37,7 +37,7 @@ class VideoProcessor: max_queue_size=1000, cleanup_interval=1800, # 30 minutes (reduced from 1 hour for more frequent cleanup) max_history_age=86400, # 24 hours - persistence_path=str(queue_path), + persistence_path=str(queue_path) ) # Track failed downloads for cleanup @@ -45,33 +45,176 @@ class VideoProcessor: self._failed_downloads_lock = asyncio.Lock() # Start queue processing - self._queue_task = asyncio.create_task(self._process_queue()) - - async def _process_queue(self): - """Process the queue continuously""" - try: - await self.queue_manager.process_queue(self._process_video) - except Exception as e: - logger.error(f"Queue processing error: {traceback.format_exc()}") - # Restart queue processing - self._queue_task = asyncio.create_task(self._process_queue()) + self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) async def _process_video(self, item: Any) -> Tuple[bool, Optional[str]]: """Process a video from the queue""" try: - # Get the callback from the item - callback = getattr(item, "callback", None) - if callback: - success = await callback(item.url, True, "") - return success, None if success else "Callback failed" - return False, "No callback found" + # Get the message + channel = self.bot.get_channel(item.channel_id) + if not channel: + return False, "Channel not found" + + try: + message = await channel.fetch_message(item.message_id) + if not message: + return False, "Message not found" + except discord.NotFound: + return False, "Message not found" + except discord.Forbidden: + return False, "Bot lacks permissions to fetch message" + except Exception as e: + return False, f"Error fetching message: {str(e)}" + + guild_id = message.guild.id + file_path = None + start_time = datetime.utcnow() + + try: + settings = await self.config.get_guild_settings(guild_id) + + # Download video with enhanced error handling + try: + success, file_path, error = await self.components[guild_id][ + "downloader" + ].download_video(item.url) + except Exception as e: + logger.error(f"Download error: {traceback.format_exc()}") + success, file_path, error = False, None, str(e) + + if not success: + await message.remove_reaction("⏳", self.bot.user) + await message.add_reaction("❌") + await self._log_message( + message.guild, f"Failed to download video: {error}", "error" + ) + # Track failed download for cleanup + if file_path: + async with self._failed_downloads_lock: + self._failed_downloads.add(file_path) + return False, error + + # Get channels with enhanced error handling + try: + archive_channel = await self.config.get_channel( + message.guild, "archive" + ) + notification_channel = await self.config.get_channel( + message.guild, "notification" + ) + if not notification_channel: + notification_channel = archive_channel + + if not archive_channel or not notification_channel: + raise DiscordAPIError("Required channels not found") + except Exception as e: + await self._log_message( + message.guild, + f"Channel configuration error: {str(e)}", + "error", + ) + return False, str(e) + + try: + # Upload to archive channel with original message link + file = discord.File(file_path) + archive_message = await archive_channel.send( + f"Original: {message.jump_url}", file=file + ) + + # Send notification with enhanced error handling for message formatting + try: + notification_content = self.components[guild_id][ + "message_manager" + ].format_archive_message( + username=message.author.name, + channel=message.channel.name, + original_message=message.jump_url, + ) + except Exception as e: + logger.error(f"Message formatting error: {str(e)}") + notification_content = f"Video archived from {message.author.name} in {message.channel.name}\nOriginal: {message.jump_url}" + + notification_message = await notification_channel.send( + notification_content + ) + + # Schedule notification message deletion with error handling + try: + await self.components[guild_id][ + "message_manager" + ].schedule_message_deletion( + notification_message.id, notification_message.delete + ) + except Exception as e: + logger.error( + f"Failed to schedule message deletion: {str(e)}" + ) + + # Update reaction to show completion + await message.remove_reaction("⏳", self.bot.user) + await message.add_reaction("✅") + + # Log processing time + processing_time = ( + datetime.utcnow() - start_time + ).total_seconds() + await self._log_message( + message.guild, + f"Successfully archived video from {message.author} (took {processing_time:.1f}s)", + ) + + return True, None + + except discord.HTTPException as e: + await self._log_message( + message.guild, f"Discord API error: {str(e)}", "error" + ) + await message.remove_reaction("⏳", self.bot.user) + await message.add_reaction("❌") + return False, str(e) + + finally: + # Always attempt to delete the file if configured + if settings["delete_after_repost"] and file_path: + try: + if secure_delete_file(file_path): + await self._log_message( + message.guild, + f"Successfully deleted file: {file_path}", + ) + else: + await self._log_message( + message.guild, + f"Failed to delete file: {file_path}", + "error", + ) + # Emergency cleanup + cleanup_downloads( + str( + self.components[guild_id][ + "downloader" + ].download_path + ) + ) + except Exception as e: + logger.error(f"File deletion error: {str(e)}") + # Track for later cleanup + async with self._failed_downloads_lock: + self._failed_downloads.add(file_path) + + except Exception as e: + logger.error(f"Process error: {traceback.format_exc()}") + await self._log_message( + message.guild, f"Error in process: {str(e)}", "error" + ) + return False, str(e) + except Exception as e: logger.error(f"Error processing video: {traceback.format_exc()}") return False, str(e) - async def process_video_url( - self, url: str, message: discord.Message, priority: int = 0 - ) -> bool: + async def process_video_url(self, url: str, message: discord.Message, priority: int = 0) -> bool: """Process a video URL: download, reupload, and cleanup""" guild_id = message.guild.id start_time = datetime.utcnow() @@ -95,155 +238,6 @@ class VideoProcessor: ) return False - # Create callback for queue processing with enhanced error handling - async def process_callback(url: str, success: bool, error: str) -> bool: - file_path = None - try: - if not success: - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("❌") - await self._log_message( - message.guild, f"Failed to process video: {error}", "error" - ) - return False - - # Download video with enhanced error handling - try: - success, file_path, error = await self.components[guild_id][ - "downloader" - ].download_video(url) - except Exception as e: - logger.error(f"Download error: {traceback.format_exc()}") - success, file_path, error = False, None, str(e) - - if not success: - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("❌") - await self._log_message( - message.guild, f"Failed to download video: {error}", "error" - ) - # Track failed download for cleanup - if file_path: - async with self._failed_downloads_lock: - self._failed_downloads.add(file_path) - return False - - # Get channels with enhanced error handling - try: - archive_channel = await self.config.get_channel( - message.guild, "archive" - ) - notification_channel = await self.config.get_channel( - message.guild, "notification" - ) - if not notification_channel: - notification_channel = archive_channel - - if not archive_channel or not notification_channel: - raise DiscordAPIError("Required channels not found") - except Exception as e: - await self._log_message( - message.guild, - f"Channel configuration error: {str(e)}", - "error", - ) - return False - - try: - # Upload to archive channel with original message link - file = discord.File(file_path) - archive_message = await archive_channel.send( - f"Original: {message.jump_url}", file=file - ) - - # Send notification with enhanced error handling for message formatting - try: - notification_content = self.components[guild_id][ - "message_manager" - ].format_archive_message( - username=message.author.name, - channel=message.channel.name, - original_message=message.jump_url, - ) - except Exception as e: - logger.error(f"Message formatting error: {str(e)}") - notification_content = f"Video archived from {message.author.name} in {message.channel.name}\nOriginal: {message.jump_url}" - - notification_message = await notification_channel.send( - notification_content - ) - - # Schedule notification message deletion with error handling - try: - await self.components[guild_id][ - "message_manager" - ].schedule_message_deletion( - notification_message.id, notification_message.delete - ) - except Exception as e: - logger.error( - f"Failed to schedule message deletion: {str(e)}" - ) - - # Update reaction to show completion - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("✅") - - # Log processing time - processing_time = ( - datetime.utcnow() - start_time - ).total_seconds() - await self._log_message( - message.guild, - f"Successfully archived video from {message.author} (took {processing_time:.1f}s)", - ) - - return True - - except discord.HTTPException as e: - await self._log_message( - message.guild, f"Discord API error: {str(e)}", "error" - ) - await message.remove_reaction("⏳", self.bot.user) - await message.add_reaction("❌") - return False - - finally: - # Always attempt to delete the file if configured - if settings["delete_after_repost"] and file_path: - try: - if secure_delete_file(file_path): - await self._log_message( - message.guild, - f"Successfully deleted file: {file_path}", - ) - else: - await self._log_message( - message.guild, - f"Failed to delete file: {file_path}", - "error", - ) - # Emergency cleanup - cleanup_downloads( - str( - self.components[guild_id][ - "downloader" - ].download_path - ) - ) - except Exception as e: - logger.error(f"File deletion error: {str(e)}") - # Track for later cleanup - async with self._failed_downloads_lock: - self._failed_downloads.add(file_path) - - except Exception as e: - logger.error(f"Process callback error: {traceback.format_exc()}") - await self._log_message( - message.guild, f"Error in process callback: {str(e)}", "error" - ) - return False - # Add to enhanced queue with priority and error handling try: await self.queue_manager.add_to_queue( @@ -252,7 +246,7 @@ class VideoProcessor: channel_id=message.channel.id, guild_id=guild_id, author_id=message.author.id, - callback=process_callback, + callback=None, # No callback needed since _process_video handles everything priority=priority, ) except Exception as e: