Improve video processing reliability and resource management

FFmpeg Manager improvements:
- Add better GPU detection with fallback options
- Add video content analysis for optimal encoding
- Add advanced encoding parameters and two-pass encoding
- Add dynamic audio bitrate scaling (sketched below)
- Add better error handling for encoders

Utils improvements:
- Add temporary directory management
- Add better file cleanup with retries and timeouts
- Add download retries and video verification
- Add thread pool cleanup improvements
- Add proper resource cleanup
- Add better error handling throughout
- Fix potential resource leaks
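
The ffmpeg_manager.py side of this commit is not shown in the hunks below. As a rough illustration of the dynamic audio bitrate scaling idea, here is a minimal sketch; the function name, bounds, and size math are assumptions for illustration, not the actual FFmpegManager API.

# Hypothetical sketch - not the actual FFmpegManager code
def scale_audio_bitrate(duration_s: float, target_size_mb: int, video_kbps: int) -> int:
    """Pick an audio bitrate (kbps) that fits what is left of the size budget."""
    total_kbits = target_size_mb * 8 * 1024        # size budget in kilobits
    video_kbits = video_kbps * duration_s          # estimated share used by video
    remaining_kbps = max((total_kbits - video_kbits) / max(duration_s, 1.0), 0)
    # Clamp so audio stays audible but never eats the whole budget
    return int(min(max(remaining_kbps, 64), 192))

For example, with an 8 MB target and a 60 s clip at 800 kbps video, the leftover budget works out to roughly 290 kbps, which the clamp caps at 192 kbps.
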
This commit is contained in:
pacnpal
2024-11-14 20:05:22 +00:00
parent 69078025f6
commit d61d1508f5
2 changed files with 707 additions and 77 deletions

@@ -8,6 +8,10 @@ import yt_dlp
import ffmpeg
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import tempfile
import time
import hashlib
from functools import partial
import contextlib
from .ffmpeg_manager import FFmpegManager
logging.basicConfig(
@@ -18,8 +22,26 @@ logger = logging.getLogger("VideoArchiver")
# Initialize FFmpeg manager
ffmpeg_mgr = FFmpegManager()
class FileCleanupError(Exception):
"""Raised when file cleanup fails"""
pass
@contextlib.contextmanager
def temp_path_context():
"""Context manager for temporary path creation and cleanup"""
temp_dir = tempfile.mkdtemp(prefix="videoarchiver_")
try:
yield temp_dir
finally:
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.error(f"Error cleaning up temp directory {temp_dir}: {e}")
class VideoDownloader:
MAX_RETRIES = 3
RETRY_DELAY = 5 # seconds
def __init__(
self,
download_path: str,
@@ -42,27 +64,41 @@ class VideoDownloader:
thread_name_prefix="videoarchiver_download"
)
# Track active downloads for cleanup
self.active_downloads: Dict[str, str] = {}
self._downloads_lock = asyncio.Lock()
# Configure yt-dlp options
self.ydl_opts = {
"format": f"bestvideo[height<={max_quality}]+bestaudio/best[height<={max_quality}]",
"outtmpl": os.path.join(download_path, "%(title)s.%(ext)s"),
"outtmpl": "%(title)s.%(ext)s", # Base filename only, path added later
"merge_output_format": video_format,
"quiet": True,
"no_warnings": True,
"extract_flat": False,
"concurrent_fragment_downloads": concurrent_downloads,
"retries": 3,
"fragment_retries": 3,
"file_access_retries": 3,
"extractor_retries": 3,
"postprocessor_hooks": [self._check_file_size],
"progress_hooks": [self._progress_hook],
"ffmpeg_location": ffmpeg_mgr.get_ffmpeg_path(),
}
def __del__(self):
"""Ensure thread pool is shutdown"""
"""Ensure thread pool is shutdown and files are cleaned up"""
try:
# Cancel all active downloads
for file_path in self.active_downloads.values():
secure_delete_file(file_path)
self.active_downloads.clear()
# Shutdown thread pool
if hasattr(self, 'download_pool'):
self.download_pool.shutdown(wait=True)
except Exception as e:
logger.error(f"Error shutting down download pool: {str(e)}")
logger.error(f"Error during VideoDownloader cleanup: {str(e)}")
def _get_url_patterns(self) -> List[str]:
"""Get URL patterns for supported sites"""
@@ -91,30 +127,72 @@ class VideoDownloader:
if d["status"] == "finished":
logger.info(f"Download completed: {d['filename']}")
def _verify_video_file(self, file_path: str) -> bool:
"""Verify video file integrity"""
try:
probe = ffmpeg.probe(file_path)
# Check if file has video stream
video_streams = [s for s in probe['streams'] if s['codec_type'] == 'video']
if not video_streams:
return False
# Check if duration is valid
duration = float(probe['format'].get('duration', 0))
return duration > 0
except Exception as e:
logger.error(f"Error verifying video file {file_path}: {e}")
return False
async def _safe_download(self, url: str, temp_dir: str) -> Tuple[bool, str, str]:
"""Safely download video with retries"""
for attempt in range(self.MAX_RETRIES):
try:
ydl_opts = self.ydl_opts.copy()
ydl_opts['outtmpl'] = os.path.join(temp_dir, ydl_opts['outtmpl'])
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = await asyncio.get_event_loop().run_in_executor(
self.download_pool,
lambda: ydl.extract_info(url, download=True)
)
if info is None:
raise Exception("Failed to extract video information")
file_path = os.path.join(temp_dir, ydl.prepare_filename(info))
if not os.path.exists(file_path):
raise FileNotFoundError("Download completed but file not found")
if not self._verify_video_file(file_path):
raise Exception("Downloaded file is not a valid video")
return True, file_path, ""
except Exception as e:
logger.error(f"Download attempt {attempt + 1} failed: {str(e)}")
if attempt < self.MAX_RETRIES - 1:
await asyncio.sleep(self.RETRY_DELAY)
else:
return False, "", f"All download attempts failed: {str(e)}"
async def download_video(self, url: str) -> Tuple[bool, str, str]:
"""Download and process a video"""
original_file = None
compressed_file = None
temp_dir = None
try:
# Create temporary directory for download
with temp_path_context() as temp_dir:
    # Download the video
    success, file_path, error = await self._safe_download(url, temp_dir)
    if not success:
        return False, "", error

    original_file = file_path
# Track this download
async with self._downloads_lock:
self.active_downloads[url] = original_file
# Check file size and compress if needed
file_size = os.path.getsize(original_file)
@@ -125,8 +203,9 @@ class VideoDownloader:
params = ffmpeg_mgr.get_compression_params(
original_file, self.max_file_size
)
compressed_file = os.path.join(
self.download_path,
f"compressed_{os.path.basename(original_file)}"
)
# Configure ffmpeg with optimal parameters
@@ -135,7 +214,7 @@ class VideoDownloader:
# Run compression in executor
await asyncio.get_event_loop().run_in_executor(
self.download_pool,  # Reuse download pool for compression
lambda: ffmpeg.run(
stream,
capture_stdout=True,
@@ -147,7 +226,7 @@ class VideoDownloader:
if os.path.exists(compressed_file):
compressed_size = os.path.getsize(compressed_file)
if compressed_size <= (self.max_file_size * 1024 * 1024):
secure_delete_file(original_file)  # Remove original
return True, compressed_file, ""
else:
secure_delete_file(compressed_file)
@@ -157,18 +236,29 @@ class VideoDownloader:
secure_delete_file(compressed_file)
logger.error(f"Compression error: {str(e)}")
return False, "", f"Compression error: {str(e)}"
else:
# Move file to final location
final_path = os.path.join(self.download_path, os.path.basename(original_file))
shutil.move(original_file, final_path)
return True, final_path, ""
except Exception as e:
logger.error(f"Download error: {str(e)}")
return False, "", str(e)
finally:
# Clean up
async with self._downloads_lock:
self.active_downloads.pop(url, None)
try:
if original_file and os.path.exists(original_file):
secure_delete_file(original_file)
if compressed_file and os.path.exists(compressed_file) and not compressed_file.startswith(self.download_path):
secure_delete_file(compressed_file)
except Exception as e:
logger.error(f"Error during file cleanup: {str(e)}")
def is_supported_url(self, url: str) -> bool:
"""Check if URL is supported"""
try:
@@ -185,6 +275,7 @@ class MessageManager:
self.message_duration = message_duration
self.message_template = message_template
self.scheduled_deletions: Dict[int, asyncio.Task] = {}
self._lock = asyncio.Lock()
def format_archive_message(
self, author: str, url: str, original_message: str
@@ -197,64 +288,78 @@ class MessageManager:
if self.message_duration <= 0:
return
async with self._lock:
if message_id in self.scheduled_deletions:
self.scheduled_deletions[message_id].cancel()
async def delete_later():
try:
await asyncio.sleep(self.message_duration * 3600)
await delete_func()
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Failed to delete message {message_id}: {str(e)}")
finally:
async with self._lock:
self.scheduled_deletions.pop(message_id, None)
self.scheduled_deletions[message_id] = asyncio.create_task(delete_later())
async def cancel_all_deletions(self):
"""Cancel all scheduled message deletions"""
async with self._lock:
for task in self.scheduled_deletions.values():
task.cancel()
await asyncio.gather(*self.scheduled_deletions.values(), return_exceptions=True)
self.scheduled_deletions.clear()
def secure_delete_file(file_path: str, passes: int = 3, timeout: int = 30) -> bool:
"""Securely delete a file by overwriting it multiple times before removal"""
if not os.path.exists(file_path):
return True
start_time = datetime.now()
while True:
    try:
        # Overwrite the file contents before each deletion attempt
        file_size = os.path.getsize(file_path)
        for _ in range(passes):
            with open(file_path, "wb") as f:
                f.write(os.urandom(file_size))
                f.flush()
                os.fsync(f.fileno())
        # Try multiple deletion methods
        try:
            os.remove(file_path)
        except OSError:
            try:
                os.unlink(file_path)
            except OSError:
                Path(file_path).unlink(missing_ok=True)
        # Verify file is gone
        if os.path.exists(file_path):
            # If file still exists, check timeout
            if (datetime.now() - start_time).seconds > timeout:
                logger.error(f"Timeout while trying to delete {file_path}")
                return False
            # Wait briefly before retry (time.sleep, not asyncio.sleep: this helper is synchronous)
            time.sleep(0.1)
            continue
        return True
    except Exception as e:
        logger.error(f"Error during secure delete of {file_path}: {str(e)}")
        # Last resort: try force delete
        try:
            if os.path.exists(file_path):
                Path(file_path).unlink(missing_ok=True)
        except Exception:
            pass
        return not os.path.exists(file_path)
def cleanup_downloads(download_path: str) -> None:
@@ -262,8 +367,16 @@ def cleanup_downloads(download_path: str) -> None:
try:
if os.path.exists(download_path):
# Delete all files in the directory
for file_path in Path(download_path).glob("*"):
for file_path in Path(download_path).glob("**/*"):
if file_path.is_file():
secure_delete_file(str(file_path))
# Clean up empty subdirectories
for dir_path in sorted(Path(download_path).glob("**/*"), reverse=True):  # deepest paths first so empty parents can also be removed
if dir_path.is_dir():
try:
dir_path.rmdir() # Will only remove if empty
except OSError:
pass # Directory not empty or other error
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")