commit for rebase

This commit is contained in:
pacnpal
2024-11-14 17:58:40 +00:00
parent 6204e339fc
commit 605a902b2b
7 changed files with 9 additions and 0 deletions

View File

@@ -1,270 +0,0 @@
import os
import sys
import platform
import subprocess
import logging
import shutil
import requests
import zipfile
import tarfile
from pathlib import Path
import stat
import multiprocessing
import ffmpeg
logger = logging.getLogger('VideoArchiver')
class FFmpegManager:
FFMPEG_URLS = {
'Windows': {
'x86_64': {
'url': 'https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip',
'bin_name': 'ffmpeg.exe'
}
},
'Linux': {
'x86_64': {
'url': 'https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz',
'bin_name': 'ffmpeg'
},
'aarch64': { # ARM64
'url': 'https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linuxarm64-gpl.tar.xz',
'bin_name': 'ffmpeg'
},
'armv7l': { # ARM32
'url': 'https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linuxarm32-gpl.tar.xz',
'bin_name': 'ffmpeg'
}
},
'Darwin': { # macOS
'x86_64': {
'url': 'https://evermeet.cx/ffmpeg/getrelease/zip',
'bin_name': 'ffmpeg'
},
'arm64': { # Apple Silicon
'url': 'https://evermeet.cx/ffmpeg/getrelease/zip',
'bin_name': 'ffmpeg'
}
}
}
def __init__(self):
self.base_path = Path(__file__).parent / 'bin'
self.base_path.mkdir(exist_ok=True)
# Get system architecture
self.system = platform.system()
self.machine = platform.machine().lower()
if self.machine == 'arm64':
self.machine = 'aarch64' # Normalize ARM64 naming
# Try to use system FFmpeg first
system_ffmpeg = shutil.which('ffmpeg')
if system_ffmpeg:
self.ffmpeg_path = Path(system_ffmpeg)
logger.info(f"Using system FFmpeg: {self.ffmpeg_path}")
else:
# Fall back to downloaded FFmpeg
try:
arch_config = self.FFMPEG_URLS[self.system][self.machine]
self.ffmpeg_path = self.base_path / arch_config['bin_name']
except KeyError:
raise Exception(f"Unsupported system/architecture: {self.system}/{self.machine}")
self._gpu_info = self._detect_gpu()
self._cpu_cores = multiprocessing.cpu_count()
if not system_ffmpeg:
self._ensure_ffmpeg()
def _detect_gpu(self) -> dict:
"""Detect available GPU and its capabilities"""
gpu_info = {
'nvidia': False,
'amd': False,
'intel': False,
'arm': False
}
try:
if self.system == 'Linux':
# Check for NVIDIA GPU
nvidia_smi = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if nvidia_smi.returncode == 0:
gpu_info['nvidia'] = True
# Check for AMD GPU
if os.path.exists('/dev/dri/renderD128'):
gpu_info['amd'] = True
# Check for Intel GPU
lspci = subprocess.run(['lspci'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if b'VGA' in lspci.stdout and b'Intel' in lspci.stdout:
gpu_info['intel'] = True
# Check for ARM GPU
if self.machine in ['aarch64', 'armv7l']:
gpu_info['arm'] = True
elif self.system == 'Windows':
# Check for any GPU using dxdiag
dxdiag = subprocess.run(['dxdiag', '/t', 'temp_dxdiag.txt'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if os.path.exists('temp_dxdiag.txt'):
with open('temp_dxdiag.txt', 'r') as f:
content = f.read().lower()
if 'nvidia' in content:
gpu_info['nvidia'] = True
if 'amd' in content or 'radeon' in content:
gpu_info['amd'] = True
if 'intel' in content:
gpu_info['intel'] = True
os.remove('temp_dxdiag.txt')
except Exception as e:
logger.warning(f"GPU detection failed: {str(e)}")
return gpu_info
def _get_optimal_ffmpeg_params(self, input_path: str, target_size_bytes: int) -> dict:
"""Get optimal FFmpeg parameters based on hardware and video size"""
params = {
'c:v': 'libx264', # Default to CPU encoding
'threads': str(self._cpu_cores), # Use all CPU cores
'preset': 'medium',
'crf': '23', # Default quality
'maxrate': None,
'bufsize': None,
'movflags': '+faststart', # Optimize for web playback
'profile:v': 'high', # High profile for better quality
'level': '4.1', # Compatibility level
'pix_fmt': 'yuv420p' # Standard pixel format
}
# Check if GPU encoding is possible
if self._gpu_info['nvidia']:
params.update({
'c:v': 'h264_nvenc',
'preset': 'p4', # High quality NVENC preset
'rc:v': 'vbr', # Variable bitrate for better quality
'cq:v': '19', # Quality level for NVENC
'spatial-aq': '1', # Enable spatial adaptive quantization
'temporal-aq': '1', # Enable temporal adaptive quantization
'b_ref_mode': 'middle' # Better quality for B-frames
})
elif self._gpu_info['amd']:
params.update({
'c:v': 'h264_amf',
'quality': 'quality',
'rc': 'vbr_peak',
'enforce_hrd': '1',
'vbaq': '1', # Enable adaptive quantization
'preanalysis': '1'
})
elif self._gpu_info['intel']:
params.update({
'c:v': 'h264_qsv',
'preset': 'veryslow', # Best quality for QSV
'look_ahead': '1',
'global_quality': '23'
})
elif self._gpu_info['arm']:
# Use OpenMAX (OMX) on supported ARM devices
if os.path.exists('/dev/video-codec'):
params.update({
'c:v': 'h264_v4l2m2m', # V4L2 M2M encoder
'extra_hw_frames': '10'
})
else:
# Fall back to optimized CPU encoding for ARM
params.update({
'c:v': 'libx264',
'preset': 'medium',
'tune': 'fastdecode'
})
# Get input file size and probe info
input_size = os.path.getsize(input_path)
probe = ffmpeg.probe(input_path)
duration = float(probe['format']['duration'])
# Only add bitrate constraints if compression is needed
if input_size > target_size_bytes:
# Calculate target bitrate (bits/second)
target_bitrate = int((target_size_bytes * 8) / duration * 0.95) # 95% of target size
params['maxrate'] = f"{target_bitrate}"
params['bufsize'] = f"{target_bitrate * 2}"
# Adjust quality settings based on compression ratio
ratio = input_size / target_size_bytes
if ratio > 4:
params['crf'] = '28' if params['c:v'] == 'libx264' else '23'
params['preset'] = 'faster'
elif ratio > 2:
params['crf'] = '26' if params['c:v'] == 'libx264' else '21'
params['preset'] = 'medium'
else:
params['crf'] = '23' if params['c:v'] == 'libx264' else '19'
params['preset'] = 'slow'
# Audio settings
params.update({
'c:a': 'aac',
'b:a': '192k', # High quality audio
'ar': '48000' # Standard sample rate
})
return params
def _ensure_ffmpeg(self):
"""Ensure FFmpeg is available, downloading if necessary"""
if not self.ffmpeg_path.exists():
self._download_ffmpeg()
# Make binary executable on Unix systems
if self.system != 'Windows':
self.ffmpeg_path.chmod(self.ffmpeg_path.stat().st_mode | stat.S_IEXEC)
def _download_ffmpeg(self):
"""Download and extract FFmpeg binary"""
try:
arch_config = self.FFMPEG_URLS[self.system][self.machine]
except KeyError:
raise Exception(f"Unsupported system/architecture: {self.system}/{self.machine}")
url = arch_config['url']
archive_path = self.base_path / f"ffmpeg_archive{'.zip' if self.system == 'Windows' else '.tar.xz'}"
# Download archive
response = requests.get(url, stream=True)
response.raise_for_status()
with open(archive_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Extract archive
if self.system == 'Windows':
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
ffmpeg_files = [f for f in zip_ref.namelist() if arch_config['bin_name'] in f]
if ffmpeg_files:
zip_ref.extract(ffmpeg_files[0], self.base_path)
os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
else:
with tarfile.open(archive_path, 'r:xz') as tar_ref:
ffmpeg_files = [f for f in tar_ref.getnames() if arch_config['bin_name'] in f]
if ffmpeg_files:
tar_ref.extract(ffmpeg_files[0], self.base_path)
os.rename(self.base_path / ffmpeg_files[0], self.ffmpeg_path)
# Cleanup
archive_path.unlink()
def get_ffmpeg_path(self) -> str:
"""Get path to FFmpeg binary"""
if not self.ffmpeg_path.exists():
raise Exception("FFmpeg is not available")
return str(self.ffmpeg_path)
def get_compression_params(self, input_path: str, target_size_mb: int) -> dict:
"""Get optimal compression parameters for the given input file"""
return self._get_optimal_ffmpeg_params(input_path, target_size_mb * 1024 * 1024)