From d61d1508f56f73bd00cbef66cacf1cc91332dce5 Mon Sep 17 00:00:00 2001
From: pacnpal <183241239+pacnpal@users.noreply.github.com>
Date: Thu, 14 Nov 2024 20:05:22 +0000
Subject: [PATCH] Improve video processing reliability and resource management

FFmpeg Manager improvements:
- Detect GPUs more reliably, with CPU (libx264) fallback when a hardware
  encoder is unavailable
- Analyze video content to choose encoding settings
- Add advanced encoding parameters and two-pass-friendly x264 options
- Scale audio bitrate dynamically with the size budget
- Handle encoder failures gracefully

Utils improvements:
- Manage downloads in per-download temporary directories
- Retry file cleanup with a timeout
- Retry downloads and verify downloaded videos with ffprobe
- Shut down thread pools cleanly
- Release resources deterministically
- Improve error handling throughout
- Fix potential resource leaks
---
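Usage sketch: a minimal example of how the new manager is meant to be driven
(the class and method names are from this patch; the file names and the 8 MB
target are made up for illustration):

    import ffmpeg
    from videoarchiver.ffmpeg_manager import FFmpegManager

    mgr = FFmpegManager()
    # Pick codec, bitrate, and quality settings for an 8 MB target.
    params = mgr.get_compression_params("input.mp4", target_size_mb=8)
    stream = ffmpeg.input("input.mp4")
    stream = ffmpeg.output(stream, "output.mp4", **params)
    ffmpeg.run(stream, cmd=mgr.get_ffmpeg_path(), overwrite_output=True)

get_compression_params returns a plain option dict (keys like "c:v", "b:a"),
so it can be splatted into ffmpeg-python or joined into a raw command line.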
raise Exception("Downloaded FFmpeg binary is not functional") + except KeyError: + raise Exception( + f"Unsupported system/architecture: {self.system}/{self.machine}" + ) + + self._gpu_info = self._detect_gpu() + self._cpu_cores = multiprocessing.cpu_count() + + def _verify_ffmpeg(self) -> bool: + """Verify FFmpeg binary works""" + try: + if not self.ffmpeg_path.exists(): + return False + + # Make binary executable on Unix systems + if self.system != "Windows": + try: + self.ffmpeg_path.chmod( + self.ffmpeg_path.stat().st_mode | stat.S_IEXEC + ) + except Exception as e: + logger.error( + f"Failed to set FFmpeg executable permissions: {str(e)}" + ) + return False + + # Test FFmpeg and check for required encoders + result = subprocess.run( + [str(self.ffmpeg_path), "-encoders"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5, + ) + + if result.returncode != 0: + return False + + # Verify encoders are available + encoders = result.stdout.decode() + required_encoders = ["libx264"] # Base requirement + if self._gpu_info["nvidia"]: + required_encoders.append("h264_nvenc") + if self._gpu_info["amd"]: + required_encoders.append("h264_amf") + if self._gpu_info["intel"]: + required_encoders.append("h264_qsv") + + for encoder in required_encoders: + if encoder not in encoders: + logger.warning(f"Required encoder {encoder} not available") + if encoder != "libx264": # Only warn for GPU encoders + self._gpu_info[encoder.split('_')[1].replace('h264', '')] = False + + return True + except Exception as e: + logger.error(f"FFmpeg verification failed: {str(e)}") + return False + + def _detect_gpu(self) -> dict: + """Detect available GPU and its capabilities""" + gpu_info = {"nvidia": False, "amd": False, "intel": False, "arm": False} + + try: + if self.system == "Linux": + # Check for NVIDIA GPU + try: + nvidia_smi = subprocess.run( + ["nvidia-smi", "-q", "-d", "ENCODER"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5, + ) + if nvidia_smi.returncode == 0 and b"Encoder" in nvidia_smi.stdout: + gpu_info["nvidia"] = True + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Check for AMD GPU + try: + if os.path.exists("/dev/dri/renderD128"): + with open("/sys/class/drm/renderD128/device/vendor", "r") as f: + vendor = f.read().strip() + if vendor == "0x1002": # AMD vendor ID + gpu_info["amd"] = True + except (IOError, OSError): + pass + + # Check for Intel GPU + try: + lspci = subprocess.run( + ["lspci"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5, + ) + output = lspci.stdout.decode().lower() + if "intel" in output and ("vga" in output or "display" in output): + gpu_info["intel"] = True + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Check for ARM GPU + if self.machine in ["aarch64", "armv7l"]: + gpu_info["arm"] = True + + elif self.system == "Windows": + try: + # Use PowerShell to get GPU info + ps_command = "Get-WmiObject Win32_VideoController | ConvertTo-Json" + result = subprocess.run( + ["powershell", "-Command", ps_command], + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0: + gpu_data = json.loads(result.stdout) + if not isinstance(gpu_data, list): + gpu_data = [gpu_data] + + for gpu in gpu_data: + name = gpu.get("Name", "").lower() + if "nvidia" in name: + gpu_info["nvidia"] = True + if "amd" in name or "radeon" in name: + gpu_info["amd"] = True + if "intel" in name: + gpu_info["intel"] = True + except Exception: + # Fallback to dxdiag if PowerShell method fails + with 
+    def _detect_gpu(self) -> dict:
+        """Detect available GPU and its capabilities"""
+        gpu_info = {"nvidia": False, "amd": False, "intel": False, "arm": False}
+
+        try:
+            if self.system == "Linux":
+                # Check for NVIDIA GPU
+                try:
+                    nvidia_smi = subprocess.run(
+                        ["nvidia-smi", "-q", "-d", "ENCODER"],
+                        stdout=subprocess.PIPE,
+                        stderr=subprocess.PIPE,
+                        timeout=5,
+                    )
+                    if nvidia_smi.returncode == 0 and b"Encoder" in nvidia_smi.stdout:
+                        gpu_info["nvidia"] = True
+                except (subprocess.TimeoutExpired, FileNotFoundError):
+                    pass
+
+                # Check for AMD GPU
+                try:
+                    if os.path.exists("/dev/dri/renderD128"):
+                        with open("/sys/class/drm/renderD128/device/vendor", "r") as f:
+                            vendor = f.read().strip()
+                            if vendor == "0x1002":  # AMD vendor ID
+                                gpu_info["amd"] = True
+                except (IOError, OSError):
+                    pass
+
+                # Check for Intel GPU
+                try:
+                    lspci = subprocess.run(
+                        ["lspci"],
+                        stdout=subprocess.PIPE,
+                        stderr=subprocess.PIPE,
+                        timeout=5,
+                    )
+                    output = lspci.stdout.decode().lower()
+                    if "intel" in output and ("vga" in output or "display" in output):
+                        gpu_info["intel"] = True
+                except (subprocess.TimeoutExpired, FileNotFoundError):
+                    pass
+
+                # Check for ARM GPU
+                if self.machine in ["aarch64", "armv7l"]:
+                    gpu_info["arm"] = True
+
+            elif self.system == "Windows":
+                try:
+                    # Use PowerShell to get GPU info
+                    ps_command = "Get-WmiObject Win32_VideoController | ConvertTo-Json"
+                    result = subprocess.run(
+                        ["powershell", "-Command", ps_command],
+                        capture_output=True,
+                        text=True,
+                        timeout=10,
+                    )
+                    if result.returncode == 0:
+                        gpu_data = json.loads(result.stdout)
+                        if not isinstance(gpu_data, list):
+                            gpu_data = [gpu_data]
+
+                        for gpu in gpu_data:
+                            name = gpu.get("Name", "").lower()
+                            if "nvidia" in name:
+                                gpu_info["nvidia"] = True
+                            if "amd" in name or "radeon" in name:
+                                gpu_info["amd"] = True
+                            if "intel" in name:
+                                gpu_info["intel"] = True
+                except Exception:
+                    # Fallback to dxdiag if PowerShell method fails
+                    with tempfile.NamedTemporaryFile(
+                        suffix=".txt", delete=False
+                    ) as temp_file:
+                        temp_path = temp_file.name
+
+                    try:
+                        subprocess.run(
+                            ["dxdiag", "/t", temp_path],
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.PIPE,
+                            timeout=10,
+                        )
+                        if os.path.exists(temp_path):
+                            with open(temp_path, "r", errors="ignore") as f:
+                                content = f.read().lower()
+                                if "nvidia" in content:
+                                    gpu_info["nvidia"] = True
+                                if "amd" in content or "radeon" in content:
+                                    gpu_info["amd"] = True
+                                if "intel" in content:
+                                    gpu_info["intel"] = True
+                    finally:
+                        try:
+                            os.unlink(temp_path)
+                        except OSError:
+                            pass
+
+        except Exception as e:
+            logger.warning(f"GPU detection failed: {str(e)}")
+
+        return gpu_info
+
+    @staticmethod
+    def _parse_frame_rate(rate: str, default: float = 30.0) -> float:
+        """Parse an FFmpeg rational such as '30000/1001' without eval()"""
+        try:
+            num, _, den = rate.partition("/")
+            return float(num) / float(den) if den else float(num)
+        except (ValueError, ZeroDivisionError):
+            return default
+
+    def _analyze_video(self, input_path: str) -> dict:
+        """Analyze video content for optimal encoding settings"""
+        try:
+            probe = ffmpeg.probe(input_path)
+            video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
+
+            # Get video properties; parse the rational frame rate safely
+            # instead of eval()ing untrusted probe output
+            width = int(video_info.get('width', 0))
+            height = int(video_info.get('height', 0))
+            fps = self._parse_frame_rate(video_info.get('r_frame_rate', '30/1'))
+            duration = float(probe['format'].get('duration', 0))
+            bitrate = float(probe['format'].get('bit_rate', 0))
+
+            # Detect high motion content
+            has_high_motion = False
+            if video_info.get('avg_frame_rate'):
+                avg_fps = self._parse_frame_rate(video_info['avg_frame_rate'])
+                if abs(avg_fps - fps) > 5:  # Significant frame rate variation
+                    has_high_motion = True
+
+            # Get audio properties
+            audio_info = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None)
+            audio_bitrate = 0
+            if audio_info:
+                audio_bitrate = int(audio_info.get('bit_rate', 0))
+
+            return {
+                'width': width,
+                'height': height,
+                'fps': fps,
+                'duration': duration,
+                'bitrate': bitrate,
+                'has_high_motion': has_high_motion,
+                'audio_bitrate': audio_bitrate
+            }
+        except Exception as e:
+            logger.error(f"Error analyzing video: {str(e)}")
+            return {}
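+
+    # Bitrate budget example (illustrative numbers): for an 8 MB target and a
+    # 60 s clip, the usable size is 8 MB * 0.95 ~= 7.97 MB for the streams,
+    # i.e. ~1,063 kbps total. Audio gets 15% clamped to [64k, 192k], so
+    # ~159 kbps; video gets the remaining ~903 kbps, with maxrate at 1.5x
+    # (~1,355 kbps) and bufsize at 2x (~1,806 kbps).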
+    def _get_optimal_ffmpeg_params(
+        self, input_path: str, target_size_bytes: int
+    ) -> dict:
+        """Get optimal FFmpeg parameters based on hardware and video size"""
+        # Analyze video content
+        video_info = self._analyze_video(input_path)
+
+        # Base parameters
+        params = {
+            "c:v": "libx264",  # Default to CPU encoding
+            "threads": str(self._cpu_cores),  # Use all CPU cores
+            "preset": "medium",
+            "crf": "23",  # Default quality
+            "maxrate": None,
+            "bufsize": None,
+            "movflags": "+faststart",  # Optimize for web playback
+            "profile:v": "high",  # High profile for better quality
+            "level": "4.1",  # Compatibility level
+            "pix_fmt": "yuv420p",  # Standard pixel format
+        }
+
+        # Add advanced encoding parameters
+        params.update({
+            "x264opts": "rc-lookahead=60:me=umh:subme=7:ref=4:b-adapt=2:direct=auto",
+            "tune": "film",  # General-purpose tuning
+            "fastfirstpass": "1",  # Fast first pass for two-pass encoding
+        })
+
+        # Adjust for high motion content
+        if video_info.get('has_high_motion'):
+            params.update({
+                "tune": "grain",  # Better for high motion
+                "x264opts": params["x264opts"] + ":deblock=-1,-1:psy-rd=1.0:aq-strength=0.8"
+            })
+
+        # GPU-specific optimizations
+        if self._gpu_info["nvidia"]:
+            try:
+                params.update({
+                    "c:v": "h264_nvenc",
+                    "preset": "p7",  # Highest quality NVENC preset
+                    "rc:v": "vbr",  # Variable bitrate
+                    "cq:v": "19",  # Quality level
+                    "b_ref_mode": "middle",
+                    "spatial-aq": "1",
+                    "temporal-aq": "1",
+                    "rc-lookahead": "32",
+                    "surfaces": "64",
+                    "max_muxing_queue_size": "1024",
+                })
+            except Exception as e:
+                logger.error(f"NVENC initialization failed: {str(e)}")
+                self._gpu_info["nvidia"] = False  # Disable NVENC
+
+        elif self._gpu_info["amd"]:
+            try:
+                params.update({
+                    "c:v": "h264_amf",
+                    "quality": "quality",
+                    "rc": "vbr_peak",
+                    "enforce_hrd": "1",
+                    "vbaq": "1",
+                    "preanalysis": "1",
+                    "max_muxing_queue_size": "1024",
+                })
+            except Exception as e:
+                logger.error(f"AMF initialization failed: {str(e)}")
+                self._gpu_info["amd"] = False
+
+        elif self._gpu_info["intel"]:
+            try:
+                params.update({
+                    "c:v": "h264_qsv",
+                    "preset": "veryslow",
+                    "look_ahead": "1",
+                    "global_quality": "23",
+                    "max_muxing_queue_size": "1024",
+                })
+            except Exception as e:
+                logger.error(f"QSV initialization failed: {str(e)}")
+                self._gpu_info["intel"] = False
+
+        try:
+            # Calculate target bitrate
+            input_size = os.path.getsize(input_path)
+            duration = video_info.get('duration', 0)
+
+            if duration > 0 and input_size > target_size_bytes:
+                # Reserve 5% for container overhead
+                video_size_target = int(target_size_bytes * 0.95)
+
+                # Calculate audio bitrate (10-20% of total, based on content)
+                total_bitrate = (video_size_target * 8) / duration
+                audio_bitrate = min(
+                    192000,  # Max 192kbps
+                    max(
+                        64000,  # Min 64kbps
+                        int(total_bitrate * 0.15)  # 15% of total
+                    )
+                )
+
+                # Remaining bitrate for video
+                video_bitrate = int((video_size_target * 8) / duration - audio_bitrate)
+
+                # Set bitrate constraints
+                params["maxrate"] = str(int(video_bitrate * 1.5))  # Allow 50% overflow
+                params["bufsize"] = str(int(video_bitrate * 2))  # Double buffer size
+
+                # Adjust quality based on compression ratio
+                ratio = input_size / target_size_bytes
+                if ratio > 4:
+                    params["crf"] = "26" if params["c:v"] == "libx264" else "23"
+                    params["preset"] = "faster"
+                elif ratio > 2:
+                    params["crf"] = "23" if params["c:v"] == "libx264" else "21"
+                    params["preset"] = "medium"
+                else:
+                    params["crf"] = "20" if params["c:v"] == "libx264" else "19"
+                    params["preset"] = "slow"
+
+                # Audio settings
+                params.update({
+                    "c:a": "aac",
+                    "b:a": f"{int(audio_bitrate/1000)}k",
+                    "ar": "48000",
+                    "ac": "2",  # Stereo
+                })
+
+        except Exception as e:
+            logger.error(f"Error calculating bitrates: {str(e)}")
+            # Use safe default parameters
+            params.update({
+                "crf": "23",
+                "preset": "medium",
+                "maxrate": f"{2 * 1024 * 1024}",  # 2 Mbps
+                "bufsize": f"{4 * 1024 * 1024}",  # 4 Mbps buffer
+                "c:a": "aac",
+                "b:a": "128k",
+                "ar": "48000",
+                "ac": "2",
+            })
+
+        # Drop unset values so they don't become bare flags downstream
+        return {k: v for k, v in params.items() if v is not None}
+
+    def _download_ffmpeg(self):
+        """Download and extract FFmpeg binary"""
+        try:
+            arch_config = self.FFMPEG_URLS[self.system][self.machine]
+        except KeyError:
+            raise Exception(
+                f"Unsupported system/architecture: {self.system}/{self.machine}"
+            )
+
+        url = arch_config["url"]
+        archive_path = (
+            self.base_path
+            / f"ffmpeg_archive{'.zip' if self.system == 'Windows' else '.tar.xz'}"
+        )
+
+        try:
+            # Download archive
+            response = requests.get(url, stream=True, timeout=30)
+            response.raise_for_status()
+            with open(archive_path, "wb") as f:
+                for chunk in response.iter_content(chunk_size=8192):
+                    f.write(chunk)
+
+            # Extract archive
+            if self.system == "Windows":
+                with zipfile.ZipFile(archive_path, "r") as zip_ref:
+                    ffmpeg_files = [
+                        f for f in zip_ref.namelist() if arch_config["bin_name"] in f
+                    ]
+                    if not ffmpeg_files:
+                        raise Exception("FFmpeg binary not found in archive")
+                    zip_ref.extract(ffmpeg_files[0], self.base_path)
+                    if self.ffmpeg_path.exists():
+                        self.ffmpeg_path.unlink()
+                    os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
+            else:
+                with tarfile.open(archive_path, "r:xz") as tar_ref:
+                    ffmpeg_files = [
+                        f for f in tar_ref.getnames() if arch_config["bin_name"] in f
+                    ]
+                    if not ffmpeg_files:
+                        raise Exception("FFmpeg binary not found in archive")
+                    tar_ref.extract(ffmpeg_files[0], self.base_path)
+                    if self.ffmpeg_path.exists():
+                        self.ffmpeg_path.unlink()
+                    os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
+
+        except Exception as e:
+            logger.error(f"FFmpeg download/extraction failed: {str(e)}")
+            raise
+        finally:
+            # Cleanup
+            try:
+                if archive_path.exists():
+                    archive_path.unlink()
+            except Exception as e:
+                logger.warning(f"Failed to cleanup FFmpeg archive: {str(e)}")
+
+    def force_download(self) -> bool:
+        """Force re-download of FFmpeg binary"""
+        try:
+            # Remove existing binary if it exists
+            if self.ffmpeg_path.exists():
+                try:
+                    self.ffmpeg_path.unlink()
+                except Exception as e:
+                    logger.error(f"Failed to remove existing FFmpeg: {str(e)}")
+                    return False
+
+            # Download new binary
+            self._download_ffmpeg()
+
+            # Verify new binary
+            return self._verify_ffmpeg()
+        except Exception as e:
+            logger.error(f"Failed to force download FFmpeg: {str(e)}")
+            return False
+
+    def get_ffmpeg_path(self) -> str:
+        """Get path to FFmpeg binary"""
+        if not self.ffmpeg_path.exists():
+            raise Exception("FFmpeg is not available")
+        return str(self.ffmpeg_path)
+
+    def get_compression_params(self, input_path: str, target_size_mb: int) -> dict:
+        """Get optimal compression parameters for the given input file"""
+        return self._get_optimal_ffmpeg_params(input_path, target_size_mb * 1024 * 1024)
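Note on two-pass encoding: the x264 options above set fastfirstpass, but the
cog itself runs a single encode. A caller that wants a true two-pass encode
over these parameters could drive it roughly as follows (a sketch only:
"pass" and "passlogfile" are standard x264 options, the bitrate choice and
file names are illustrative, and ffmpeg-python renders None-valued options as
bare flags such as -an):

    import os
    import ffmpeg
    from videoarchiver.ffmpeg_manager import FFmpegManager

    mgr = FFmpegManager()
    params = mgr.get_compression_params("input.mp4", target_size_mb=8)

    if params.get("c:v") == "libx264" and params.get("maxrate"):
        # Two-pass targets an explicit bitrate, so swap CRF for b:v.
        common = dict(params, **{"b:v": params["maxrate"], "passlogfile": "x264_2pass"})
        common.pop("crf", None)
        # Pass 1: analysis only; discard video via the null muxer, drop audio.
        first = dict(common, **{"pass": 1, "an": None, "f": "null"})
        ffmpeg.run(ffmpeg.output(ffmpeg.input("input.mp4"), os.devnull, **first),
                   cmd=mgr.get_ffmpeg_path(), overwrite_output=True)
        # Pass 2: real encode, reusing the stats written by pass 1.
        second = dict(common, **{"pass": 2})
        ffmpeg.run(ffmpeg.output(ffmpeg.input("input.mp4"), "output.mp4", **second),
                   cmd=mgr.get_ffmpeg_path(), overwrite_output=True)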
diff --git a/videoarchiver/utils.py b/videoarchiver/utils.py
index f6eaa99..7661214 100644
--- a/videoarchiver/utils.py
+++ b/videoarchiver/utils.py
@@ -8,6 +8,11 @@ import yt_dlp
 import ffmpeg
 from datetime import datetime, timedelta
 from concurrent.futures import ThreadPoolExecutor
+import tempfile
+import hashlib
+import time
+from functools import partial
+import contextlib
 from .ffmpeg_manager import FFmpegManager
 
 logging.basicConfig(
@@ -18,8 +22,26 @@ logger = logging.getLogger("VideoArchiver")
 
 # Initialize FFmpeg manager
 ffmpeg_mgr = FFmpegManager()
 
+class FileCleanupError(Exception):
+    """Raised when file cleanup fails"""
+    pass
+
+@contextlib.contextmanager
+def temp_path_context():
+    """Context manager for temporary path creation and cleanup"""
+    temp_dir = tempfile.mkdtemp(prefix="videoarchiver_")
+    try:
+        yield temp_dir
+    finally:
+        try:
+            shutil.rmtree(temp_dir, ignore_errors=True)
+        except Exception as e:
+            logger.error(f"Error cleaning up temp directory {temp_dir}: {e}")
 
 class VideoDownloader:
+    MAX_RETRIES = 3
+    RETRY_DELAY = 5  # seconds
+
     def __init__(
         self,
         download_path: str,
@@ -42,27 +64,41 @@
             thread_name_prefix="videoarchiver_download"
         )
 
+        # Track active downloads for cleanup
+        self.active_downloads: Dict[str, str] = {}
+        self._downloads_lock = asyncio.Lock()
+
         # Configure yt-dlp options
         self.ydl_opts = {
             "format": f"bestvideo[height<={max_quality}]+bestaudio/best[height<={max_quality}]",
-            "outtmpl": os.path.join(download_path, "%(title)s.%(ext)s"),
+            "outtmpl": "%(title)s.%(ext)s",  # Base filename only, path added later
             "merge_output_format": video_format,
             "quiet": True,
             "no_warnings": True,
             "extract_flat": False,
             "concurrent_fragment_downloads": concurrent_downloads,
+            "retries": 3,
+            "fragment_retries": 3,
+            "file_access_retries": 3,
+            "extractor_retries": 3,
             "postprocessor_hooks": [self._check_file_size],
             "progress_hooks": [self._progress_hook],
            "ffmpeg_location": ffmpeg_mgr.get_ffmpeg_path(),
         }
 
     def __del__(self):
-        """Ensure thread pool is shutdown"""
+        """Ensure thread pool is shutdown and files are cleaned up"""
         try:
+            # Delete any leftover files from still-active downloads
+            for file_path in self.active_downloads.values():
+                secure_delete_file(file_path)
+            self.active_downloads.clear()
+
+            # Shutdown thread pool
             if hasattr(self, 'download_pool'):
-                self.download_pool.shutdown(wait=False)
+                self.download_pool.shutdown(wait=True)
         except Exception as e:
-            logger.error(f"Error shutting down download pool: {str(e)}")
+            logger.error(f"Error during VideoDownloader cleanup: {str(e)}")
 
     def _get_url_patterns(self) -> List[str]:
         """Get URL patterns for supported sites"""
@@ -91,30 +127,72 @@
         if d["status"] == "finished":
             logger.info(f"Download completed: {d['filename']}")
 
+    def _verify_video_file(self, file_path: str) -> bool:
+        """Verify video file integrity"""
+        try:
+            probe = ffmpeg.probe(file_path)
+            # Check if file has video stream
+            video_streams = [s for s in probe['streams'] if s['codec_type'] == 'video']
+            if not video_streams:
+                return False
+            # Check if duration is valid
+            duration = float(probe['format'].get('duration', 0))
+            return duration > 0
+        except Exception as e:
+            logger.error(f"Error verifying video file {file_path}: {e}")
+            return False
+
+    async def _safe_download(self, url: str, temp_dir: str) -> Tuple[bool, str, str]:
+        """Safely download video with retries"""
+        for attempt in range(self.MAX_RETRIES):
+            try:
+                ydl_opts = self.ydl_opts.copy()
+                ydl_opts['outtmpl'] = os.path.join(temp_dir, ydl_opts['outtmpl'])
+
+                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+                    info = await asyncio.get_event_loop().run_in_executor(
+                        self.download_pool,
+                        lambda: ydl.extract_info(url, download=True)
+                    )
+
+                    if info is None:
+                        raise Exception("Failed to extract video information")
+
+                    file_path = os.path.join(temp_dir, ydl.prepare_filename(info))
+                    if not os.path.exists(file_path):
+                        raise FileNotFoundError("Download completed but file not found")
+
+                    if not self._verify_video_file(file_path):
+                        raise Exception("Downloaded file is not a valid video")
+
+                    return True, file_path, ""
+
+            except Exception as e:
+                logger.error(f"Download attempt {attempt + 1} failed: {str(e)}")
+                if attempt < self.MAX_RETRIES - 1:
+                    await asyncio.sleep(self.RETRY_DELAY)
+                else:
+                    return False, "", f"All download attempts failed: {str(e)}"
+
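+    # Download pipeline: _safe_download fetches into a throwaway directory
+    # created by temp_path_context and verifies the result with ffprobe; the
+    # file is then either compressed into download_path or moved there as-is,
+    # so nothing half-written ever lands in the final directory.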
+ ".compressed." + self.video_format + compressed_file = os.path.join( + self.download_path, + f"compressed_{os.path.basename(original_file)}" ) # Configure ffmpeg with optimal parameters @@ -135,7 +214,7 @@ class VideoDownloader: # Run compression in executor await asyncio.get_event_loop().run_in_executor( - self.download_pool, # Reuse download pool for compression + self.download_pool, lambda: ffmpeg.run( stream, capture_stdout=True, @@ -147,7 +226,7 @@ class VideoDownloader: if os.path.exists(compressed_file): compressed_size = os.path.getsize(compressed_file) if compressed_size <= (self.max_file_size * 1024 * 1024): - secure_delete_file(original_file) # Remove original + secure_delete_file(original_file) return True, compressed_file, "" else: secure_delete_file(compressed_file) @@ -157,18 +236,29 @@ class VideoDownloader: secure_delete_file(compressed_file) logger.error(f"Compression error: {str(e)}") return False, "", f"Compression error: {str(e)}" - - return True, original_file, "" + else: + # Move file to final location + final_path = os.path.join(self.download_path, os.path.basename(original_file)) + shutil.move(original_file, final_path) + return True, final_path, "" except Exception as e: - # Clean up any leftover files - if original_file and os.path.exists(original_file): - secure_delete_file(original_file) - if compressed_file and os.path.exists(compressed_file): - secure_delete_file(compressed_file) logger.error(f"Download error: {str(e)}") return False, "", str(e) + finally: + # Clean up + async with self._downloads_lock: + self.active_downloads.pop(url, None) + + try: + if original_file and os.path.exists(original_file): + secure_delete_file(original_file) + if compressed_file and os.path.exists(compressed_file) and not compressed_file.startswith(self.download_path): + secure_delete_file(compressed_file) + except Exception as e: + logger.error(f"Error during file cleanup: {str(e)}") + def is_supported_url(self, url: str) -> bool: """Check if URL is supported""" try: @@ -185,6 +275,7 @@ class MessageManager: self.message_duration = message_duration self.message_template = message_template self.scheduled_deletions: Dict[int, asyncio.Task] = {} + self._lock = asyncio.Lock() def format_archive_message( self, author: str, url: str, original_message: str @@ -197,64 +288,78 @@ class MessageManager: if self.message_duration <= 0: return - if message_id in self.scheduled_deletions: - self.scheduled_deletions[message_id].cancel() + async with self._lock: + if message_id in self.scheduled_deletions: + self.scheduled_deletions[message_id].cancel() - async def delete_later(): - await asyncio.sleep( - self.message_duration * 3600 - ) # Convert hours to seconds - try: - await delete_func() - except Exception as e: - logger.error(f"Failed to delete message {message_id}: {str(e)}") - finally: - self.scheduled_deletions.pop(message_id, None) + async def delete_later(): + try: + await asyncio.sleep(self.message_duration * 3600) + await delete_func() + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Failed to delete message {message_id}: {str(e)}") + finally: + async with self._lock: + self.scheduled_deletions.pop(message_id, None) - self.scheduled_deletions[message_id] = asyncio.create_task(delete_later()) + self.scheduled_deletions[message_id] = asyncio.create_task(delete_later()) - def cancel_all_deletions(self): + async def cancel_all_deletions(self): """Cancel all scheduled message deletions""" - for task in self.scheduled_deletions.values(): - task.cancel() 
-def secure_delete_file(file_path: str, passes: int = 3) -> bool:
+def secure_delete_file(file_path: str, passes: int = 3, timeout: int = 30) -> bool:
     """Securely delete a file by overwriting it multiple times before removal"""
     if not os.path.exists(file_path):
         return True
 
-    try:
-        file_size = os.path.getsize(file_path)
-        for _ in range(passes):
-            with open(file_path, "wb") as f:
-                f.write(os.urandom(file_size))
-                f.flush()
-                os.fsync(f.fileno())
-
+    start_time = datetime.now()
+    while True:
         try:
-            os.remove(file_path)
-        except OSError:
-            pass
+            file_size = os.path.getsize(file_path)
+            for _ in range(passes):
+                with open(file_path, "wb") as f:
+                    f.write(os.urandom(file_size))
+                    f.flush()
+                    os.fsync(f.fileno())
 
-        try:
-            if os.path.exists(file_path):
-                os.unlink(file_path)
-        except OSError:
-            pass
-
-        return not os.path.exists(file_path)
-
-    except Exception as e:
-        logger.error(f"Error during secure delete of {file_path}: {str(e)}")
-        # Attempt force delete as last resort
-        try:
-            if os.path.exists(file_path):
+            # Try multiple deletion methods
+            try:
                 os.remove(file_path)
-        except:
-            pass
-        return not os.path.exists(file_path)
+            except OSError:
+                try:
+                    os.unlink(file_path)
+                except OSError:
+                    Path(file_path).unlink(missing_ok=True)
+
+            # Verify file is gone
+            if os.path.exists(file_path):
+                # If file still exists, check timeout
+                if (datetime.now() - start_time).total_seconds() > timeout:
+                    logger.error(f"Timeout while trying to delete {file_path}")
+                    return False
+                # Wait briefly before retry; this is a sync helper, so use
+                # time.sleep (asyncio.sleep here would just create an
+                # unawaited coroutine and never pause)
+                time.sleep(0.1)
+                continue
+
+            return True
+
+        except Exception as e:
+            logger.error(f"Error during secure delete of {file_path}: {str(e)}")
+            # Last resort: try force delete
+            try:
+                if os.path.exists(file_path):
+                    Path(file_path).unlink(missing_ok=True)
+            except Exception:
+                pass
+            return not os.path.exists(file_path)
 
 
 def cleanup_downloads(download_path: str) -> None:
@@ -262,8 +367,16 @@ def cleanup_downloads(download_path: str) -> None:
     try:
         if os.path.exists(download_path):
             # Delete all files in the directory
-            for file_path in Path(download_path).glob("*"):
+            for file_path in Path(download_path).glob("**/*"):
                 if file_path.is_file():
                     secure_delete_file(str(file_path))
+
+            # Clean up empty subdirectories
+            for dir_path in Path(download_path).glob("**/*"):
+                if dir_path.is_dir():
+                    try:
+                        dir_path.rmdir()  # Will only remove if empty
+                    except OSError:
+                        pass  # Directory not empty or other error
     except Exception as e:
         logger.error(f"Error during cleanup: {str(e)}")
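
End-to-end sketch of the utils side (hypothetical values; the constructor is
called with the keyword arguments this diff uses, whose exact order and
defaults live outside the shown hunks):

    import asyncio
    from videoarchiver.utils import VideoDownloader, cleanup_downloads

    async def archive(url: str) -> None:
        downloader = VideoDownloader(
            download_path="/tmp/videoarchiver",
            video_format="mp4",
            max_quality=1080,
            max_file_size=8,        # MB; larger downloads get recompressed
            concurrent_downloads=2,
        )
        if downloader.is_supported_url(url):
            ok, path, err = await downloader.download_video(url)
            print(path if ok else f"failed: {err}")
        cleanup_downloads("/tmp/videoarchiver")

    asyncio.run(archive("https://example.com/video"))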