Added proper initialization sequence in the queue manager:

Sequential component initialization
Coordination through an _initialized event
Consistent lock hierarchy with _global_lock
Updated the base cog to properly initialize components:
Added timeout handling for queue manager initialization
Ensures queue manager is fully initialized before starting queue processing
Added proper error handling and cleanup
Established consistent lock ordering:
Global Lock -> Queue Lock -> Processing Lock
Prevents circular wait conditions
Each component respects the lock hierarchy
Improved task coordination:
Sequential initialization of components
Proper waiting for initialization before starting operations
Timeout handling for all async operations
This commit is contained in:
pacnpal
2024-11-15 23:04:13 +00:00
parent 9e7e4a851d
commit 53e7769811
2 changed files with 200 additions and 201 deletions

View File

@@ -386,7 +386,20 @@ class VideoArchiver(GroupCog):
max_history_age=86400,
persistence_path=str(queue_path),
)
logger.info("Queue manager initialized")
# Initialize queue manager with timeout
try:
await asyncio.wait_for(
self.queue_manager.initialize(),
timeout=INIT_TIMEOUT
)
logger.info("Queue manager initialized successfully")
except asyncio.TimeoutError:
logger.error("Queue manager initialization timed out")
raise ProcessingError("Queue manager initialization timed out")
except Exception as e:
logger.error(f"Queue manager initialization failed: {e}")
raise
# Initialize processor with queue manager and shared FFmpeg manager
self.processor = VideoProcessor(
@@ -429,6 +442,7 @@ class VideoArchiver(GroupCog):
logger.warning("Update checker start timed out")
# Start queue processing as a background task
# Only start after queue manager is fully initialized
self._queue_task = asyncio.create_task(
self.queue_manager.process_queue(self.processor.process_video)
)

View File

@@ -48,12 +48,14 @@ class EnhancedVideoQueueManager:
self._channel_queues: Dict[int, Set[str]] = {}
self._active_tasks: Set[asyncio.Task] = set()
# Locks # Locks - Establish consistent ordering
self._queue_lock = asyncio.Lock() self._global_lock = asyncio.Lock() # Primary lock for coordinating all operations
self._processing_lock = asyncio.Lock() self._queue_lock = asyncio.Lock() # Secondary lock for queue operations
self._processing_lock = asyncio.Lock() # Tertiary lock for processing operations
# State
self._shutdown = False
self._initialized = asyncio.Event()
self.metrics = QueueMetrics()
# Components
@@ -68,52 +70,68 @@ class EnhancedVideoQueueManager:
max_history_age=max_history_age
)
# Initialize tasks async def initialize(self) -> None:
self._init_tasks() """Initialize the queue manager components sequentially"""
try:
async with self._global_lock:
logger.info("Starting queue manager initialization...")
# Load persisted state first if available
if self.persistence:
await self._load_persisted_state()
# Start monitoring task
monitor_task = asyncio.create_task(
self.monitor.start_monitoring(
self._queue,
self._processing,
self.metrics,
self._processing_lock
)
)
self._active_tasks.add(monitor_task)
logger.info("Queue monitoring started")
def _init_tasks(self) -> None: # Brief pause to allow monitor to initialize
"""Initialize background tasks""" await asyncio.sleep(0.1)
# Start monitoring
monitor_task = asyncio.create_task( # Start cleanup task
self.monitor.start_monitoring( cleanup_task = asyncio.create_task(
self._queue, self.cleaner.start_cleanup(
self._processing, self._queue,
self.metrics, self._completed,
self._processing_lock self._failed,
) self._guild_queues,
) self._channel_queues,
self._active_tasks.add(monitor_task) self._processing,
logger.info("Queue monitoring started") self.metrics,
self._queue_lock
)
)
self._active_tasks.add(cleanup_task)
logger.info("Queue cleanup started")
# Start cleanup # Signal initialization complete
cleanup_task = asyncio.create_task( self._initialized.set()
self.cleaner.start_cleanup( logger.info("Queue manager initialization completed")
self._queue,
self._completed,
self._failed,
self._guild_queues,
self._channel_queues,
self._processing,
self.metrics,
self._queue_lock
)
)
self._active_tasks.add(cleanup_task)
logger.info("Queue cleanup started")
# Load persisted state if available except Exception as e:
if self.persistence: logger.error(f"Failed to initialize queue manager: {e}")
self._load_persisted_state() self._shutdown = True
raise
def _load_persisted_state(self) -> None: async def _load_persisted_state(self) -> None:
"""Load persisted queue state"""
try:
state = self.persistence.load_queue_state()
if state:
self._queue = state["queue"] async with self._queue_lock:
self._processing = state["processing"] self._queue = state["queue"]
self._completed = state["completed"] self._completed = state["completed"]
self._failed = state["failed"] self._failed = state["failed"]
async with self._processing_lock:
self._processing = state["processing"]
# Update metrics
metrics_data = state.get("metrics", {})
@@ -133,28 +151,32 @@ class EnhancedVideoQueueManager:
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> None:
"""Process items in the queue """Process items in the queue"""
# Wait for initialization to complete
await self._initialized.wait()
Args:
processor: Function that processes queue items
"""
logger.info("Queue processor started")
last_persist_time = time.time()
persist_interval = 60 # Persist state every 60 seconds instead of every operation persist_interval = 60 # Persist state every 60 seconds
while not self._shutdown:
try:
# Process items in batches to avoid blocking items = []
async with self._queue_lock: # Use global lock for coordination
# Get up to 5 items from queue async with self._global_lock:
items = [] # Then acquire specific locks in order
while len(items) < 5 and self._queue: async with self._queue_lock:
item = self._queue.pop(0) # Get up to 5 items from queue
self._processing[item.url] = item while len(items) < 5 and self._queue:
items.append(item) item = self._queue.pop(0)
items.append(item)
if items:
async with self._processing_lock:
for item in items:
self._processing[item.url] = item
if not items:
# Use shorter sleep when queue is empty and yield control
await asyncio.sleep(0.1)
continue
@@ -164,30 +186,20 @@ class EnhancedVideoQueueManager:
task = asyncio.create_task(self._process_item(processor, item))
tasks.append(task)
# Wait for all tasks to complete
await asyncio.gather(*tasks, return_exceptions=True)
# Persist state if interval has passed
current_time = time.time()
if self.persistence and (current_time - last_persist_time) >= persist_interval:
await self.persistence.persist_queue_state( await self._persist_state()
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
last_persist_time = current_time
except Exception as e:
logger.error(f"Critical error in queue processor: {e}")
await asyncio.sleep(0.1) # Brief pause on error before retrying await asyncio.sleep(0.1)
# Yield control after each batch
await asyncio.sleep(0)
logger.info("Queue processor stopped")
async def _process_item(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]],
@@ -195,52 +207,68 @@ class EnhancedVideoQueueManager:
) -> None:
"""Process a single queue item"""
try:
# Process the item
logger.info(f"Processing queue item: {item.url}")
item.start_processing() # Start processing tracking item.start_processing()
self.metrics.last_activity_time = time.time() # Update activity time self.metrics.last_activity_time = time.time()
success, error = await processor(item)
# Update metrics and status async with self._global_lock:
async with self._processing_lock: async with self._processing_lock:
item.finish_processing(success, error) # Update item status item.finish_processing(success, error)
self._processing.pop(item.url, None)
if success:
self._completed[item.url] = item if success:
logger.info(f"Successfully processed: {item.url}") self._completed[item.url] = item
else: logger.info(f"Successfully processed: {item.url}")
if item.retry_count < self.max_retries:
item.retry_count += 1
item.status = "pending"
item.last_retry = datetime.utcnow()
item.priority = max(0, item.priority - 1)
self._queue.append(item)
logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
else: else:
self._failed[item.url] = item async with self._queue_lock:
logger.error(f"Failed after {self.max_retries} attempts: {item.url}") if item.retry_count < self.max_retries:
item.retry_count += 1
self._processing.pop(item.url, None) item.status = "pending"
item.last_retry = datetime.utcnow()
# Update metrics item.priority = max(0, item.priority - 1)
self.metrics.update( self._queue.append(item)
processing_time=item.processing_time, logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
success=success, else:
error=error self._failed[item.url] = item
) logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
self.metrics.update(
processing_time=item.processing_time,
success=success,
error=error
)
except Exception as e:
logger.error(f"Error processing {item.url}: {e}")
async with self._processing_lock: async with self._global_lock:
item.finish_processing(False, str(e)) async with self._processing_lock:
self._failed[item.url] = item item.finish_processing(False, str(e))
self._processing.pop(item.url, None) self._processing.pop(item.url, None)
self.metrics.update( self._failed[item.url] = item
processing_time=item.processing_time, self.metrics.update(
success=False, processing_time=item.processing_time,
error=str(e) success=False,
error=str(e)
)
async def _persist_state(self) -> None:
"""Persist current state to storage"""
if not self.persistence:
return
try:
async with self._global_lock:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
) )
except Exception as e:
logger.error(f"Failed to persist state: {e}")
async def add_to_queue(
self,
@@ -255,48 +283,42 @@ class EnhancedVideoQueueManager:
if self._shutdown:
raise QueueError("Queue manager is shutting down")
await self._initialized.wait()
try:
async with self._queue_lock: async with self._global_lock:
if len(self._queue) >= self.max_queue_size: async with self._queue_lock:
raise QueueError("Queue is full") if len(self._queue) >= self.max_queue_size:
raise QueueError("Queue is full")
item = QueueItem(
url=url,
message_id=message_id,
channel_id=channel_id,
guild_id=guild_id,
author_id=author_id,
added_at=datetime.utcnow(),
priority=priority,
)
# Add to tracking
if guild_id not in self._guild_queues:
self._guild_queues[guild_id] = set()
self._guild_queues[guild_id].add(url)
if channel_id not in self._channel_queues:
self._channel_queues[channel_id] = set()
self._channel_queues[channel_id].add(url)
# Add to queue with priority
self._queue.append(item)
self._queue.sort(key=lambda x: (-x.priority, x.added_at))
# Update activity time
self.metrics.last_activity_time = time.time()
if self.persistence:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
) )
logger.info(f"Added to queue: {url} (priority: {priority})") if guild_id not in self._guild_queues:
return True self._guild_queues[guild_id] = set()
self._guild_queues[guild_id].add(url)
if channel_id not in self._channel_queues:
self._channel_queues[channel_id] = set()
self._channel_queues[channel_id].add(url)
self._queue.append(item)
self._queue.sort(key=lambda x: (-x.priority, x.added_at))
self.metrics.last_activity_time = time.time()
if self.persistence:
await self._persist_state()
logger.info(f"Added to queue: {url} (priority: {priority})")
return True
except Exception as e:
logger.error(f"Error adding to queue: {e}")
@@ -350,38 +372,6 @@ class EnhancedVideoQueueManager:
},
}
async def clear_guild_queue(self, guild_id: int) -> int:
"""Clear all queue items for a guild"""
if self._shutdown:
raise QueueError("Queue manager is shutting down")
try:
cleared = await self.cleaner.clear_guild_queue(
guild_id,
self._queue,
self._processing,
self._completed,
self._failed,
self._guild_queues,
self._channel_queues,
self._queue_lock
)
if self.persistence:
await self.persistence.persist_queue_state(
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
return cleared
except Exception as e:
logger.error(f"Error clearing guild queue: {e}")
raise QueueError(f"Failed to clear guild queue: {str(e)}")
async def cleanup(self) -> None:
"""Clean up resources and stop queue processing"""
try:
@@ -399,35 +389,30 @@ class EnhancedVideoQueueManager:
await asyncio.gather(*self._active_tasks, return_exceptions=True)
# Move processing items back to queue async with self._global_lock:
async with self._queue_lock: # Move processing items back to queue
for url, item in self._processing.items(): async with self._processing_lock:
if item.retry_count < self.max_retries: for url, item in self._processing.items():
item.status = "pending" if item.retry_count < self.max_retries:
item.retry_count += 1 item.status = "pending"
self._queue.append(item) item.retry_count += 1
else: self._queue.append(item)
self._failed[url] = item else:
self._failed[url] = item
self._processing.clear() self._processing.clear()
# Final state persistence # Final state persistence
if self.persistence: if self.persistence:
await self.persistence.persist_queue_state( await self._persist_state()
self._queue,
self._processing,
self._completed,
self._failed,
self.metrics
)
# Clear collections
self._queue.clear()
self._completed.clear()
self._failed.clear()
self._guild_queues.clear()
self._channel_queues.clear()
self._active_tasks.clear()
logger.info("Queue manager cleanup completed")