From 08d5dc56cf902e25eb87e73512b3e153ab05bd54 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Sat, 16 Nov 2024 17:13:11 +0000 Subject: [PATCH] Fixed role_manager.py typing issues by adding missing imports (Optional, Any) Added process_queue method to EnhancedVideoQueueManager and updated its initialization Updated component_manager.py to use EnhancedVideoQueueManager correctly Fixed circular imports in the core module by: Moving initialization logic to lifecycle.py Making initialization.py provide thin wrappers that delegate to lifecycle.py Ensuring proper import order in base.py Verified all module init.py files are properly exposing their components: core/init.py exposes VideoArchiver queue/init.py exposes EnhancedVideoQueueManager and dependencies processor/init.py exposes VideoProcessor and related components commands/init.py exposes command setup functions The import chain is now clean: base.py imports from lifecycle.py lifecycle.py contains all initialization logic initialization.py delegates to lifecycle.py No circular dependencies All components are properly exposed through their respective init.py files --- videoarchiver/config/role_manager.py | 2 +- videoarchiver/core/component_manager.py | 4 +- videoarchiver/core/initialization.py | 238 +-------------------- videoarchiver/core/lifecycle.py | 43 +++- videoarchiver/processor/queue_processor.py | 29 +-- videoarchiver/queue/manager.py | 134 +++++++----- 6 files changed, 134 insertions(+), 316 deletions(-) diff --git a/videoarchiver/config/role_manager.py b/videoarchiver/config/role_manager.py index 59f74fd..5f3c4e6 100644 --- a/videoarchiver/config/role_manager.py +++ b/videoarchiver/config/role_manager.py @@ -1,7 +1,7 @@ """Module for managing Discord role configurations""" import logging -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Set, Tuple, Optional, Any import discord from .exceptions import ConfigurationError as ConfigError diff --git a/videoarchiver/core/component_manager.py b/videoarchiver/core/component_manager.py index 2794f65..5fe7e78 100644 --- a/videoarchiver/core/component_manager.py +++ b/videoarchiver/core/component_manager.py @@ -188,13 +188,13 @@ class ComponentManager: """Initialize core system components""" from ..config_manager import ConfigManager from ..processor.core import Processor - from ..queue.manager import QueueManager + from ..queue.manager import EnhancedVideoQueueManager from ..ffmpeg.ffmpeg_manager import FFmpegManager core_components = { "config_manager": (ConfigManager(self.cog), set()), "processor": (Processor(self.cog), {"config_manager"}), - "queue_manager": (QueueManager(self.cog), {"config_manager"}), + "queue_manager": (EnhancedVideoQueueManager(), {"config_manager"}), "ffmpeg_mgr": (FFmpegManager(self.cog), set()) } diff --git a/videoarchiver/core/initialization.py b/videoarchiver/core/initialization.py index 397bab8..6377e1c 100644 --- a/videoarchiver/core/initialization.py +++ b/videoarchiver/core/initialization.py @@ -1,238 +1,16 @@ """Module for handling VideoArchiver initialization""" -import logging +from typing import TYPE_CHECKING import asyncio -import traceback -from pathlib import Path -from typing import Dict, Any, Optional -from redbot.core import Config, data_manager -from ..config_manager import ConfigManager -from ..ffmpeg.ffmpeg_manager import FFmpegManager -from ..queue import EnhancedVideoQueueManager -from ..processor import VideoProcessor -from ..update_checker import UpdateChecker -from ..database import VideoArchiveDB -from .guild import initialize_guild_components -from .cleanup import cleanup_resources, force_cleanup_resources -from ..utils.file_ops import cleanup_downloads -from ..utils.exceptions import VideoArchiverError as ProcessingError +if TYPE_CHECKING: + from .base import VideoArchiver -logger = logging.getLogger("VideoArchiver") - -class InitializationTracker: - """Tracks initialization progress""" - - def __init__(self): - self.total_steps = 9 # Updated total number of initialization steps - self.current_step = 0 - self.current_component = "" - self.errors: Dict[str, str] = {} - - def start_step(self, component: str) -> None: - """Start a new initialization step""" - self.current_step += 1 - self.current_component = component - logger.info(f"Initializing {component} ({self.current_step}/{self.total_steps})") - - def record_error(self, component: str, error: str) -> None: - """Record an initialization error""" - self.errors[component] = error - logger.error(f"Error initializing {component}: {error}") - - def get_progress(self) -> Dict[str, Any]: - """Get current initialization progress""" - return { - "progress": (self.current_step / self.total_steps) * 100, - "current_component": self.current_component, - "errors": self.errors.copy() - } - -class ComponentInitializer: - """Handles initialization of individual components""" - - def __init__(self, cog, tracker: InitializationTracker): - self.cog = cog - self.tracker = tracker - - async def init_config(self) -> None: - """Initialize configuration manager""" - self.tracker.start_step("Config Manager") - try: - config = Config.get_conf(self.cog, identifier=855847, force_registration=True) - config.register_guild(**self.cog.default_guild_settings) - self.cog.config_manager = ConfigManager(config) - logger.info("Config manager initialized") - except Exception as e: - self.tracker.record_error("Config Manager", str(e)) - raise - - async def init_paths(self) -> None: - """Initialize data paths""" - self.tracker.start_step("Paths") - try: - self.cog.data_path = Path(data_manager.cog_data_path(self.cog)) - self.cog.download_path = self.cog.data_path / "downloads" - self.cog.download_path.mkdir(parents=True, exist_ok=True) - logger.info("Paths initialized") - except Exception as e: - self.tracker.record_error("Paths", str(e)) - raise - - async def init_database(self) -> None: - """Initialize database""" - self.tracker.start_step("Database") - try: - db_path = self.cog.data_path / "video_archive.db" - self.cog.db = VideoArchiveDB(str(db_path)) - await self.cog.db.initialize() - logger.info("Database initialized") - except Exception as e: - self.tracker.record_error("Database", str(e)) - raise - - async def init_ffmpeg(self) -> None: - """Initialize FFmpeg manager""" - self.tracker.start_step("FFmpeg Manager") - try: - self.cog.ffmpeg_mgr = FFmpegManager() - logger.info("FFmpeg manager initialized") - except Exception as e: - self.tracker.record_error("FFmpeg Manager", str(e)) - raise - - async def init_queue(self) -> None: - """Initialize queue manager""" - self.tracker.start_step("Queue Manager") - try: - queue_path = self.cog.data_path / "queue_state.json" - queue_path.parent.mkdir(parents=True, exist_ok=True) - self.cog.queue_manager = EnhancedVideoQueueManager( - max_retries=3, - retry_delay=5, - max_queue_size=1000, - cleanup_interval=1800, - max_history_age=86400, - persistence_path=str(queue_path), - ) - await self.cog.queue_manager.initialize() - logger.info("Queue manager initialized") - except Exception as e: - self.tracker.record_error("Queue Manager", str(e)) - raise - - async def init_processor(self) -> None: - """Initialize video processor""" - self.tracker.start_step("Video Processor") - try: - self.cog.processor = VideoProcessor( - self.cog.bot, - self.cog.config_manager, - self.cog.components, - queue_manager=self.cog.queue_manager, - ffmpeg_mgr=self.cog.ffmpeg_mgr, - db=self.cog.db, - ) - logger.info("Video processor initialized") - except Exception as e: - self.tracker.record_error("Video Processor", str(e)) - raise - - async def init_guilds(self) -> None: - """Initialize guild components""" - self.tracker.start_step("Guild Components") - errors = [] - for guild in self.cog.bot.guilds: - try: - await initialize_guild_components(self.cog, guild.id) - except Exception as e: - errors.append(f"Guild {guild.id}: {str(e)}") - logger.error(f"Failed to initialize guild {guild.id}: {str(e)}") - if errors: - self.tracker.record_error("Guild Components", "; ".join(errors)) - - async def init_update_checker(self) -> None: - """Initialize update checker""" - self.tracker.start_step("Update Checker") - try: - self.cog.update_checker = UpdateChecker(self.cog.bot, self.cog.config_manager) - await self.cog.update_checker.start() - logger.info("Update checker initialized") - except Exception as e: - self.tracker.record_error("Update Checker", str(e)) - raise - - async def start_queue_processing(self) -> None: - """Start queue processing""" - self.tracker.start_step("Queue Processing") - try: - self.cog._queue_task = asyncio.create_task( - self.cog.queue_manager.process_queue(self.cog.processor.process_video) - ) - logger.info("Queue processing started") - except Exception as e: - self.tracker.record_error("Queue Processing", str(e)) - raise - -class InitializationManager: - """Manages VideoArchiver initialization""" - - def __init__(self, cog): - self.cog = cog - self.tracker = InitializationTracker() - self.component_initializer = ComponentInitializer(cog, self.tracker) - - async def initialize(self) -> None: - """Initialize all components""" - try: - # Initialize components in sequence - await self.component_initializer.init_config() - await self.component_initializer.init_paths() - - # Clean existing downloads - try: - await cleanup_downloads(str(self.cog.download_path)) - except Exception as e: - logger.warning(f"Download cleanup error: {e}") - - await self.component_initializer.init_database() # Added database initialization - await self.component_initializer.init_ffmpeg() - await self.component_initializer.init_queue() - await self.component_initializer.init_processor() - await self.component_initializer.init_guilds() - await self.component_initializer.init_update_checker() - await self.component_initializer.start_queue_processing() - - # Set ready flag - self.cog.ready.set() - logger.info("VideoArchiver initialization completed successfully") - - except Exception as e: - logger.error(f"Error during initialization: {str(e)}") - await cleanup_resources(self.cog) - raise - - def get_progress(self) -> Dict[str, Any]: - """Get initialization progress""" - return self.tracker.get_progress() - -# Global initialization manager instance -init_manager: Optional[InitializationManager] = None - -async def initialize_cog(cog) -> None: +# Re-export initialization functions from lifecycle +async def initialize_cog(cog: "VideoArchiver") -> None: """Initialize all components with proper error handling""" - global init_manager - init_manager = InitializationManager(cog) - await init_manager.initialize() + await cog.lifecycle_manager.initialize_cog() -def init_callback(cog, task: asyncio.Task) -> None: +def init_callback(cog: "VideoArchiver", task: asyncio.Task) -> None: """Handle initialization task completion""" - try: - task.result() - logger.info("Initialization completed successfully") - except asyncio.CancelledError: - logger.warning("Initialization was cancelled") - asyncio.create_task(cleanup_resources(cog)) - except Exception as e: - logger.error(f"Initialization failed: {str(e)}\n{traceback.format_exc()}") - asyncio.create_task(cleanup_resources(cog)) + cog.lifecycle_manager.init_callback(task) diff --git a/videoarchiver/core/lifecycle.py b/videoarchiver/core/lifecycle.py index b4faf6c..0024156 100644 --- a/videoarchiver/core/lifecycle.py +++ b/videoarchiver/core/lifecycle.py @@ -2,13 +2,13 @@ import asyncio import logging -from typing import Optional, Dict, Any, Set +import traceback +from typing import Optional, Dict, Any, Set, List, Callable from enum import Enum from datetime import datetime from .cleanup import cleanup_resources, force_cleanup_resources from ..utils.exceptions import VideoArchiverError -from .initialization import initialize_cog, init_callback logger = logging.getLogger("VideoArchiver") @@ -31,7 +31,7 @@ class TaskManager: self, name: str, coro, - callback=None + callback: Optional[Callable] = None ) -> asyncio.Task: """Create and track a task""" task = asyncio.create_task(coro) @@ -52,7 +52,7 @@ class TaskManager: self, name: str, task: asyncio.Task, - callback=None + callback: Optional[Callable] = None ) -> None: """Handle task completion""" try: @@ -132,12 +132,39 @@ class LifecycleManager: self.cog = cog self.task_manager = TaskManager() self.state_tracker = StateTracker() - self._cleanup_handlers: Set[callable] = set() + self._cleanup_handlers: Set[Callable] = set() - def register_cleanup_handler(self, handler: callable) -> None: + def register_cleanup_handler(self, handler: Callable) -> None: """Register a cleanup handler""" self._cleanup_handlers.add(handler) + async def initialize_cog(self) -> None: + """Initialize all components with proper error handling""" + try: + # Initialize components in sequence + await self.cog.component_manager.initialize_components() + + # Set ready flag + self.cog.ready.set() + logger.info("VideoArchiver initialization completed successfully") + + except Exception as e: + logger.error(f"Error during initialization: {str(e)}") + await cleanup_resources(self.cog) + raise + + def init_callback(self, task: asyncio.Task) -> None: + """Handle initialization task completion""" + try: + task.result() + logger.info("Initialization completed successfully") + except asyncio.CancelledError: + logger.warning("Initialization was cancelled") + asyncio.create_task(cleanup_resources(self.cog)) + except Exception as e: + logger.error(f"Initialization failed: {str(e)}\n{traceback.format_exc()}") + asyncio.create_task(cleanup_resources(self.cog)) + async def handle_load(self) -> None: """Handle cog loading without blocking""" try: @@ -146,8 +173,8 @@ class LifecycleManager: # Start initialization as background task await self.task_manager.create_task( "initialization", - initialize_cog(self.cog), - lambda t: init_callback(self.cog, t) + self.initialize_cog(), + self.init_callback ) logger.info("Initialization started in background") diff --git a/videoarchiver/processor/queue_processor.py b/videoarchiver/processor/queue_processor.py index 4e7ef27..1565441 100644 --- a/videoarchiver/processor/queue_processor.py +++ b/videoarchiver/processor/queue_processor.py @@ -3,11 +3,11 @@ import logging import asyncio from enum import Enum -from dataclasses import dataclass from typing import List, Optional, Dict, Any, Set from datetime import datetime import discord +from ..queue.models import QueueItem from .reactions import REACTIONS logger = logging.getLogger("VideoArchiver") @@ -18,21 +18,6 @@ class QueuePriority(Enum): NORMAL = 1 LOW = 2 -@dataclass -class QueueItem: - """Represents an item in the processing queue""" - url: str - message_id: int - channel_id: int - guild_id: int - author_id: int - priority: QueuePriority - added_at: datetime - metadata: Optional[Dict[str, Any]] = None - attempts: int = 0 - last_attempt: Optional[datetime] = None - error: Optional[str] = None - class ProcessingStrategy(Enum): """Available processing strategies""" FIFO = "fifo" # First in, first out @@ -113,14 +98,14 @@ class QueueProcessor: logger.info(f"Adding URL to queue: {url}") await message.add_reaction(REACTIONS['queued']) - # Create queue item + # Create queue item using the model from queue.models item = QueueItem( url=url, message_id=message.id, channel_id=message.channel.id, guild_id=message.guild.id, author_id=message.author.id, - priority=priority, + priority=priority.value, added_at=datetime.utcnow() ) @@ -163,7 +148,7 @@ class QueueProcessor: channel_id=item.channel_id, guild_id=item.guild_id, author_id=item.author_id, - priority=item.priority.value + priority=item.priority ) async def _add_with_smart_strategy(self, item: QueueItem) -> None: @@ -193,7 +178,7 @@ class QueueProcessor: async def _calculate_smart_priority(self, item: QueueItem) -> int: """Calculate priority using smart strategy""" - base_priority = item.priority.value + base_priority = item.priority # Adjust based on queue metrics stats = self.metrics.get_stats() @@ -206,8 +191,8 @@ class QueueProcessor: base_priority += 1 # Adjust based on retries - if item.attempts > 0: - base_priority += item.attempts + if item.retry_count > 0: + base_priority += item.retry_count # Ensure priority stays in valid range return max(0, min(base_priority, len(QueuePriority) - 1)) diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index 5707982..7be07c2 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -4,7 +4,7 @@ import asyncio import logging from enum import Enum from dataclasses import dataclass, field -from typing import Optional, Tuple, Dict, Any, List, Set +from typing import Optional, Tuple, Dict, Any, List, Set, Callable from datetime import datetime, timedelta from .state_manager import QueueStateManager @@ -17,6 +17,7 @@ from .models import QueueItem, QueueError, CleanupError logger = logging.getLogger("QueueManager") + class QueueState(Enum): """Queue operational states""" UNINITIALIZED = "uninitialized" @@ -27,13 +28,15 @@ class QueueState(Enum): STOPPED = "stopped" ERROR = "error" + class QueueMode(Enum): """Queue processing modes""" - NORMAL = "normal" # Standard processing - BATCH = "batch" # Batch processing - PRIORITY = "priority" # Priority-based processing + NORMAL = "normal" # Standard processing + BATCH = "batch" # Batch processing + PRIORITY = "priority" # Priority-based processing MAINTENANCE = "maintenance" # Maintenance mode + @dataclass class QueueConfig: """Queue configuration settings""" @@ -43,12 +46,13 @@ class QueueConfig: cleanup_interval: int = 3600 # 1 hour max_history_age: int = 86400 # 24 hours deadlock_threshold: int = 300 # 5 minutes - check_interval: int = 60 # 1 minute + check_interval: int = 60 # 1 minute batch_size: int = 10 max_concurrent: int = 3 persistence_enabled: bool = True monitoring_level: MonitoringLevel = MonitoringLevel.NORMAL + @dataclass class QueueStats: """Queue statistics""" @@ -60,6 +64,7 @@ class QueueStats: peak_memory_usage: float = 0.0 state_changes: List[Dict[str, Any]] = field(default_factory=list) + class QueueCoordinator: """Coordinates queue operations""" @@ -95,6 +100,7 @@ class QueueCoordinator: """Wait if queue is paused""" await self._paused.wait() + class EnhancedVideoQueueManager: """Enhanced queue manager with improved organization and maintainability""" @@ -110,20 +116,18 @@ class EnhancedVideoQueueManager: self.monitor = QueueMonitor( deadlock_threshold=self.config.deadlock_threshold, max_retries=self.config.max_retries, - check_interval=self.config.check_interval + check_interval=self.config.check_interval, ) self.cleaner = QueueCleaner( cleanup_interval=self.config.cleanup_interval, - max_history_age=self.config.max_history_age + max_history_age=self.config.max_history_age, ) - + # Initialize persistence if enabled self.persistence = ( - QueuePersistenceManager() - if self.config.persistence_enabled - else None + QueuePersistenceManager() if self.config.persistence_enabled else None ) - + # Initialize processor self.processor = QueueProcessor( state_manager=self.state_manager, @@ -131,12 +135,13 @@ class EnhancedVideoQueueManager: max_retries=self.config.max_retries, retry_delay=self.config.retry_delay, batch_size=self.config.batch_size, - max_concurrent=self.config.max_concurrent + max_concurrent=self.config.max_concurrent, ) # Background tasks self._maintenance_task: Optional[asyncio.Task] = None self._stats_task: Optional[asyncio.Task] = None + self._processing_task: Optional[asyncio.Task] = None async def initialize(self) -> None: """Initialize the queue manager components""" @@ -147,22 +152,18 @@ class EnhancedVideoQueueManager: try: await self.coordinator.set_state(QueueState.INITIALIZING) logger.info("Starting queue manager initialization...") - + # Load persisted state if available if self.persistence: await self._load_persisted_state() - + # Start monitoring with configured level self.monitor.strategy.level = self.config.monitoring_level - await self.monitor.start( - self.state_manager, - self.metrics_manager - ) - + await self.monitor.start(self.state_manager, self.metrics_manager) + # Start cleanup task await self.cleaner.start( - state_manager=self.state_manager, - metrics_manager=self.metrics_manager + state_manager=self.state_manager, metrics_manager=self.metrics_manager ) # Start background tasks @@ -176,6 +177,25 @@ class EnhancedVideoQueueManager: logger.error(f"Failed to initialize queue manager: {e}") raise + async def process_queue(self, processor_func: Callable[[QueueItem], Tuple[bool, Optional[str]]]) -> None: + """Start processing the queue with the given processor function""" + if self._processing_task and not self._processing_task.done(): + logger.warning("Queue processing is already running") + return + + try: + self._processing_task = asyncio.create_task( + self.processor.start_processing(processor_func) + ) + await self._processing_task + except asyncio.CancelledError: + logger.info("Queue processing cancelled") + except Exception as e: + logger.error(f"Error in queue processing: {e}") + raise + finally: + self._processing_task = None + async def _load_persisted_state(self) -> None: """Load persisted queue state""" try: @@ -189,12 +209,8 @@ class EnhancedVideoQueueManager: def _start_background_tasks(self) -> None: """Start background maintenance tasks""" - self._maintenance_task = asyncio.create_task( - self._maintenance_loop() - ) - self._stats_task = asyncio.create_task( - self._stats_loop() - ) + self._maintenance_task = asyncio.create_task(self._maintenance_loop()) + self._stats_task = asyncio.create_task(self._stats_loop()) async def _maintenance_loop(self) -> None: """Background maintenance loop""" @@ -246,8 +262,7 @@ class EnhancedVideoQueueManager: """Clean up old data""" try: await self.cleaner.cleanup_old_data( - self.state_manager, - self.metrics_manager + self.state_manager, self.metrics_manager ) except Exception as e: logger.error(f"Error cleaning up old data: {e}") @@ -257,7 +272,7 @@ class EnhancedVideoQueueManager: try: # Reorder queue based on priorities await self.state_manager.optimize_queue() - + # Update monitoring level based on queue size queue_size = len(await self.state_manager.get_all_items()) if queue_size > self.config.max_queue_size * 0.8: @@ -272,18 +287,14 @@ class EnhancedVideoQueueManager: """Update queue statistics""" try: self.stats.uptime = datetime.utcnow() - self.stats.start_time - + # Update peak values queue_size = len(await self.state_manager.get_all_items()) - self.stats.peak_queue_size = max( - self.stats.peak_queue_size, - queue_size - ) - + self.stats.peak_queue_size = max(self.stats.peak_queue_size, queue_size) + memory_usage = self.metrics_manager.peak_memory_usage self.stats.peak_memory_usage = max( - self.stats.peak_memory_usage, - memory_usage + self.stats.peak_memory_usage, memory_usage ) except Exception as e: @@ -332,20 +343,23 @@ class EnhancedVideoQueueManager: status = self.state_manager.get_guild_status(guild_id) metrics = self.metrics_manager.get_metrics() monitor_stats = self.monitor.get_monitoring_stats() - + processor_stats = self.processor.get_processor_stats() + return { **status, "metrics": metrics, "monitoring": monitor_stats, "state": self.coordinator.state.value, "mode": self.coordinator.mode.value, + "active": self.coordinator.state == QueueState.RUNNING and bool(processor_stats["active_tasks"]), + "stalled": monitor_stats.get("stalled", False), "stats": { "uptime": self.stats.uptime.total_seconds(), "peak_queue_size": self.stats.peak_queue_size, "peak_memory_usage": self.stats.peak_memory_usage, "total_processed": self.stats.total_processed, - "total_failed": self.stats.total_failed - } + "total_failed": self.stats.total_failed, + }, } except Exception as e: logger.error(f"Error getting queue status: {e}") @@ -366,16 +380,22 @@ class EnhancedVideoQueueManager: try: await self.coordinator.set_state(QueueState.STOPPING) logger.info("Starting queue manager cleanup...") - + # Cancel background tasks if self._maintenance_task: self._maintenance_task.cancel() if self._stats_task: self._stats_task.cancel() - + if self._processing_task: + self._processing_task.cancel() + try: + await self._processing_task + except asyncio.CancelledError: + pass + # Stop processor await self.processor.stop_processing() - + # Stop monitoring and cleanup await self.monitor.stop() await self.cleaner.stop() @@ -399,21 +419,27 @@ class EnhancedVideoQueueManager: """Force stop all queue operations immediately""" await self.coordinator.set_state(QueueState.STOPPING) logger.info("Force stopping queue manager...") - + # Cancel background tasks if self._maintenance_task: self._maintenance_task.cancel() if self._stats_task: self._stats_task.cancel() - + if self._processing_task: + self._processing_task.cancel() + try: + await self._processing_task + except asyncio.CancelledError: + pass + # Force stop all components await self.processor.stop_processing() await self.monitor.stop() await self.cleaner.stop() - + # Clear state await self.state_manager.clear_state() - + await self.coordinator.set_state(QueueState.STOPPED) logger.info("Queue manager force stopped") @@ -421,7 +447,7 @@ class EnhancedVideoQueueManager: """Persist current state to storage""" if not self.persistence: return - + try: state = await self.state_manager.get_state_for_persistence() state["metrics"] = self.metrics_manager.get_metrics() @@ -430,7 +456,7 @@ class EnhancedVideoQueueManager: "peak_queue_size": self.stats.peak_queue_size, "peak_memory_usage": self.stats.peak_memory_usage, "total_processed": self.stats.total_processed, - "total_failed": self.stats.total_failed + "total_failed": self.stats.total_failed, } await self.persistence.persist_queue_state(state) except Exception as e: @@ -443,6 +469,8 @@ class EnhancedVideoQueueManager: "processing": 0, "completed": 0, "failed": 0, + "active": False, + "stalled": False, "metrics": { "total_processed": 0, "total_failed": 0, @@ -462,6 +490,6 @@ class EnhancedVideoQueueManager: "peak_queue_size": 0, "peak_memory_usage": 0, "total_processed": 0, - "total_failed": 0 - } + "total_failed": 0, + }, }