From cc3fa06d00daef8aae6739bab0b0c2e205022964 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:58:30 +0000 Subject: [PATCH] Add detailed progress tracking for downloads and compression: - Add detailed download progress hook with extensive metadata - Add comprehensive compression progress tracking - Track download speed, ETA, file sizes, and formats - Track compression codec, hardware acceleration, and bitrates - Add timestamps for all progress updates --- videoarchiver/utils/video_downloader.py | 217 +++++++++++++++++++----- 1 file changed, 171 insertions(+), 46 deletions(-) diff --git a/videoarchiver/utils/video_downloader.py b/videoarchiver/utils/video_downloader.py index 0897f89..ede8c69 100644 --- a/videoarchiver/utils/video_downloader.py +++ b/videoarchiver/utils/video_downloader.py @@ -12,6 +12,7 @@ import json from concurrent.futures import ThreadPoolExecutor from typing import Dict, List, Optional, Tuple, Callable from pathlib import Path +from datetime import datetime from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager from videoarchiver.ffmpeg.exceptions import ( @@ -108,7 +109,7 @@ class VideoDownloader: "file_access_retries": self.FILE_OP_RETRIES, "extractor_retries": self.MAX_RETRIES, "postprocessor_hooks": [self._check_file_size], - "progress_hooks": [self._progress_hook], + "progress_hooks": [self._progress_hook, self._detailed_progress_hook], # Add detailed hook "ffmpeg_location": str(self.ffmpeg_mgr.get_ffmpeg_path()), "ffprobe_location": str(self.ffmpeg_mgr.get_ffprobe_path()), "paths": {"home": str(self.download_path)}, @@ -126,6 +127,62 @@ class VideoDownloader: "max_filesize": max_file_size * 1024 * 1024, # Set max file size limit } + def _detailed_progress_hook(self, d): + """Handle detailed download progress tracking""" + try: + if d["status"] == "downloading": + # Get URL from info dict + url = d.get("info_dict", {}).get("webpage_url", "unknown") + + # Update global progress tracking + from videoarchiver.processor import _download_progress + + if url in _download_progress: + _download_progress[url].update({ + 'active': True, + 'percent': float(d.get("_percent_str", "0").replace('%', '')), + 'speed': d.get("_speed_str", "N/A"), + 'eta': d.get("_eta_str", "N/A"), + 'downloaded_bytes': d.get("downloaded_bytes", 0), + 'total_bytes': d.get("total_bytes", 0) or d.get("total_bytes_estimate", 0), + 'retries': d.get("retry_count", 0), + 'fragment_count': d.get("fragment_count", 0), + 'fragment_index': d.get("fragment_index", 0), + 'video_title': d.get("info_dict", {}).get("title", "Unknown"), + 'extractor': d.get("info_dict", {}).get("extractor", "Unknown"), + 'format': d.get("info_dict", {}).get("format", "Unknown"), + 'resolution': d.get("info_dict", {}).get("resolution", "Unknown"), + 'fps': d.get("info_dict", {}).get("fps", "Unknown"), + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + }) + + logger.debug( + f"Detailed progress for {url}: " + f"{_download_progress[url]['percent']}% at {_download_progress[url]['speed']}, " + f"ETA: {_download_progress[url]['eta']}" + ) + except Exception as e: + logger.error(f"Error in detailed progress hook: {str(e)}") + + def _progress_hook(self, d): + """Handle download progress""" + if d["status"] == "finished": + logger.info(f"Download completed: {d['filename']}") + elif d["status"] == "downloading": + try: + percent = float(d.get("_percent_str", "0").replace("%", "")) + speed = d.get("_speed_str", "N/A") + eta = d.get("_eta_str", "N/A") + downloaded = d.get("downloaded_bytes", 0) + total = d.get("total_bytes", 0) or d.get("total_bytes_estimate", 0) + + logger.debug( + f"Download progress: {percent}% at {speed}, " + f"ETA: {eta}, Downloaded: {downloaded}/{total} bytes" + ) + except Exception as e: + logger.debug(f"Error logging progress: {str(e)}") + def is_supported_url(self, url: str) -> bool: """Check if URL is supported by attempting a simulated download""" if not is_video_url_pattern(url): @@ -173,11 +230,30 @@ class VideoDownloader: return False async def download_video( - self, - url: str, - progress_callback: Optional[Callable[[float], None]] = None + self, url: str, progress_callback: Optional[Callable[[float], None]] = None ) -> Tuple[bool, str, str]: """Download and process a video with improved error handling""" + # Initialize progress tracking for this URL + from videoarchiver.processor import _download_progress + _download_progress[url] = { + 'active': True, + 'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), + 'percent': 0, + 'speed': 'N/A', + 'eta': 'N/A', + 'downloaded_bytes': 0, + 'total_bytes': 0, + 'retries': 0, + 'fragment_count': 0, + 'fragment_index': 0, + 'video_title': 'Unknown', + 'extractor': 'Unknown', + 'format': 'Unknown', + 'resolution': 'Unknown', + 'fps': 'Unknown', + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + } + original_file = None compressed_file = None temp_dir = None @@ -187,7 +263,9 @@ class VideoDownloader: try: with temp_path_context() as temp_dir: # Download the video - success, file_path, error = await self._safe_download(url, temp_dir, progress_callback) + success, file_path, error = await self._safe_download( + url, temp_dir, progress_callback + ) if not success: return False, "", error @@ -216,26 +294,28 @@ class VideoDownloader: compressed_file, compression_params, progress_callback, - use_hardware=True + use_hardware=True, ) # If hardware acceleration fails, fall back to CPU if not success: hardware_accel_failed = True - logger.warning("Hardware acceleration failed, falling back to CPU encoding") + logger.warning( + "Hardware acceleration failed, falling back to CPU encoding" + ) success = await self._try_compression( original_file, compressed_file, compression_params, progress_callback, - use_hardware=False + use_hardware=False, ) if not success: raise CompressionError( "Failed to compress with both hardware and CPU encoding", file_size, - self.max_file_size * 1024 * 1024 + self.max_file_size * 1024 * 1024, ) # Verify compressed file @@ -282,6 +362,8 @@ class VideoDownloader: # Clean up async with self._downloads_lock: self.active_downloads.pop(url, None) + if url in _download_progress: + _download_progress[url]['active'] = False try: if original_file and os.path.exists(original_file): @@ -301,7 +383,7 @@ class VideoDownloader: output_file: str, params: Dict[str, str], progress_callback: Optional[Callable[[float], None]] = None, - use_hardware: bool = True + use_hardware: bool = True, ) -> bool: """Attempt video compression with given parameters""" try: @@ -334,30 +416,82 @@ class VideoDownloader: # Get video duration for progress calculation duration = self._get_video_duration(input_file) + # Update compression progress tracking + from videoarchiver.processor import _compression_progress + + # Get input file size + input_size = os.path.getsize(input_file) + + # Initialize compression progress + _compression_progress[input_file] = { + 'active': True, + 'filename': os.path.basename(input_file), + 'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), + 'percent': 0, + 'elapsed_time': '0:00', + 'input_size': input_size, + 'current_size': 0, + 'target_size': self.max_file_size * 1024 * 1024, + 'codec': params.get('c:v', 'unknown'), + 'hardware_accel': use_hardware, + 'preset': params.get('preset', 'unknown'), + 'crf': params.get('crf', 'unknown'), + 'duration': duration, + 'bitrate': params.get('b:v', 'unknown'), + 'audio_codec': params.get('c:a', 'unknown'), + 'audio_bitrate': params.get('b:a', 'unknown'), + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + } + # Run compression with progress monitoring process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) + start_time = datetime.utcnow() + while True: line = await process.stdout.readline() if not line: break - + try: line = line.decode().strip() if line.startswith("out_time_ms="): - current_time = int(line.split("=")[1]) / 1000000 # Convert microseconds to seconds - if duration > 0 and progress_callback: + current_time = ( + int(line.split("=")[1]) / 1000000 + ) # Convert microseconds to seconds + if duration > 0: progress = min(100, (current_time / duration) * 100) - await progress_callback(progress) + + # Update compression progress + elapsed = datetime.utcnow() - start_time + _compression_progress[input_file].update({ + 'percent': progress, + 'elapsed_time': str(elapsed).split('.')[0], # Remove microseconds + 'current_size': os.path.getsize(output_file) if os.path.exists(output_file) else 0, + 'current_time': current_time, + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + }) + + if progress_callback: + await progress_callback(progress) except Exception as e: logger.error(f"Error parsing FFmpeg progress: {e}") await process.wait() - return os.path.exists(output_file) + success = os.path.exists(output_file) + + # Update final status + if success and input_file in _compression_progress: + _compression_progress[input_file].update({ + 'active': False, + 'percent': 100, + 'current_size': os.path.getsize(output_file), + 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + }) + + return success except subprocess.CalledProcessError as e: logger.error(f"FFmpeg compression failed: {e.stderr.decode()}") @@ -365,6 +499,10 @@ class VideoDownloader: except Exception as e: logger.error(f"Compression attempt failed: {str(e)}") return False + finally: + # Ensure compression progress is marked as inactive + if input_file in _compression_progress: + _compression_progress[input_file]['active'] = False def _get_video_duration(self, file_path: str) -> float: """Get video duration in seconds""" @@ -372,10 +510,12 @@ class VideoDownloader: ffprobe_path = str(self.ffmpeg_mgr.get_ffprobe_path()) cmd = [ ffprobe_path, - "-v", "quiet", - "-print_format", "json", + "-v", + "quiet", + "-print_format", + "json", "-show_format", - file_path + file_path, ] result = subprocess.run(cmd, capture_output=True, text=True) data = json.loads(result.stdout) @@ -396,25 +536,6 @@ class VideoDownloader: except OSError as e: logger.error(f"Error checking file size: {str(e)}") - def _progress_hook(self, d): - """Handle download progress""" - if d["status"] == "finished": - logger.info(f"Download completed: {d['filename']}") - elif d["status"] == "downloading": - try: - percent = float(d.get("_percent_str", "0").replace('%', '')) - speed = d.get("_speed_str", "N/A") - eta = d.get("_eta_str", "N/A") - downloaded = d.get("downloaded_bytes", 0) - total = d.get("total_bytes", 0) or d.get("total_bytes_estimate", 0) - - logger.debug( - f"Download progress: {percent}% at {speed}, " - f"ETA: {eta}, Downloaded: {downloaded}/{total} bytes" - ) - except Exception as e: - logger.debug(f"Error logging progress: {str(e)}") - def _verify_video_file(self, file_path: str) -> bool: """Verify video file integrity""" try: @@ -466,10 +587,10 @@ class VideoDownloader: return False async def _safe_download( - self, - url: str, + self, + url: str, temp_dir: str, - progress_callback: Optional[Callable[[float], None]] = None + progress_callback: Optional[Callable[[float], None]] = None, ) -> Tuple[bool, str, str]: """Safely download video with retries""" last_error = None @@ -477,18 +598,22 @@ class VideoDownloader: try: ydl_opts = self.ydl_opts.copy() ydl_opts["outtmpl"] = os.path.join(temp_dir, ydl_opts["outtmpl"]) - + # Add progress callback if progress_callback: original_progress_hook = ydl_opts["progress_hooks"][0] + def combined_progress_hook(d): original_progress_hook(d) if d["status"] == "downloading": try: - percent = float(d.get("_percent_str", "0").replace('%', '')) + percent = float( + d.get("_percent_str", "0").replace("%", "") + ) asyncio.create_task(progress_callback(percent)) except Exception as e: logger.error(f"Error in progress callback: {e}") + ydl_opts["progress_hooks"] = [combined_progress_hook] with yt_dlp.YoutubeDL(ydl_opts) as ydl: @@ -513,7 +638,7 @@ class VideoDownloader: logger.error(f"Download attempt {attempt + 1} failed: {str(e)}") if attempt < self.MAX_RETRIES - 1: # Exponential backoff with jitter - delay = self.RETRY_DELAY * (2 ** attempt) + (attempt * 2) + delay = self.RETRY_DELAY * (2**attempt) + (attempt * 2) await asyncio.sleep(delay) else: return False, "", f"All download attempts failed: {last_error}"