From 063258513ec51fa66afe5a745e6cdc78301e306c Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Sun, 17 Nov 2024 20:58:14 +0000 Subject: [PATCH] Creating a shared types module (types.py) for common types and interfaces Updating queue_processor.py to use the shared types Updating manager.py to use the correct imports and shared types The cyclic dependency has been resolved by: Moving shared types to a separate module Having queue_processor.py only import from shared modules Having manager.py use the QueueProcessor from queue/processor.py --- videoarchiver/processor/queue_processor.py | 60 +++++----------------- videoarchiver/queue/manager.py | 10 ++-- videoarchiver/queue/types.py | 44 ++++++++++++++++ 3 files changed, 60 insertions(+), 54 deletions(-) create mode 100644 videoarchiver/queue/types.py diff --git a/videoarchiver/processor/queue_processor.py b/videoarchiver/processor/queue_processor.py index 8693bfb..48d6b92 100644 --- a/videoarchiver/processor/queue_processor.py +++ b/videoarchiver/processor/queue_processor.py @@ -2,52 +2,22 @@ import logging import asyncio -from enum import Enum, auto -from typing import List, Optional, Dict, Any, Set, Union, TypedDict, ClassVar +from typing import List, Optional, Dict, Any, Set, ClassVar from datetime import datetime -import discord # type: ignore +from ..queue.types import QueuePriority, QueueMetrics, ProcessingMetrics from ..queue.models import QueueItem -from ..queue.manager import EnhancedVideoQueueManager -from ..processor.constants import REACTIONS -from ..processor.url_extractor import URLMetadata -from ..utils.exceptions import QueueProcessingError logger = logging.getLogger("VideoArchiver") - -class QueuePriority(Enum): - """Priority levels for queue processing""" - - HIGH = auto() - NORMAL = auto() - LOW = auto() - - -class QueueMetrics(TypedDict): - """Type definition for queue metrics""" - - total_items: int - processing_time: float - success_rate: float - error_rate: float - average_size: float - - class QueueProcessor: """Handles processing of video queue items""" _active_items: ClassVar[Set[int]] = set() _processing_lock: ClassVar[asyncio.Lock] = asyncio.Lock() - def __init__(self, queue_manager: EnhancedVideoQueueManager): - self.queue_manager = queue_manager - self._metrics: Dict[str, Any] = { - "processed_count": 0, - "error_count": 0, - "total_size": 0, - "total_time": 0, - } + def __init__(self): + self._metrics = ProcessingMetrics() async def process_item(self, item: QueueItem) -> bool: """ @@ -85,18 +55,14 @@ class QueueProcessor: def _update_metrics(self, processing_time: float, success: bool, size: int) -> None: """Update processing metrics""" - self._metrics["processed_count"] += 1 - self._metrics["total_time"] += processing_time - - if not success: - self._metrics["error_count"] += 1 - - if size > 0: - self._metrics["total_size"] += size + if success: + self._metrics.record_success(processing_time) + else: + self._metrics.record_failure("Processing error") def get_metrics(self) -> QueueMetrics: """Get current processing metrics""" - total = self._metrics["processed_count"] + total = self._metrics.total_processed if total == 0: return QueueMetrics( total_items=0, @@ -108,8 +74,8 @@ class QueueProcessor: return QueueMetrics( total_items=total, - processing_time=self._metrics["total_time"], - success_rate=(total - self._metrics["error_count"]) / total, - error_rate=self._metrics["error_count"] / total, - average_size=self._metrics["total_size"] / total, + processing_time=self._metrics.avg_processing_time, + success_rate=self._metrics.successful / total, + error_rate=self._metrics.failed / total, + average_size=0, # This would need to be tracked separately if needed ) diff --git a/videoarchiver/queue/manager.py b/videoarchiver/queue/manager.py index 7be07c2..8608b31 100644 --- a/videoarchiver/queue/manager.py +++ b/videoarchiver/queue/manager.py @@ -14,10 +14,10 @@ from .persistence import QueuePersistenceManager from .monitoring import QueueMonitor, MonitoringLevel from .cleanup import QueueCleaner from .models import QueueItem, QueueError, CleanupError +from .types import ProcessingStrategy logger = logging.getLogger("QueueManager") - class QueueState(Enum): """Queue operational states""" UNINITIALIZED = "uninitialized" @@ -28,7 +28,6 @@ class QueueState(Enum): STOPPED = "stopped" ERROR = "error" - class QueueMode(Enum): """Queue processing modes""" NORMAL = "normal" # Standard processing @@ -36,7 +35,6 @@ class QueueMode(Enum): PRIORITY = "priority" # Priority-based processing MAINTENANCE = "maintenance" # Maintenance mode - @dataclass class QueueConfig: """Queue configuration settings""" @@ -52,7 +50,6 @@ class QueueConfig: persistence_enabled: bool = True monitoring_level: MonitoringLevel = MonitoringLevel.NORMAL - @dataclass class QueueStats: """Queue statistics""" @@ -64,7 +61,6 @@ class QueueStats: peak_memory_usage: float = 0.0 state_changes: List[Dict[str, Any]] = field(default_factory=list) - class QueueCoordinator: """Coordinates queue operations""" @@ -100,7 +96,6 @@ class QueueCoordinator: """Wait if queue is paused""" await self._paused.wait() - class EnhancedVideoQueueManager: """Enhanced queue manager with improved organization and maintainability""" @@ -128,10 +123,11 @@ class EnhancedVideoQueueManager: QueuePersistenceManager() if self.config.persistence_enabled else None ) - # Initialize processor + # Initialize processor with strategy self.processor = QueueProcessor( state_manager=self.state_manager, monitor=self.monitor, + strategy=ProcessingStrategy.CONCURRENT, max_retries=self.config.max_retries, retry_delay=self.config.retry_delay, batch_size=self.config.batch_size, diff --git a/videoarchiver/queue/types.py b/videoarchiver/queue/types.py new file mode 100644 index 0000000..607b97f --- /dev/null +++ b/videoarchiver/queue/types.py @@ -0,0 +1,44 @@ +"""Shared types for queue management""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Dict, Any +from enum import Enum, auto + +class QueuePriority(Enum): + """Priority levels for queue processing""" + HIGH = auto() + NORMAL = auto() + LOW = auto() + +class ProcessingStrategy(Enum): + """Processing strategies""" + SEQUENTIAL = "sequential" # Process items one at a time + CONCURRENT = "concurrent" # Process multiple items concurrently + BATCHED = "batched" # Process items in batches + PRIORITY = "priority" # Process based on priority + +@dataclass +class QueueMetrics: + """Type definition for queue metrics""" + total_items: int + processing_time: float + success_rate: float + error_rate: float + average_size: float + +@dataclass +class ProcessingMetrics: + """Metrics for processing operations""" + total_processed: int = 0 + successful: int = 0 + failed: int = 0 + retried: int = 0 + avg_processing_time: float = 0.0 + peak_concurrent_tasks: int = 0 + last_processed: Optional[datetime] = None + error_counts: Dict[str, int] = None + + def __post_init__(self): + if self.error_counts is None: + self.error_counts = {}