diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 00005f9..a262675 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -19,6 +19,17 @@ from videoarchiver.utils.exceptions import ( logger = logging.getLogger("VideoArchiver") +# Reaction emojis +REACTIONS = { + 'queued': '📹', + 'processing': '⚙️', + 'success': '✅', + 'error': '❌', + 'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'], # Queue position indicators + 'progress': ['⬛', '🟨', '🟩'], # Progress indicators (0%, 50%, 100%) + 'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟'] # Download progress (0%, 20%, 40%, 60%, 80%, 100%) +} + class VideoProcessor: """Handles video processing operations""" @@ -63,6 +74,78 @@ class VideoProcessor: self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) logger.info("Video processing queue started successfully") + async def update_queue_position_reaction(self, message, position): + """Update queue position reaction""" + try: + # Remove any existing number reactions + for reaction in REACTIONS['numbers']: + try: + await message.remove_reaction(reaction, self.bot.user) + except: + pass + + # Add new position reaction if within range + if 0 <= position < len(REACTIONS['numbers']): + await message.add_reaction(REACTIONS['numbers'][position]) + logger.info(f"Updated queue position reaction to {position + 1} for message {message.id}") + except Exception as e: + logger.error(f"Failed to update queue position reaction: {e}") + + async def update_progress_reaction(self, message, progress): + """Update progress reaction based on FFmpeg progress""" + try: + # Remove existing progress reactions + for reaction in REACTIONS['progress']: + try: + await message.remove_reaction(reaction, self.bot.user) + except: + pass + + # Add appropriate progress reaction + if progress < 33: + await message.add_reaction(REACTIONS['progress'][0]) + logger.info(f"FFmpeg progress 0-33% for message {message.id}") + elif progress < 66: + await message.add_reaction(REACTIONS['progress'][1]) + logger.info(f"FFmpeg progress 33-66% for message {message.id}") + else: + await message.add_reaction(REACTIONS['progress'][2]) + logger.info(f"FFmpeg progress 66-100% for message {message.id}") + except Exception as e: + logger.error(f"Failed to update progress reaction: {e}") + + async def update_download_progress_reaction(self, message, progress): + """Update download progress reaction""" + try: + # Remove existing download progress reactions + for reaction in REACTIONS['download']: + try: + await message.remove_reaction(reaction, self.bot.user) + except: + pass + + # Add appropriate download progress reaction + if progress <= 20: + await message.add_reaction(REACTIONS['download'][0]) + logger.info(f"Download progress 0-20% for message {message.id}") + elif progress <= 40: + await message.add_reaction(REACTIONS['download'][1]) + logger.info(f"Download progress 20-40% for message {message.id}") + elif progress <= 60: + await message.add_reaction(REACTIONS['download'][2]) + logger.info(f"Download progress 40-60% for message {message.id}") + elif progress <= 80: + await message.add_reaction(REACTIONS['download'][3]) + logger.info(f"Download progress 60-80% for message {message.id}") + elif progress < 100: + await message.add_reaction(REACTIONS['download'][4]) + logger.info(f"Download progress 80-100% for message {message.id}") + else: + await message.add_reaction(REACTIONS['download'][5]) + logger.info(f"Download completed (100%) for message {message.id}") + except Exception as e: + logger.error(f"Failed to update download progress reaction: {e}") + async def process_message(self, message): """Process a message for video content""" try: @@ -80,11 +163,12 @@ class VideoProcessor: if not content or not downloader.is_supported_url(content): return - # Add video camera reaction to indicate processing + # Add initial queued reaction try: - await message.add_reaction("📹") + await message.add_reaction(REACTIONS['queued']) + logger.info(f"Added queued reaction to message {message.id}") except Exception as e: - logger.error(f"Failed to add video camera reaction: {e}") + logger.error(f"Failed to add queued reaction: {e}") # Add to processing queue await self.queue_manager.add_to_queue( @@ -94,6 +178,13 @@ class VideoProcessor: guild_id=message.guild.id, author_id=message.author.id ) + logger.info(f"Added message {message.id} to processing queue") + + # Update queue position + queue_status = self.queue_manager.get_queue_status(message.guild.id) + queue_position = queue_status['pending'] - 1 # -1 because this item was just added + await self.update_queue_position_reaction(message, queue_position) + logger.info(f"Message {message.id} is at position {queue_position + 1} in queue") except Exception as e: logger.error(f"Error processing message: {traceback.format_exc()}") @@ -102,6 +193,7 @@ class VideoProcessor: async def _process_video(self, item) -> Tuple[bool, Optional[str]]: """Process a video from the queue""" file_path = None + original_message = None try: guild_id = item.guild_id if guild_id not in self.components: @@ -120,6 +212,11 @@ class VideoProcessor: if not channel: return False, f"Channel {item.channel_id} not found" original_message = await channel.fetch_message(item.message_id) + + # Update reactions to show processing + await original_message.remove_reaction(REACTIONS['queued'], self.bot.user) + await original_message.add_reaction(REACTIONS['processing']) + logger.info(f"Started processing message {item.message_id}") except discord.NotFound: original_message = None except Exception as e: @@ -128,10 +225,19 @@ class VideoProcessor: # Download and process video try: - success, file_path, error = await downloader.download_video(item.url) + success, file_path, error = await downloader.download_video( + item.url, + progress_callback=lambda progress: self.update_download_progress_reaction(original_message, progress) if original_message else None + ) if not success: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Download failed for message {item.message_id}: {error}") return False, f"Failed to download video: {error}" except Exception as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Download error for message {item.message_id}: {str(e)}") return False, f"Download error: {str(e)}" # Get archive channel @@ -163,14 +269,29 @@ class VideoProcessor: content=message, file=discord.File(file_path) ) + + # Update reactions for success + if original_message: + await original_message.remove_reaction(REACTIONS['processing'], self.bot.user) + await original_message.add_reaction(REACTIONS['success']) + logger.info(f"Successfully processed message {item.message_id}") + return True, None except discord.HTTPException as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Failed to upload to Discord for message {item.message_id}: {str(e)}") return False, f"Failed to upload to Discord: {str(e)}" except Exception as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) + logger.error(f"Failed to archive video for message {item.message_id}: {str(e)}") return False, f"Failed to archive video: {str(e)}" except Exception as e: + if original_message: + await original_message.add_reaction(REACTIONS['error']) logger.error(f"Error processing video: {traceback.format_exc()}") return False, str(e) diff --git a/videoarchiver/utils/video_downloader.py b/videoarchiver/utils/video_downloader.py index 51d4c95..92ad802 100644 --- a/videoarchiver/utils/video_downloader.py +++ b/videoarchiver/utils/video_downloader.py @@ -10,7 +10,7 @@ import shutil import subprocess import json from concurrent.futures import ThreadPoolExecutor -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Callable from pathlib import Path from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager @@ -169,7 +169,11 @@ class VideoDownloader: logger.error(f"Error during URL check: {str(e)}") return False - async def download_video(self, url: str) -> Tuple[bool, str, str]: + async def download_video( + self, + url: str, + progress_callback: Optional[Callable[[float], None]] = None + ) -> Tuple[bool, str, str]: """Download and process a video with improved error handling and retry logic""" original_file = None compressed_file = None @@ -180,7 +184,7 @@ class VideoDownloader: try: with temp_path_context() as temp_dir: # Download the video - success, file_path, error = await self._safe_download(url, temp_dir) + success, file_path, error = await self._safe_download(url, temp_dir, progress_callback) if not success: return False, "", error @@ -208,6 +212,7 @@ class VideoDownloader: original_file, compressed_file, compression_params, + progress_callback, use_hardware=True ) @@ -219,6 +224,7 @@ class VideoDownloader: original_file, compressed_file, compression_params, + progress_callback, use_hardware=False ) @@ -289,6 +295,7 @@ class VideoDownloader: input_file: str, output_file: str, params: Dict[str, str], + progress_callback: Optional[Callable[[float], None]] = None, use_hardware: bool = True ) -> bool: """Attempt video compression with given parameters""" @@ -297,6 +304,9 @@ class VideoDownloader: ffmpeg_path = str(self.ffmpeg_mgr.get_ffmpeg_path()) cmd = [ffmpeg_path, "-y", "-i", input_file] + # Add progress monitoring + cmd.extend(["-progress", "pipe:1"]) + # Modify parameters based on hardware acceleration preference if use_hardware: gpu_info = self.ffmpeg_mgr.gpu_info @@ -316,18 +326,32 @@ class VideoDownloader: # Add output file cmd.append(output_file) - # Run compression - logger.debug(f"Running FFmpeg command: {' '.join(cmd)}") - result = await asyncio.get_event_loop().run_in_executor( - self.download_pool, - lambda: subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - ), + # Get video duration for progress calculation + duration = self._get_video_duration(input_file) + + # Run compression with progress monitoring + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE ) + while True: + line = await process.stdout.readline() + if not line: + break + + try: + line = line.decode().strip() + if line.startswith("out_time_ms="): + current_time = int(line.split("=")[1]) / 1000000 # Convert microseconds to seconds + if duration > 0 and progress_callback: + progress = min(100, (current_time / duration) * 100) + await progress_callback(progress) + except Exception as e: + logger.error(f"Error parsing FFmpeg progress: {e}") + + await process.wait() return os.path.exists(output_file) except subprocess.CalledProcessError as e: @@ -337,6 +361,24 @@ class VideoDownloader: logger.error(f"Compression attempt failed: {str(e)}") return False + def _get_video_duration(self, file_path: str) -> float: + """Get video duration in seconds""" + try: + ffprobe_path = str(self.ffmpeg_mgr.get_ffprobe_path()) + cmd = [ + ffprobe_path, + "-v", "quiet", + "-print_format", "json", + "-show_format", + file_path + ] + result = subprocess.run(cmd, capture_output=True, text=True) + data = json.loads(result.stdout) + return float(data["format"]["duration"]) + except Exception as e: + logger.error(f"Error getting video duration: {e}") + return 0 + def _check_file_size(self, info): """Check if file size is within limits""" if info.get("filepath") and os.path.exists(info["filepath"]): @@ -355,10 +397,10 @@ class VideoDownloader: logger.info(f"Download completed: {d['filename']}") elif d["status"] == "downloading": try: - percent = d.get("_percent_str", "N/A") + percent = float(d.get("_percent_str", "0").replace('%', '')) speed = d.get("_speed_str", "N/A") eta = d.get("_eta_str", "N/A") - logger.debug(f"Download progress: {percent} at {speed}, ETA: {eta}") + logger.debug(f"Download progress: {percent}% at {speed}, ETA: {eta}") except Exception as e: logger.debug(f"Error logging progress: {str(e)}") @@ -412,12 +454,30 @@ class VideoDownloader: logger.error(f"Error verifying video file {file_path}: {e}") return False - async def _safe_download(self, url: str, temp_dir: str) -> Tuple[bool, str, str]: + async def _safe_download( + self, + url: str, + temp_dir: str, + progress_callback: Optional[Callable[[float], None]] = None + ) -> Tuple[bool, str, str]: """Safely download video with retries""" for attempt in range(self.MAX_RETRIES): try: ydl_opts = self.ydl_opts.copy() ydl_opts["outtmpl"] = os.path.join(temp_dir, ydl_opts["outtmpl"]) + + # Add progress callback + if progress_callback: + original_progress_hook = ydl_opts["progress_hooks"][0] + def combined_progress_hook(d): + original_progress_hook(d) + if d["status"] == "downloading": + try: + percent = float(d.get("_percent_str", "0").replace('%', '')) + asyncio.create_task(progress_callback(percent)) + except Exception as e: + logger.error(f"Error in progress callback: {e}") + ydl_opts["progress_hooks"] = [combined_progress_hook] with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = await asyncio.get_event_loop().run_in_executor(