From 4719f567c66ec15d91fac31393523f107737bb93 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Thu, 14 Nov 2024 22:36:01 +0000 Subject: [PATCH] refactor: Update video_analyzer to use ffprobe directly - Removed dependency on ffmpeg-python package - Added direct ffprobe command execution - Improved error handling and logging - Better organization of analysis methods --- videoarchiver/ffmpeg/video_analyzer.py | 44 ++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/videoarchiver/ffmpeg/video_analyzer.py b/videoarchiver/ffmpeg/video_analyzer.py index 8454a82..bb5b4f1 100644 --- a/videoarchiver/ffmpeg/video_analyzer.py +++ b/videoarchiver/ffmpeg/video_analyzer.py @@ -3,12 +3,12 @@ import os import subprocess import logging -import ffmpeg from pathlib import Path from typing import Dict, Any from contextlib import contextmanager import tempfile import shutil +import json logger = logging.getLogger("VideoArchiver") @@ -32,15 +32,24 @@ class VideoAnalyzer: def analyze_video(self, input_path: str) -> Dict[str, Any]: """Analyze video content for optimal encoding settings""" try: - probe = ffmpeg.probe(input_path) - video_info = next(s for s in probe["streams"] if s["codec_type"] == "video") + # Use ffprobe to get video information + probe_result = self._probe_video(input_path) + if not probe_result: + return {} + + video_info = next( + (s for s in probe_result["streams"] if s["codec_type"] == "video"), + None + ) + if not video_info: + return {} # Get video properties width = int(video_info.get("width", 0)) height = int(video_info.get("height", 0)) fps = eval(video_info.get("r_frame_rate", "30/1")) - duration = float(probe["format"].get("duration", 0)) - bitrate = float(probe["format"].get("bit_rate", 0)) + duration = float(probe_result["format"].get("duration", 0)) + bitrate = float(probe_result["format"].get("bit_rate", 0)) # Advanced analysis has_high_motion = self._detect_high_motion(video_info) @@ -48,7 +57,7 @@ class VideoAnalyzer: # Get audio properties audio_info = next( - (s for s in probe["streams"] if s["codec_type"] == "audio"), + (s for s in probe_result["streams"] if s["codec_type"] == "audio"), None ) audio_props = self._get_audio_properties(audio_info) @@ -69,6 +78,29 @@ class VideoAnalyzer: logger.error(f"Error analyzing video: {str(e)}") return {} + def _probe_video(self, input_path: str) -> Dict: + """Use ffprobe to get video information""" + try: + cmd = [ + str(self.ffmpeg_path).replace('ffmpeg', 'ffprobe'), + "-v", "quiet", + "-print_format", "json", + "-show_format", + "-show_streams", + input_path + ] + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + if result.returncode == 0: + return json.loads(result.stdout) + except Exception as e: + logger.error(f"Error probing video: {str(e)}") + return {} + def _detect_high_motion(self, video_info: Dict) -> bool: """Detect high motion content based on frame rate analysis""" try: