From 45492d7ee483d5ba7c0e4edcbd3bfc1c57927a53 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 17:59:26 +0000 Subject: [PATCH] fixes --- videoarchiver/enhanced_queue.py | 38 +++-- videoarchiver/processor.py | 19 ++- videoarchiver/utils/video_downloader.py | 193 ++++++++++++++---------- 3 files changed, 148 insertions(+), 102 deletions(-) diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index 48aac5e..fcf3e33 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -718,18 +718,21 @@ class EnhancedVideoQueueManager: async with self._queue_lock: # Clean up completed items for url in list(self._completed.keys()): - item = self._completed[url] try: + item = self._completed[url] # Ensure added_at is a datetime object - if isinstance(item.added_at, str): + if not isinstance(item.added_at, datetime): try: - item.added_at = datetime.fromisoformat(item.added_at) - except ValueError: - # If conversion fails, use current time to ensure item gets cleaned up + if isinstance(item.added_at, str): + item.added_at = datetime.fromisoformat(item.added_at) + else: + # If not string or datetime, set to current time + item.added_at = current_time + except (ValueError, TypeError): + # If conversion fails, use current time item.added_at = current_time - elif not isinstance(item.added_at, datetime): - item.added_at = current_time - + + # Now safe to compare datetimes if item.added_at < cleanup_cutoff: self._completed.pop(url) except Exception as e: @@ -739,18 +742,21 @@ class EnhancedVideoQueueManager: # Clean up failed items for url in list(self._failed.keys()): - item = self._failed[url] try: + item = self._failed[url] # Ensure added_at is a datetime object - if isinstance(item.added_at, str): + if not isinstance(item.added_at, datetime): try: - item.added_at = datetime.fromisoformat(item.added_at) - except ValueError: - # If conversion fails, use current time to ensure item gets cleaned up + if isinstance(item.added_at, str): + item.added_at = datetime.fromisoformat(item.added_at) + else: + # If not string or datetime, set to current time + item.added_at = current_time + except (ValueError, TypeError): + # If conversion fails, use current time item.added_at = current_time - elif not isinstance(item.added_at, datetime): - item.added_at = current_time - + + # Now safe to compare datetimes if item.added_at < cleanup_cutoff: self._failed.pop(url) except Exception as e: diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 07d1d93..abc1e58 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -275,11 +275,22 @@ class VideoProcessor: def progress_callback(progress: float) -> None: if original_message: try: - # Get event loop for the current context - loop = asyncio.get_running_loop() + # Try to get the current event loop + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # If no event loop is running in this thread, + # we'll use the bot's loop which we know exists + loop = self.bot.loop + + if not loop.is_running(): + logger.warning("Event loop is not running, skipping progress update") + return + # Create a task to update the reaction - loop.create_task( - self.update_download_progress_reaction(original_message, progress) + asyncio.run_coroutine_threadsafe( + self.update_download_progress_reaction(original_message, progress), + loop ) except Exception as e: logger.error(f"Error in progress callback: {e}") diff --git a/videoarchiver/utils/video_downloader.py b/videoarchiver/utils/video_downloader.py index d1eea88..3440122 100644 --- a/videoarchiver/utils/video_downloader.py +++ b/videoarchiver/utils/video_downloader.py @@ -30,6 +30,7 @@ from videoarchiver.utils.path_manager import temp_path_context logger = logging.getLogger("VideoArchiver") + # Add a custom yt-dlp logger to handle cancellation class CancellableYTDLLogger: def __init__(self): @@ -50,6 +51,7 @@ class CancellableYTDLLogger: raise Exception("Download cancelled") logger.error(msg) + def is_video_url_pattern(url: str) -> bool: """Check if URL matches common video platform patterns""" video_patterns = [ @@ -73,6 +75,7 @@ def is_video_url_pattern(url: str) -> bool: ] return any(re.search(pattern, url, re.IGNORECASE) for pattern in video_patterns) + class VideoDownloader: MAX_RETRIES = 5 RETRY_DELAY = 10 @@ -140,9 +143,7 @@ class VideoDownloader: "geo_bypass": True, "socket_timeout": 60, "http_chunk_size": 1048576, - "external_downloader_args": { - "ffmpeg": ["-timeout", "60000000"] - }, + "external_downloader_args": {"ffmpeg": ["-timeout", "60000000"]}, "max_sleep_interval": 5, "sleep_interval": 1, "max_filesize": max_file_size * 1024 * 1024, @@ -151,11 +152,11 @@ class VideoDownloader: async def cleanup(self) -> None: """Clean up resources with proper shutdown""" self._shutting_down = True - + try: # Cancel active downloads self.ytdl_logger.cancelled = True - + # Kill any active FFmpeg processes async with self._processes_lock: for process in self._active_processes: @@ -185,7 +186,7 @@ class VideoDownloader: try: # Force cancel all downloads self.ytdl_logger.cancelled = True - + # Kill all processes immediately async with self._processes_lock: for process in self._active_processes: @@ -211,29 +212,42 @@ class VideoDownloader: if d["status"] == "downloading": # Get URL from info dict url = d.get("info_dict", {}).get("webpage_url", "unknown") - + # Update global progress tracking from videoarchiver.processor import _download_progress - + if url in _download_progress: - _download_progress[url].update({ - 'active': True, - 'percent': float(d.get("_percent_str", "0").replace('%', '')), - 'speed': d.get("_speed_str", "N/A"), - 'eta': d.get("_eta_str", "N/A"), - 'downloaded_bytes': d.get("downloaded_bytes", 0), - 'total_bytes': d.get("total_bytes", 0) or d.get("total_bytes_estimate", 0), - 'retries': d.get("retry_count", 0), - 'fragment_count': d.get("fragment_count", 0), - 'fragment_index': d.get("fragment_index", 0), - 'video_title': d.get("info_dict", {}).get("title", "Unknown"), - 'extractor': d.get("info_dict", {}).get("extractor", "Unknown"), - 'format': d.get("info_dict", {}).get("format", "Unknown"), - 'resolution': d.get("info_dict", {}).get("resolution", "Unknown"), - 'fps': d.get("info_dict", {}).get("fps", "Unknown"), - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - }) - + _download_progress[url].update( + { + "active": True, + "percent": float( + d.get("_percent_str", "0").replace("%", "") + ), + "speed": d.get("_speed_str", "N/A"), + "eta": d.get("_eta_str", "N/A"), + "downloaded_bytes": d.get("downloaded_bytes", 0), + "total_bytes": d.get("total_bytes", 0) + or d.get("total_bytes_estimate", 0), + "retries": d.get("retry_count", 0), + "fragment_count": d.get("fragment_count", 0), + "fragment_index": d.get("fragment_index", 0), + "video_title": d.get("info_dict", {}).get( + "title", "Unknown" + ), + "extractor": d.get("info_dict", {}).get( + "extractor", "Unknown" + ), + "format": d.get("info_dict", {}).get("format", "Unknown"), + "resolution": d.get("info_dict", {}).get( + "resolution", "Unknown" + ), + "fps": d.get("info_dict", {}).get("fps", "Unknown"), + "last_update": datetime.utcnow().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + ) + logger.debug( f"Detailed progress for {url}: " f"{_download_progress[url]['percent']}% at {_download_progress[url]['speed']}, " @@ -316,23 +330,24 @@ class VideoDownloader: # Initialize progress tracking for this URL from videoarchiver.processor import _download_progress + _download_progress[url] = { - 'active': True, - 'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), - 'percent': 0, - 'speed': 'N/A', - 'eta': 'N/A', - 'downloaded_bytes': 0, - 'total_bytes': 0, - 'retries': 0, - 'fragment_count': 0, - 'fragment_index': 0, - 'video_title': 'Unknown', - 'extractor': 'Unknown', - 'format': 'Unknown', - 'resolution': 'Unknown', - 'fps': 'Unknown', - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + "active": True, + "start_time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), + "percent": 0, + "speed": "N/A", + "eta": "N/A", + "downloaded_bytes": 0, + "total_bytes": 0, + "retries": 0, + "fragment_count": 0, + "fragment_index": 0, + "video_title": "Unknown", + "extractor": "Unknown", + "format": "Unknown", + "resolution": "Unknown", + "fps": "Unknown", + "last_update": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), } original_file = None @@ -354,8 +369,8 @@ class VideoDownloader: async with self._downloads_lock: self.active_downloads[url] = { - 'file_path': original_file, - 'start_time': datetime.utcnow() + "file_path": original_file, + "start_time": datetime.utcnow(), } # Check file size and compress if needed @@ -447,7 +462,7 @@ class VideoDownloader: async with self._downloads_lock: self.active_downloads.pop(url, None) if url in _download_progress: - _download_progress[url]['active'] = False + _download_progress[url]["active"] = False try: if original_file and os.path.exists(original_file): @@ -505,29 +520,29 @@ class VideoDownloader: # Update compression progress tracking from videoarchiver.processor import _compression_progress - + # Get input file size input_size = os.path.getsize(input_file) - + # Initialize compression progress _compression_progress[input_file] = { - 'active': True, - 'filename': os.path.basename(input_file), - 'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), - 'percent': 0, - 'elapsed_time': '0:00', - 'input_size': input_size, - 'current_size': 0, - 'target_size': self.max_file_size * 1024 * 1024, - 'codec': params.get('c:v', 'unknown'), - 'hardware_accel': use_hardware, - 'preset': params.get('preset', 'unknown'), - 'crf': params.get('crf', 'unknown'), - 'duration': duration, - 'bitrate': params.get('b:v', 'unknown'), - 'audio_codec': params.get('c:a', 'unknown'), - 'audio_bitrate': params.get('b:a', 'unknown'), - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + "active": True, + "filename": os.path.basename(input_file), + "start_time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), + "percent": 0, + "elapsed_time": "0:00", + "input_size": input_size, + "current_size": 0, + "target_size": self.max_file_size * 1024 * 1024, + "codec": params.get("c:v", "unknown"), + "hardware_accel": use_hardware, + "preset": params.get("preset", "unknown"), + "crf": params.get("crf", "unknown"), + "duration": duration, + "bitrate": params.get("b:v", "unknown"), + "audio_codec": params.get("c:a", "unknown"), + "audio_bitrate": params.get("b:a", "unknown"), + "last_update": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), } # Run compression with progress monitoring @@ -560,17 +575,27 @@ class VideoDownloader: ) # Convert microseconds to seconds if duration > 0: progress = min(100, (current_time / duration) * 100) - + # Update compression progress elapsed = datetime.utcnow() - start_time - _compression_progress[input_file].update({ - 'percent': progress, - 'elapsed_time': str(elapsed).split('.')[0], # Remove microseconds - 'current_size': os.path.getsize(output_file) if os.path.exists(output_file) else 0, - 'current_time': current_time, - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - }) - + _compression_progress[input_file].update( + { + "percent": progress, + "elapsed_time": str(elapsed).split(".")[ + 0 + ], # Remove microseconds + "current_size": ( + os.path.getsize(output_file) + if os.path.exists(output_file) + else 0 + ), + "current_time": current_time, + "last_update": datetime.utcnow().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + ) + if progress_callback: # Call the callback directly since it now handles task creation progress_callback(progress) @@ -580,16 +605,20 @@ class VideoDownloader: await process.wait() success = os.path.exists(output_file) - + # Update final status if success and input_file in _compression_progress: - _compression_progress[input_file].update({ - 'active': False, - 'percent': 100, - 'current_size': os.path.getsize(output_file), - 'last_update': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - }) - + _compression_progress[input_file].update( + { + "active": False, + "percent": 100, + "current_size": os.path.getsize(output_file), + "last_update": datetime.utcnow().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + ) + return success finally: @@ -606,7 +635,7 @@ class VideoDownloader: finally: # Ensure compression progress is marked as inactive if input_file in _compression_progress: - _compression_progress[input_file]['active'] = False + _compression_progress[input_file]["active"] = False def _get_video_duration(self, file_path: str) -> float: """Get video duration in seconds"""