diff --git a/videoarchiver/processor/__init__.py b/videoarchiver/processor/__init__.py index 4e24c1a..c2aa5c5 100644 --- a/videoarchiver/processor/__init__.py +++ b/videoarchiver/processor/__init__.py @@ -40,6 +40,7 @@ from .reactions import ( update_progress_reaction, update_download_progress_reaction ) +from ..utils import progress_tracker # Export public classes and constants __all__ = [ @@ -207,6 +208,3 @@ def clear_caches(message_id: Optional[int] = None) -> None: """ url_extractor.clear_cache(message_id) message_validator.clear_cache(message_id) - -# Initialize shared progress tracker instance -progress_tracker = ProgressTracker() diff --git a/videoarchiver/processor/core.py b/videoarchiver/processor/core.py index 333982c..2de1115 100644 --- a/videoarchiver/processor/core.py +++ b/videoarchiver/processor/core.py @@ -10,7 +10,7 @@ from discord.ext import commands from .message_handler import MessageHandler from .queue_handler import QueueHandler -from ..utils.progress_tracker import ProgressTracker +from ..utils import progress_tracker from .status_display import StatusDisplay from .cleanup_manager import CleanupManager, CleanupStrategy from .constants import REACTIONS @@ -196,7 +196,7 @@ class HealthMonitor: self.health_status.update({ "queue_handler": self.processor.queue_handler.is_healthy(), "message_handler": self.processor.message_handler.is_healthy(), - "progress_tracker": self.processor.progress_tracker.is_healthy() + "progress_tracker": progress_tracker.is_healthy() }) # Check operation health @@ -248,7 +248,6 @@ class VideoProcessor: # Initialize handlers self.queue_handler = QueueHandler(bot, config_manager, components) self.message_handler = MessageHandler(bot, config_manager, queue_manager) - self.progress_tracker = ProgressTracker() self.cleanup_manager = CleanupManager( self.queue_handler, ffmpeg_mgr, diff --git a/videoarchiver/processor/progress_tracker.py b/videoarchiver/processor/progress_tracker.py deleted file mode 100644 index aa03ec0..0000000 --- a/videoarchiver/processor/progress_tracker.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Progress tracking for video downloads and compression""" - -from typing import Dict, Any -from datetime import datetime - -class ProgressTracker: - """Tracks progress of video downloads and compression operations""" - - def __init__(self): - self._download_progress: Dict[str, Dict[str, Any]] = {} - self._compression_progress: Dict[str, Dict[str, Any]] = {} - - def update_download_progress(self, url: str, progress_data: Dict[str, Any]) -> None: - """Update download progress for a specific URL""" - if url not in self._download_progress: - self._download_progress[url] = { - 'active': True, - 'start_time': datetime.utcnow().isoformat(), - 'retries': 0 - } - - self._download_progress[url].update(progress_data) - - def complete_download(self, url: str) -> None: - """Mark a download as complete""" - if url in self._download_progress: - self._download_progress[url]['active'] = False - self._download_progress[url]['completed_time'] = datetime.utcnow().isoformat() - - def increment_download_retries(self, url: str) -> None: - """Increment retry count for a download""" - if url in self._download_progress: - self._download_progress[url]['retries'] = self._download_progress[url].get('retries', 0) + 1 - - def update_compression_progress(self, file_id: str, progress_data: Dict[str, Any]) -> None: - """Update compression progress for a specific file""" - if file_id not in self._compression_progress: - self._compression_progress[file_id] = { - 'active': True, - 'start_time': datetime.utcnow().isoformat() - } - - self._compression_progress[file_id].update(progress_data) - - def complete_compression(self, file_id: str) -> None: - """Mark a compression operation as complete""" - if file_id in self._compression_progress: - self._compression_progress[file_id]['active'] = False - self._compression_progress[file_id]['completed_time'] = datetime.utcnow().isoformat() - - def get_download_progress(self, url: str = None) -> Dict[str, Any]: - """Get download progress for a specific URL or all downloads""" - if url: - return self._download_progress.get(url, {}) - return self._download_progress - - def get_compression_progress(self, file_id: str = None) -> Dict[str, Any]: - """Get compression progress for a specific file or all compressions""" - if file_id: - return self._compression_progress.get(file_id, {}) - return self._compression_progress - - def clear_completed(self) -> None: - """Clear completed operations from tracking""" - # Clear completed downloads - self._download_progress = { - url: data for url, data in self._download_progress.items() - if data.get('active', False) - } - - # Clear completed compressions - self._compression_progress = { - file_id: data for file_id, data in self._compression_progress.items() - if data.get('active', False) - } - - def get_active_operations(self) -> Dict[str, Dict[str, Any]]: - """Get all active operations""" - return { - 'downloads': { - url: data for url, data in self._download_progress.items() - if data.get('active', False) - }, - 'compressions': { - file_id: data for file_id, data in self._compression_progress.items() - if data.get('active', False) - } - } diff --git a/videoarchiver/processor/queue_handler.py b/videoarchiver/processor/queue_handler.py index 9833eb7..cc2cd98 100644 --- a/videoarchiver/processor/queue_handler.py +++ b/videoarchiver/processor/queue_handler.py @@ -8,7 +8,7 @@ from typing import Optional, Dict, Any, List, Tuple, Set, TypedDict, ClassVar, C from datetime import datetime import discord -from ..utils.progress_tracker import ProgressTracker +from ..utils import progress_tracker from ..database.video_archive_db import VideoArchiveDB from ..utils.download_manager import DownloadManager from ..utils.message_manager import MessageManager @@ -57,7 +57,6 @@ class QueueHandler: self._unloading = False self._active_downloads: Dict[str, asyncio.Task] = {} self._active_downloads_lock = asyncio.Lock() - self.progress_tracker = ProgressTracker() self._stats: QueueStats = { "active_downloads": 0, "processing_items": 0, @@ -71,13 +70,13 @@ class QueueHandler: async def process_video(self, item: QueueItem) -> Tuple[bool, Optional[str]]: """ Process a video from the queue. - + Args: item: Queue item to process - + Returns: Tuple of (success, error_message) - + Raises: QueueHandlerError: If there's an error during processing """ @@ -105,12 +104,16 @@ class QueueHandler: message_manager = components.get("message_manager") if not downloader or not message_manager: - raise QueueHandlerError(f"Missing required components for guild {item.guild_id}") + raise QueueHandlerError( + f"Missing required components for guild {item.guild_id}" + ) # Get original message and update reactions original_message = await self._get_original_message(item) if original_message: - await self._update_message_reactions(original_message, QueueItemStatus.PROCESSING) + await self._update_message_reactions( + original_message, QueueItemStatus.PROCESSING + ) # Download and archive video file_path = await self._process_video_file( @@ -121,7 +124,9 @@ class QueueHandler: self._update_stats(True, start_time) item.finish_processing(True) if original_message: - await self._update_message_reactions(original_message, QueueItemStatus.COMPLETED) + await self._update_message_reactions( + original_message, QueueItemStatus.COMPLETED + ) return True, None except QueueHandlerError as e: @@ -143,7 +148,9 @@ class QueueHandler: if self.db.is_url_archived(item.url): logger.info(f"Video already archived: {item.url}") if original_message := await self._get_original_message(item): - await self._update_message_reactions(original_message, QueueItemStatus.COMPLETED) + await self._update_message_reactions( + original_message, QueueItemStatus.COMPLETED + ) archived_info = self.db.get_archived_video(item.url) if archived_info: await original_message.reply( @@ -153,10 +160,7 @@ class QueueHandler: return True return False - async def _get_components( - self, - guild_id: int - ) -> Dict[str, Any]: + async def _get_components(self, guild_id: int) -> Dict[str, Any]: """Get required components for processing""" if guild_id not in self.components: raise QueueHandlerError(f"No components found for guild {guild_id}") @@ -167,7 +171,7 @@ class QueueHandler: downloader: DownloadManager, message_manager: MessageManager, item: QueueItem, - original_message: Optional[discord.Message] + original_message: Optional[discord.Message], ) -> Optional[str]: """Download and process video file""" # Create progress callback @@ -182,11 +186,7 @@ class QueueHandler: # Archive video success, error = await self._archive_video( - item.guild_id, - original_message, - message_manager, - item.url, - file_path + item.guild_id, original_message, message_manager, item.url, file_path ) if not success: raise QueueHandlerError(f"Failed to archive video: {error}") @@ -194,16 +194,15 @@ class QueueHandler: return file_path def _handle_processing_error( - self, - item: QueueItem, - message: Optional[discord.Message], - error: str + self, item: QueueItem, message: Optional[discord.Message], error: str ) -> None: """Handle processing error""" self._update_stats(False, datetime.utcnow()) item.finish_processing(False, error) if message: - asyncio.create_task(self._update_message_reactions(message, QueueItemStatus.FAILED)) + asyncio.create_task( + self._update_message_reactions(message, QueueItemStatus.FAILED) + ) def _update_stats(self, success: bool, start_time: datetime) -> None: """Update queue statistics""" @@ -213,19 +212,19 @@ class QueueHandler: self._stats["completed_items"] += 1 else: self._stats["failed_items"] += 1 - + # Update average processing time total_items = self._stats["completed_items"] + self._stats["failed_items"] if total_items > 0: current_total = self._stats["average_processing_time"] * (total_items - 1) - self._stats["average_processing_time"] = (current_total + processing_time) / total_items - + self._stats["average_processing_time"] = ( + current_total + processing_time + ) / total_items + self._stats["last_processed"] = datetime.utcnow().isoformat() async def _update_message_reactions( - self, - message: discord.Message, - status: QueueItemStatus + self, message: discord.Message, status: QueueItemStatus ) -> None: """Update message reactions based on status""" try: @@ -234,7 +233,7 @@ class QueueHandler: REACTIONS["queued"], REACTIONS["processing"], REACTIONS["success"], - REACTIONS["error"] + REACTIONS["error"], ]: try: await message.remove_reaction(reaction, self.bot.user) @@ -265,21 +264,21 @@ class QueueHandler: original_message: Optional[discord.Message], message_manager: MessageManager, url: str, - file_path: str + file_path: str, ) -> Tuple[bool, Optional[str]]: """ Archive downloaded video. - + Args: guild_id: Discord guild ID original_message: Original message containing the video message_manager: Message manager instance url: Video URL file_path: Path to downloaded video file - + Returns: Tuple of (success, error_message) - + Raises: QueueHandlerError: If archiving fails """ @@ -308,19 +307,14 @@ class QueueHandler: raise QueueHandlerError("Processed file not found") archive_message = await archive_channel.send( - content=message, - file=discord.File(file_path) + content=message, file=discord.File(file_path) ) # Store in database if available if self.db and archive_message.attachments: discord_url = archive_message.attachments[0].url self.db.add_archived_video( - url, - discord_url, - archive_message.id, - archive_channel.id, - guild_id + url, discord_url, archive_message.id, archive_channel.id, guild_id ) logger.info(f"Added video to archive database: {url} -> {discord_url}") @@ -333,16 +327,13 @@ class QueueHandler: logger.error(f"Failed to archive video: {str(e)}") raise QueueHandlerError(f"Failed to archive video: {str(e)}") - async def _get_original_message( - self, - item: QueueItem - ) -> Optional[discord.Message]: + async def _get_original_message(self, item: QueueItem) -> Optional[discord.Message]: """ Retrieve the original message. - + Args: item: Queue item containing message details - + Returns: Original Discord message or None if not found """ @@ -358,57 +349,61 @@ class QueueHandler: return None def _create_progress_callback( - self, - message: Optional[discord.Message], - url: str + self, message: Optional[discord.Message], url: str ) -> Callable[[float], None]: """ Create progress callback function for download tracking. - + Args: message: Discord message to update with progress url: URL being downloaded - + Returns: Callback function for progress updates """ + def progress_callback(progress: float) -> None: if message: try: loop = asyncio.get_running_loop() if not loop.is_running(): - logger.warning("Event loop is not running, skipping progress update") + logger.warning( + "Event loop is not running, skipping progress update" + ) return # Update progress tracking - self.progress_tracker.update_download_progress(url, { - 'percent': progress, - 'last_update': datetime.utcnow().isoformat() - }) + progress_tracker.update_download_progress( + url, + { + "percent": progress, + "last_update": datetime.utcnow().isoformat(), + }, + ) # Create task to update reaction asyncio.run_coroutine_threadsafe( - self._update_download_progress_reaction(message, progress), - loop + self._update_download_progress_reaction(message, progress), loop ) except Exception as e: logger.error(f"Error in progress callback: {e}") + return progress_callback async def _download_video( self, downloader: DownloadManager, url: str, - progress_callback: Callable[[float], None] + progress_callback: Callable[[float], None], ) -> Tuple[bool, Optional[str], Optional[str]]: """ Download video with progress tracking. - + Args: downloader: Download manager instance url: URL to download progress_callback: Callback for progress updates - + Returns: Tuple of (success, file_path, error_message) """ @@ -422,13 +417,12 @@ class QueueHandler: try: success, file_path, error = await asyncio.wait_for( - download_task, - timeout=self.DOWNLOAD_TIMEOUT + download_task, timeout=self.DOWNLOAD_TIMEOUT ) if success: - self.progress_tracker.complete_download(url) + progress_tracker.complete_download(url) else: - self.progress_tracker.increment_download_retries(url) + progress_tracker.increment_download_retries(url) return success, file_path, error except asyncio.TimeoutError: @@ -448,7 +442,7 @@ class QueueHandler: async def cleanup(self) -> None: """ Clean up resources and stop processing. - + Raises: QueueHandlerError: If cleanup fails """ @@ -466,7 +460,9 @@ class QueueHandler: except asyncio.CancelledError: pass except Exception as e: - logger.error(f"Error cancelling download task for {url}: {e}") + logger.error( + f"Error cancelling download task for {url}: {e}" + ) self._active_downloads.clear() self._stats["active_downloads"] = 0 @@ -492,12 +488,12 @@ class QueueHandler: logger.info("QueueHandler force cleanup completed") except Exception as e: - logger.error(f"Error during QueueHandler force cleanup: {str(e)}", exc_info=True) + logger.error( + f"Error during QueueHandler force cleanup: {str(e)}", exc_info=True + ) async def _update_download_progress_reaction( - self, - message: discord.Message, - progress: float + self, message: discord.Message, progress: float ) -> None: """Update download progress reaction on message""" if not message: @@ -535,7 +531,7 @@ class QueueHandler: def is_healthy(self) -> bool: """ Check if handler is healthy. - + Returns: True if handler is healthy, False otherwise """ @@ -543,9 +539,13 @@ class QueueHandler: # Check if any downloads are stuck current_time = datetime.utcnow() for url, task in self._active_downloads.items(): - if not task.done() and task.get_coro().cr_frame.f_locals.get('start_time'): - start_time = task.get_coro().cr_frame.f_locals['start_time'] - if (current_time - start_time).total_seconds() > self.DOWNLOAD_TIMEOUT: + if not task.done() and task.get_coro().cr_frame.f_locals.get( + "start_time" + ): + start_time = task.get_coro().cr_frame.f_locals["start_time"] + if ( + current_time - start_time + ).total_seconds() > self.DOWNLOAD_TIMEOUT: self._stats["is_healthy"] = False return False @@ -566,7 +566,7 @@ class QueueHandler: def get_stats(self) -> QueueStats: """ Get queue handler statistics. - + Returns: Dictionary containing queue statistics """ diff --git a/videoarchiver/utils/download_manager.py b/videoarchiver/utils/download_manager.py index 3572a09..48f1a20 100644 --- a/videoarchiver/utils/download_manager.py +++ b/videoarchiver/utils/download_manager.py @@ -11,7 +11,7 @@ from pathlib import Path from .verification_manager import VideoVerificationManager from .compression_manager import CompressionManager -from .progress_tracker import ProgressTracker +from . import progress_tracker logger = logging.getLogger("DownloadManager") @@ -62,7 +62,6 @@ class DownloadManager: # Initialize components self.verification_manager = VideoVerificationManager(ffmpeg_mgr) self.compression_manager = CompressionManager(ffmpeg_mgr, max_file_size) - self.progress_tracker = ProgressTracker() # Create thread pool self.download_pool = ThreadPoolExecutor( @@ -135,7 +134,7 @@ class DownloadManager: logger.info(f"Download completed: {d['filename']}") elif d["status"] == "downloading": try: - self.progress_tracker.update_download_progress(d) + progress_tracker.update_download_progress(d) except Exception as e: logger.debug(f"Error logging progress: {str(e)}") @@ -145,7 +144,7 @@ class DownloadManager: self.ytdl_logger.cancelled = True self.download_pool.shutdown(wait=False, cancel_futures=True) await self.compression_manager.cleanup() - self.progress_tracker.clear_progress() + progress_tracker.clear_progress() async def force_cleanup(self) -> None: """Force cleanup of all resources""" @@ -153,7 +152,7 @@ class DownloadManager: self.ytdl_logger.cancelled = True self.download_pool.shutdown(wait=False, cancel_futures=True) await self.compression_manager.force_cleanup() - self.progress_tracker.clear_progress() + progress_tracker.clear_progress() async def download_video( self, @@ -164,7 +163,7 @@ class DownloadManager: if self._shutting_down: return False, "", "Downloader is shutting down" - self.progress_tracker.start_download(url) + progress_tracker.start_download(url) try: # Download video @@ -186,7 +185,7 @@ class DownloadManager: return False, "", str(e) finally: - self.progress_tracker.end_download(url) + progress_tracker.end_download(url) async def _safe_download( self, diff --git a/videoarchiver/utils/progress_tracker.py b/videoarchiver/utils/progress_tracker.py index d725cb1..f046f84 100644 --- a/videoarchiver/utils/progress_tracker.py +++ b/videoarchiver/utils/progress_tracker.py @@ -3,11 +3,20 @@ import logging from typing import Dict, Any, Optional from datetime import datetime +from enum import Enum, auto logger = logging.getLogger(__name__) +class ProgressStatus(Enum): + """Status of a progress operation""" + PENDING = auto() + ACTIVE = auto() + COMPLETED = auto() + FAILED = auto() + CANCELLED = auto() + class ProgressTracker: - """Progress tracker singleton.""" + """Progress tracker for downloads and compressions.""" _instance = None def __new__(cls): @@ -18,296 +27,71 @@ class ProgressTracker: def __init__(self): if not hasattr(self, '_initialized'): - self._data: Dict[str, Dict[str, Any]] = {} + self._download_progress: Dict[str, Dict[str, Any]] = {} + self._compression_progress: Dict[str, Dict[str, Any]] = {} self._initialized = True - def update(self, key: str, data: Dict[str, Any]) -> None: - """Update progress for a key.""" - if key not in self._data: - self._data[key] = { + def update_download_progress(self, url: str, data: Dict[str, Any]) -> None: + """Update progress for a download.""" + if url not in self._download_progress: + self._download_progress[url] = { + 'active': True, + 'start_time': datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), + 'percent': 0, + 'retries': 0 + } + self._download_progress[url].update(data) + self._download_progress[url]['last_update'] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + logger.debug(f"Download progress for {url}: {self._download_progress[url].get('percent', 0)}%") + + def increment_download_retries(self, url: str) -> None: + """Increment retry count for a download.""" + if url in self._download_progress: + self._download_progress[url]['retries'] = self._download_progress[url].get('retries', 0) + 1 + logger.debug(f"Incremented retries for {url} to {self._download_progress[url]['retries']}") + + def get_download_progress(self, url: Optional[str] = None) -> Dict[str, Any]: + """Get progress for a download.""" + if url is None: + return self._download_progress + return self._download_progress.get(url, {}) + + def update_compression_progress(self, file_path: str, data: Dict[str, Any]) -> None: + """Update progress for a compression.""" + if file_path not in self._compression_progress: + self._compression_progress[file_path] = { 'active': True, 'start_time': datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), 'percent': 0 } - self._data[key].update(data) - self._data[key]['last_update'] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - logger.debug(f"Progress for {key}: {self._data[key].get('percent', 0)}%") + self._compression_progress[file_path].update(data) + self._compression_progress[file_path]['last_update'] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + logger.debug(f"Compression progress for {file_path}: {self._compression_progress[file_path].get('percent', 0)}%") - def get(self, key: Optional[str] = None) -> Dict[str, Any]: - """Get progress for a key.""" - if key is None: - return self._data - return self._data.get(key, {}) - - def complete(self, key: str) -> None: - """Mark progress as complete.""" - if key in self._data: - self._data[key]['active'] = False - logger.info(f"Operation completed for {key}") - - def clear(self) -> None: - """Clear all progress data.""" - self._data.clear() - logger.info("Progress data cleared") - -_tracker = ProgressTracker() - - def get_compression(self, file_path: Optional[str] = None) -> Dict[str, Any]: - """Get compression progress.""" + def get_compression_progress(self, file_path: Optional[str] = None) -> Dict[str, Any]: + """Get progress for a compression.""" if file_path is None: - return self._compressions - return self._compressions.get(file_path, {}) + return self._compression_progress + return self._compression_progress.get(file_path, {}) def complete_download(self, url: str) -> None: """Mark download as complete.""" - if url in self._downloads: - self._downloads[url]['active'] = False + if url in self._download_progress: + self._download_progress[url]['active'] = False logger.info(f"Download completed for {url}") def complete_compression(self, file_path: str) -> None: """Mark compression as complete.""" - if file_path in self._compressions: - self._compressions[file_path]['active'] = False - logger.info(f"Compression completed for {file_path}") - - def clear(self) -> None: - """Clear all progress data.""" - self._downloads.clear() - self._compressions.clear() - logger.info("Progress data cleared") - -# Global instance -_tracker = ProgressTrack - -# Global instance -_tracker = ProgressTracker() - -def get_tracker() -> Progre - """Clear all progress tracking""" - self._download_progress.clear() - self._compression_progress.clear() - logger.info("Cleared all progress tracking data") - -# Create singleton instance -progress_tracker = ProgressTracker() - -def get_progress_tracker() -> ProgressTracker: - - def mark_compression_complete(self, file_path: str) -> None: - """Mark a compression operation as complete""" if file_path in self._compression_progress: self._compression_progress[file_path]['active'] = False logger.info(f"Compression completed for {file_path}") - def clear_progress(self) -> None: - """Clear all progress tracking""" + def clear(self) -> None: + """Clear all progress data.""" self._download_progress.clear() self._compression_progress.clear() - logger.info("Cleared all progress tracking data") + logger.info("Progress data cleared") -# Create singleton instance -progress_tracker = ProgressTracker() - -# Export the singleton instance -def get_progress_tracker() -> ProgressTracker: - - - Args: - data: Dictionary containing download progress data - """ - try: - info_dict = data.get("info_dict", {}) - url = info_dict.get("webpage_url") - if not url or url not in self._download_progress: - return - - if data.get("status") == "downloading": - percent_str = data.get("_percent_str", "0").replace("%", "") - try: - percent = float(percent_str) - except ValueError: - percent = 0.0 - - total_bytes = ( - data.get("total_bytes", 0) or - data.get("total_bytes_estimate", 0) - ) - - self._download_progress[url].update({ - "active": True, - "percent": percent, - "speed": data.get("_speed_str", "N/A"), - "eta": data.get("_eta_str", "N/A"), - "downloaded_bytes": data.get("downloaded_bytes", 0), - "total_bytes": total_bytes, - "retries": data.get("retry_count", 0), - "fragment_count": data.get("fragment_count", 0), - "fragment_index": data.get("fragment_index", 0), - "video_title": info_dict.get("title", "Unknown"), - "extractor": info_dict.get("extractor", "Unknown"), - "format": info_dict.get("format", "Unknown"), - "resolution": info_dict.get("resolution", "Unknown"), - "fps": info_dict.get("fps", "Unknown"), - "last_update": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - }) - - logger.debug( - f"Download progress for {url}: " - f"{percent:.1f}% at {self._download_progress[url]['speed']}, " - f"ETA: {self._download_progress[url]['eta']}" - ) - - except Exception as e: - logger.error(f"Error updating download progress: {e}", exc_info=True) - - def end_download(self, url: str, status: ProgressStatus = ProgressStatus.COMPLETED) -> None: - """ - Mark a download as completed. - - Args: - url: The URL being downloaded - status: The final status of the download - """ - if url in self._download_progress: - self._download_progress[url]["active"] = False - logger.info(f"Download {status.value} for {url}") - - def start_compression(self, params: CompressionParams) -> None: - """ - Initialize progress tracking for compression. - - Args: - params: Compression parameters - """ - current_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - self._compression_progress[params.input_file] = CompressionProgress( - active=True, - filename=params.input_file, - start_time=current_time, - percent=0.0, - elapsed_time="0:00", - input_size=params.input_size, - current_size=0, - target_size=params.target_size, - codec=params.codec_params.get("c:v", "unknown"), - hardware_accel=params.use_hardware, - preset=params.codec_params.get("preset", "unknown"), - crf=params.codec_params.get("crf", "unknown"), - duration=params.duration, - bitrate=params.codec_params.get("b:v", "unknown"), - audio_codec=params.codec_params.get("c:a", "unknown"), - audio_bitrate=params.codec_params.get("b:a", "unknown"), - last_update=current_time, - current_time=None - ) - - def update_compression_progress( - self, - input_file: str, - progress: float, - elapsed_time: str, - current_size: int, - current_time: float - ) -> None: - """ - Update compression progress information. - - Args: - input_file: The input file being compressed - progress: Current progress percentage (0-100) - elapsed_time: Time elapsed as string - current_size: Current file size in bytes - current_time: Current timestamp in seconds - """ - if input_file in self._compression_progress: - self._compression_progress[input_file].update({ - "percent": max(0.0, min(100.0, progress)), - "elapsed_time": elapsed_time, - "current_size": current_size, - "current_time": current_time, - "last_update": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - }) - - logger.debug( - f"Compression progress for {input_file}: " - f"{progress:.1f}%, Size: {current_size}/{self._compression_progress[input_file]['target_size']} bytes" - ) - - def end_compression( - self, - input_file: str, - status: ProgressStatus = ProgressStatus.COMPLETED - ) -> None: - """ - Mark a compression operation as completed. - - Args: - input_file: The input file being compressed - status: The final status of the compression - """ - if input_file in self._compression_progress: - self._compression_progress[input_file]["active"] = False - logger.info(f"Compression {status.value} for {input_file}") - - def get_download_progress(self, url: Optional[str] = None) -> Optional[DownloadProgress]: - """ - Get progress information for a download. - - Args: - url: Optional URL to get progress for. If None, returns all progress. - - Returns: - Progress information for the specified download or None if not found - """ - if url is None: - return self._download_progress - return self._download_progress.get(url) - - def get_compression_progress( - self, - input_file: Optional[str] = None - ) -> Optional[CompressionProgress]: - """ - Get progress information for a compression operation. - - Args: - input_file: Optional file to get progress for. If None, returns all progress. - - Returns: - Progress information for the specified compression or None if not found - """ - if input_file is None: - return self._compression_progress - return self._compression_progress.get(input_file) - - def get_active_downloads(self) -> Dict[str, DownloadProgress]: - """ - Get all active downloads. - - Returns: - Dictionary of active downloads and their progress - """ - return { - url: progress - for url, progress in self._download_progress.items() - if progress.get("active", False) - } - - def get_active_compressions(self) -> Dict[str, CompressionProgress]: - """ - Get all active compression operations. - - Returns: - Dictionary of active compressions and their progress - """ - return { - input_file: progress - for input_file, progress in self._compression_progress.items() - if progress.get("active", False) - } - - def clear_progress(self) -> None: - """Clear all progress tracking""" - self._download_progress.clear() - self._compression_progress.clear() - logger.info("Cleared \ No newline at end of file + def is_healthy(self) -> bool: + """Check if tracker is healthy.""" + return True # Basic health check, can be expanded if needed