Files
Pac-cogs/videoarchiver/queue/processor.py
2024-11-18 05:17:37 +00:00

352 lines
12 KiB
Python

"""Module for processing queue items"""
import asyncio
import logging
import time
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional, Tuple, List, Set, Dict, Any
from datetime import datetime, timedelta
from models import QueueItem
from state_manager import QueueStateManager, ItemState
from monitoring import QueueMonitor
logger = logging.getLogger("QueueProcessor")
class ProcessingStrategy(Enum):
"""Processing strategies"""
SEQUENTIAL = "sequential" # Process items one at a time
CONCURRENT = "concurrent" # Process multiple items concurrently
BATCHED = "batched" # Process items in batches
PRIORITY = "priority" # Process based on priority
@dataclass
class ProcessingMetrics:
"""Metrics for processing operations"""
total_processed: int = 0
successful: int = 0
failed: int = 0
retried: int = 0
avg_processing_time: float = 0.0
peak_concurrent_tasks: int = 0
last_processed: Optional[datetime] = None
error_counts: Dict[str, int] = None
def __post_init__(self):
self.error_counts = {}
def record_success(self, processing_time: float) -> None:
"""Record successful processing"""
self.total_processed += 1
self.successful += 1
self._update_avg_time(processing_time)
self.last_processed = datetime.utcnow()
def record_failure(self, error: str) -> None:
"""Record processing failure"""
self.total_processed += 1
self.failed += 1
self.error_counts[error] = self.error_counts.get(error, 0) + 1
self.last_processed = datetime.utcnow()
def record_retry(self) -> None:
"""Record processing retry"""
self.retried += 1
def _update_avg_time(self, new_time: float) -> None:
"""Update average processing time"""
if self.total_processed == 1:
self.avg_processing_time = new_time
else:
self.avg_processing_time = (
(self.avg_processing_time * (self.total_processed - 1) + new_time)
/ self.total_processed
)
def get_stats(self) -> Dict[str, Any]:
"""Get processing statistics"""
return {
"total_processed": self.total_processed,
"successful": self.successful,
"failed": self.failed,
"retried": self.retried,
"success_rate": (
self.successful / self.total_processed
if self.total_processed > 0
else 0
),
"avg_processing_time": self.avg_processing_time,
"peak_concurrent_tasks": self.peak_concurrent_tasks,
"last_processed": (
self.last_processed.isoformat()
if self.last_processed
else None
),
"error_distribution": self.error_counts
}
class BatchManager:
"""Manages processing batches"""
def __init__(
self,
batch_size: int,
max_concurrent: int,
timeout: float = 30.0
):
self.batch_size = batch_size
self.max_concurrent = max_concurrent
self.timeout = timeout
self.current_batch: List[QueueItem] = []
self.processing_start: Optional[datetime] = None
async def process_batch(
self,
items: List[QueueItem],
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> List[Tuple[QueueItem, bool, Optional[str]]]:
"""Process a batch of items"""
self.current_batch = items
self.processing_start = datetime.utcnow()
tasks = [
asyncio.create_task(self._process_item(processor, item))
for item in items
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
(item, *self._handle_result(result))
for item, result in zip(items, results)
]
finally:
self.current_batch = []
self.processing_start = None
async def _process_item(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]],
item: QueueItem
) -> Tuple[bool, Optional[str]]:
"""Process a single item with timeout"""
try:
return await asyncio.wait_for(
processor(item),
timeout=self.timeout
)
except asyncio.TimeoutError:
return False, "Processing timeout"
except Exception as e:
return False, str(e)
def _handle_result(
self,
result: Any
) -> Tuple[bool, Optional[str]]:
"""Handle processing result"""
if isinstance(result, tuple) and len(result) == 2:
return result
if isinstance(result, Exception):
return False, str(result)
return False, "Unknown error"
def get_batch_status(self) -> Dict[str, Any]:
"""Get current batch status"""
return {
"batch_size": len(self.current_batch),
"processing_time": (
(datetime.utcnow() - self.processing_start).total_seconds()
if self.processing_start
else 0
),
"items": [item.url for item in self.current_batch]
}
class QueueProcessor:
"""Handles the processing of queue items"""
def __init__(
self,
state_manager: QueueStateManager,
monitor: QueueMonitor,
strategy: ProcessingStrategy = ProcessingStrategy.CONCURRENT,
max_retries: int = 3,
retry_delay: int = 5,
batch_size: int = 5,
max_concurrent: int = 3
):
self.state_manager = state_manager
self.monitor = monitor
self.strategy = strategy
self.max_retries = max_retries
self.retry_delay = retry_delay
self.batch_manager = BatchManager(batch_size, max_concurrent)
self.metrics = ProcessingMetrics()
self._shutdown = False
self._active_tasks: Set[asyncio.Task] = set()
self._processing_lock = asyncio.Lock()
async def start_processing(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> None:
"""Start processing items in the queue"""
logger.info(f"Queue processor started with strategy: {self.strategy.value}")
while not self._shutdown:
try:
if self.strategy == ProcessingStrategy.BATCHED:
await self._process_batch(processor)
elif self.strategy == ProcessingStrategy.CONCURRENT:
await self._process_concurrent(processor)
else: # SEQUENTIAL or PRIORITY
await self._process_sequential(processor)
except asyncio.CancelledError:
logger.info("Queue processing cancelled")
break
except Exception as e:
logger.error(f"Critical error in queue processor: {e}")
await asyncio.sleep(1) # Delay before retry
await asyncio.sleep(0)
async def _process_batch(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> None:
"""Process items in batches"""
items = await self.state_manager.get_next_items(self.batch_manager.batch_size)
if not items:
await asyncio.sleep(0.1)
return
start_time = time.time()
results = await self.batch_manager.process_batch(items, processor)
for item, success, error in results:
await self._handle_result(
item,
success,
error,
time.time() - start_time
)
async def _process_concurrent(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> None:
"""Process items concurrently"""
if len(self._active_tasks) >= self.batch_manager.max_concurrent:
await asyncio.sleep(0.1)
return
items = await self.state_manager.get_next_items(
self.batch_manager.max_concurrent - len(self._active_tasks)
)
for item in items:
task = asyncio.create_task(self._process_item(processor, item))
self._active_tasks.add(task)
task.add_done_callback(self._active_tasks.discard)
self.metrics.peak_concurrent_tasks = max(
self.metrics.peak_concurrent_tasks,
len(self._active_tasks)
)
async def _process_sequential(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]
) -> None:
"""Process items sequentially"""
items = await self.state_manager.get_next_items(1)
if not items:
await asyncio.sleep(0.1)
return
await self._process_item(processor, items[0])
async def _process_item(
self,
processor: Callable[[QueueItem], Tuple[bool, Optional[str]]],
item: QueueItem
) -> None:
"""Process a single queue item"""
try:
logger.info(f"Processing queue item: {item.url}")
start_time = time.time()
async with self._processing_lock:
item.start_processing()
self.monitor.update_activity()
success, error = await processor(item)
processing_time = time.time() - start_time
await self._handle_result(item, success, error, processing_time)
except Exception as e:
logger.error(f"Error processing {item.url}: {e}")
await self._handle_result(item, False, str(e), 0)
async def _handle_result(
self,
item: QueueItem,
success: bool,
error: Optional[str],
processing_time: float
) -> None:
"""Handle processing result"""
item.finish_processing(success, error)
if success:
await self.state_manager.mark_completed(item, True)
self.metrics.record_success(processing_time)
logger.info(f"Successfully processed: {item.url}")
else:
if item.retry_count < self.max_retries:
item.retry_count += 1
await self.state_manager.retry_item(item)
self.metrics.record_retry()
logger.warning(f"Retrying: {item.url} (attempt {item.retry_count})")
await asyncio.sleep(self.retry_delay)
else:
await self.state_manager.mark_completed(item, False, error)
self.metrics.record_failure(error or "Unknown error")
logger.error(f"Failed after {self.max_retries} attempts: {item.url}")
async def stop_processing(self) -> None:
"""Stop processing queue items"""
self._shutdown = True
# Cancel all active tasks
for task in self._active_tasks:
if not task.done():
task.cancel()
# Wait for tasks to complete
if self._active_tasks:
await asyncio.gather(*self._active_tasks, return_exceptions=True)
self._active_tasks.clear()
logger.info("Queue processor stopped")
def is_processing(self) -> bool:
"""Check if the processor is currently processing items"""
return bool(self._active_tasks)
def get_processor_stats(self) -> Dict[str, Any]:
"""Get processor statistics"""
return {
"strategy": self.strategy.value,
"active_tasks": len(self._active_tasks),
"metrics": self.metrics.get_stats(),
"batch_status": self.batch_manager.get_batch_status(),
"is_processing": self.is_processing()
}