"""Video processing logic for VideoArchiver""" import os import logging import asyncio import discord from discord.ext import commands from discord import app_commands from pathlib import Path from typing import Dict, Any, Optional, Tuple, Set import traceback from datetime import datetime from videoarchiver.enhanced_queue import EnhancedVideoQueueManager from videoarchiver.utils.exceptions import ( ProcessingError, ConfigurationError, VideoVerificationError, QueueError, FileOperationError ) logger = logging.getLogger("VideoArchiver") # Reaction emojis REACTIONS = { 'queued': '📹', 'processing': '⚙️', 'success': '✅', 'error': '❌', 'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'], 'progress': ['⬛', '🟨', '🟩'], 'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟'] } # Global queue manager instance to persist across reloads _global_queue_manager = None # Track detailed progress information _download_progress: Dict[str, Dict[str, Any]] = {} _compression_progress: Dict[str, Dict[str, Any]] = {} class VideoProcessor: """Handles video processing operations""" def __init__( self, bot, config_manager, components, queue_manager=None, ffmpeg_mgr=None ): self.bot = bot self.config = config_manager self.components = components self.ffmpeg_mgr = ffmpeg_mgr # Track active downloads and their tasks self._active_downloads: Dict[str, asyncio.Task] = {} self._active_downloads_lock = asyncio.Lock() self._unloading = False # Use global queue manager if available global _global_queue_manager if _global_queue_manager is not None: self.queue_manager = _global_queue_manager logger.info("Using existing global queue manager") elif queue_manager: self.queue_manager = queue_manager _global_queue_manager = queue_manager logger.info("Using provided queue manager and setting as global") else: data_dir = Path(os.path.dirname(__file__)) / "data" data_dir.mkdir(parents=True, exist_ok=True) queue_path = data_dir / "queue_state.json" self.queue_manager = EnhancedVideoQueueManager( max_retries=3, retry_delay=5, max_queue_size=1000, cleanup_interval=1800, max_history_age=86400, persistence_path=str(queue_path) ) _global_queue_manager = self.queue_manager logger.info("Created new queue manager and set as global") # Start queue processing logger.info("Starting video processing queue...") self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) logger.info("Video processing queue started successfully") async def _cancel_active_downloads(self) -> None: """Cancel all active downloads and requeue them""" async with self._active_downloads_lock: for url, task in self._active_downloads.items(): if not task.done(): # Cancel the task task.cancel() try: await task except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error cancelling download task for {url}: {e}") # Requeue the download if we're unloading if self._unloading and url in _download_progress: try: # Get the original message details from progress tracking progress = _download_progress[url] if progress.get('message_id') and progress.get('channel_id') and progress.get('guild_id'): await self.queue_manager.add_to_queue( url=url, message_id=progress['message_id'], channel_id=progress['channel_id'], guild_id=progress['guild_id'], author_id=progress.get('author_id') ) logger.info(f"Requeued download for {url}") except Exception as e: logger.error(f"Failed to requeue download for {url}: {e}") self._active_downloads.clear() async def cleanup(self) -> None: """Clean up resources with proper handling""" try: self._unloading = True # Cancel queue processing if hasattr(self, '_queue_task') and not self._queue_task.done(): self._queue_task.cancel() try: await self._queue_task except asyncio.CancelledError: pass # Cancel and requeue active downloads await self._cancel_active_downloads() # Clean up progress tracking _download_progress.clear() _compression_progress.clear() except Exception as e: logger.error(f"Error during processor cleanup: {e}") raise finally: self._unloading = False async def force_cleanup(self) -> None: """Force cleanup of resources when timeout occurs""" try: # Cancel all tasks immediately without requeuing async with self._active_downloads_lock: for task in self._active_downloads.values(): if not task.done(): task.cancel() # Cancel queue task if hasattr(self, '_queue_task') and not self._queue_task.done(): self._queue_task.cancel() # Clear all tracking self._active_downloads.clear() _download_progress.clear() _compression_progress.clear() except Exception as e: logger.error(f"Error during force cleanup: {e}") async def process_message(self, message: discord.Message) -> None: """Process a message for video content""" try: if not message.guild or not message.guild.id in self.components: return components = self.components[message.guild.id] downloader = components.get("downloader") if not downloader: logger.error(f"No downloader found for guild {message.guild.id}") return content = message.content.strip() if not content or not downloader.is_supported_url(content): return try: await message.add_reaction(REACTIONS['queued']) logger.info(f"Added queued reaction to message {message.id}") except Exception as e: logger.error(f"Failed to add queued reaction: {e}") # Track message details in progress tracking _download_progress[content] = { 'active': False, 'message_id': message.id, 'channel_id': message.channel.id, 'guild_id': message.guild.id, 'author_id': message.author.id, 'start_time': None, 'percent': 0, 'speed': 'N/A', 'eta': 'N/A', 'downloaded_bytes': 0, 'total_bytes': 0, 'retries': 0 } await self.queue_manager.add_to_queue( url=content, message_id=message.id, channel_id=message.channel.id, guild_id=message.guild.id, author_id=message.author.id ) logger.info(f"Added message {message.id} to processing queue") queue_status = self.queue_manager.get_queue_status(message.guild.id) queue_position = queue_status['pending'] - 1 await self.update_queue_position_reaction(message, queue_position) logger.info(f"Message {message.id} is at position {queue_position + 1} in queue") except Exception as e: logger.error(f"Error processing message: {traceback.format_exc()}") raise ProcessingError(f"Failed to process message: {str(e)}") async def _process_video(self, item) -> Tuple[bool, Optional[str]]: """Process a video from the queue""" if self._unloading: return False, "Processor is unloading" file_path = None original_message = None download_task = None try: guild_id = item.guild_id if guild_id not in self.components: return False, f"No components found for guild {guild_id}" components = self.components[guild_id] downloader = components.get("downloader") message_manager = components.get("message_manager") if not downloader or not message_manager: return False, f"Missing required components for guild {guild_id}" try: channel = self.bot.get_channel(item.channel_id) if not channel: return False, f"Channel {item.channel_id} not found" original_message = await channel.fetch_message(item.message_id) await original_message.remove_reaction(REACTIONS['queued'], self.bot.user) await original_message.add_reaction(REACTIONS['processing']) logger.info(f"Started processing message {item.message_id}") except discord.NotFound: original_message = None except Exception as e: logger.error(f"Error fetching original message: {e}") original_message = None # Create and track download task download_task = asyncio.create_task( downloader.download_video( item.url, progress_callback=lambda progress: self.update_download_progress_reaction(original_message, progress) if original_message else None ) ) async with self._active_downloads_lock: self._active_downloads[item.url] = download_task try: success, file_path, error = await download_task if not success: if original_message: await original_message.add_reaction(REACTIONS['error']) logger.error(f"Download failed for message {item.message_id}: {error}") return False, f"Failed to download video: {error}" except asyncio.CancelledError: logger.info(f"Download cancelled for {item.url}") return False, "Download cancelled" except Exception as e: if original_message: await original_message.add_reaction(REACTIONS['error']) logger.error(f"Download error for message {item.message_id}: {str(e)}") return False, f"Download error: {str(e)}" finally: async with self._active_downloads_lock: self._active_downloads.pop(item.url, None) # Get archive channel guild = self.bot.get_guild(guild_id) if not guild: return False, f"Guild {guild_id} not found" archive_channel = await self.config.get_channel(guild, "archive") if not archive_channel: return False, "Archive channel not configured" # Format message try: author = original_message.author if original_message else None message = await message_manager.format_message( author=author, channel=channel, url=item.url ) except Exception as e: return False, f"Failed to format message: {str(e)}" # Upload to archive channel try: if not os.path.exists(file_path): return False, "Processed file not found" await archive_channel.send( content=message, file=discord.File(file_path) ) if original_message: await original_message.remove_reaction(REACTIONS['processing'], self.bot.user) await original_message.add_reaction(REACTIONS['success']) logger.info(f"Successfully processed message {item.message_id}") return True, None except discord.HTTPException as e: if original_message: await original_message.add_reaction(REACTIONS['error']) logger.error(f"Failed to upload to Discord for message {item.message_id}: {str(e)}") return False, f"Failed to upload to Discord: {str(e)}" except Exception as e: if original_message: await original_message.add_reaction(REACTIONS['error']) logger.error(f"Failed to archive video for message {item.message_id}: {str(e)}") return False, f"Failed to archive video: {str(e)}" except Exception as e: logger.error(f"Error processing video: {traceback.format_exc()}") return False, str(e) finally: # Clean up downloaded file if file_path and os.path.exists(file_path): try: os.unlink(file_path) except Exception as e: logger.error(f"Failed to clean up file {file_path}: {e}") async def update_queue_position_reaction(self, message, position): """Update queue position reaction""" try: for reaction in REACTIONS['numbers']: try: await message.remove_reaction(reaction, self.bot.user) except: pass if 0 <= position < len(REACTIONS['numbers']): await message.add_reaction(REACTIONS['numbers'][position]) logger.info(f"Updated queue position reaction to {position + 1} for message {message.id}") except Exception as e: logger.error(f"Failed to update queue position reaction: {e}") async def update_progress_reaction(self, message, progress): """Update progress reaction based on FFmpeg progress""" try: for reaction in REACTIONS['progress']: try: await message.remove_reaction(reaction, self.bot.user) except: pass if progress < 33: await message.add_reaction(REACTIONS['progress'][0]) elif progress < 66: await message.add_reaction(REACTIONS['progress'][1]) else: await message.add_reaction(REACTIONS['progress'][2]) except Exception as e: logger.error(f"Failed to update progress reaction: {e}") async def update_download_progress_reaction(self, message, progress): """Update download progress reaction""" if not message: return try: for reaction in REACTIONS['download']: try: await message.remove_reaction(reaction, self.bot.user) except: pass if progress <= 20: await message.add_reaction(REACTIONS['download'][0]) elif progress <= 40: await message.add_reaction(REACTIONS['download'][1]) elif progress <= 60: await message.add_reaction(REACTIONS['download'][2]) elif progress <= 80: await message.add_reaction(REACTIONS['download'][3]) elif progress < 100: await message.add_reaction(REACTIONS['download'][4]) else: await message.add_reaction(REACTIONS['download'][5]) except Exception as e: logger.error(f"Failed to update download progress reaction: {e}") async def _show_queue_details(self, ctx): """Display detailed queue status and progress information""" try: # Get queue status queue_status = self.queue_manager.get_queue_status(ctx.guild.id) # Create embed for queue overview embed = discord.Embed( title="Queue Status Details", color=discord.Color.blue(), timestamp=datetime.utcnow() ) # Queue statistics embed.add_field( name="Queue Statistics", value=f"```\n" f"Pending: {queue_status['pending']}\n" f"Processing: {queue_status['processing']}\n" f"Completed: {queue_status['completed']}\n" f"Failed: {queue_status['failed']}\n" f"Success Rate: {queue_status['metrics']['success_rate']:.1%}\n" f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s\n" f"```", inline=False ) # Active downloads active_downloads = "" for url, progress in _download_progress.items(): if progress.get('active', False): active_downloads += ( f"URL: {url[:50]}...\n" f"Progress: {progress.get('percent', 0):.1f}%\n" f"Speed: {progress.get('speed', 'N/A')}\n" f"ETA: {progress.get('eta', 'N/A')}\n" f"Size: {progress.get('downloaded_bytes', 0)}/{progress.get('total_bytes', 0)} bytes\n" f"Started: {progress.get('start_time', 'N/A')}\n" f"Retries: {progress.get('retries', 0)}\n" f"-------------------\n" ) if active_downloads: embed.add_field( name="Active Downloads", value=f"```\n{active_downloads}```", inline=False ) else: embed.add_field( name="Active Downloads", value="```\nNo active downloads```", inline=False ) # Active compressions active_compressions = "" for url, progress in _compression_progress.items(): if progress.get('active', False): active_compressions += ( f"File: {progress.get('filename', 'Unknown')}\n" f"Progress: {progress.get('percent', 0):.1f}%\n" f"Time Elapsed: {progress.get('elapsed_time', 'N/A')}\n" f"Input Size: {progress.get('input_size', 0)} bytes\n" f"Current Size: {progress.get('current_size', 0)} bytes\n" f"Target Size: {progress.get('target_size', 0)} bytes\n" f"Codec: {progress.get('codec', 'Unknown')}\n" f"Hardware Accel: {progress.get('hardware_accel', False)}\n" f"-------------------\n" ) if active_compressions: embed.add_field( name="Active Compressions", value=f"```\n{active_compressions}```", inline=False ) else: embed.add_field( name="Active Compressions", value="```\nNo active compressions```", inline=False ) # Error statistics if queue_status['metrics']['errors_by_type']: error_stats = "\n".join( f"{error_type}: {count}" for error_type, count in queue_status['metrics']['errors_by_type'].items() ) embed.add_field( name="Error Statistics", value=f"```\n{error_stats}```", inline=False ) # Hardware acceleration statistics embed.add_field( name="Hardware Statistics", value=f"```\n" f"Hardware Accel Failures: {queue_status['metrics']['hardware_accel_failures']}\n" f"Compression Failures: {queue_status['metrics']['compression_failures']}\n" f"Peak Memory Usage: {queue_status['metrics']['peak_memory_usage']:.1f}MB\n" f"```", inline=False ) await ctx.send(embed=embed) except Exception as e: logger.error(f"Error showing queue details: {traceback.format_exc()}") await ctx.send(f"Error getting queue details: {str(e)}")