Mirror of https://github.com/pacnpal/Pac-cogs.git (synced 2025-12-20 10:51:05 -05:00)
fix imports
@@ -34,6 +34,50 @@ logging.basicConfig(
 )
 
 logger = logging.getLogger('EnhancedQueueManager')
+
+@dataclass
+class QueueMetrics:
+    """Metrics tracking for queue performance and health"""
+    total_processed: int = 0
+    total_failed: int = 0
+    avg_processing_time: float = 0.0
+    success_rate: float = 0.0
+    peak_memory_usage: float = 0.0
+    errors_by_type: Dict[str, int] = field(default_factory=dict)
+    last_error: Optional[str] = None
+    last_error_time: Optional[datetime] = None
+    last_cleanup: datetime = field(default_factory=datetime.utcnow)
+    retries: int = 0
+    processing_times: List[float] = field(default_factory=list)
+
+    def update(self, processing_time: float, success: bool, error: str = None):
+        """Update metrics with new processing information"""
+        self.total_processed += 1
+        if not success:
+            self.total_failed += 1
+            if error:
+                self.last_error = error
+                self.last_error_time = datetime.utcnow()
+                error_type = error.split(':')[0] if ':' in error else error
+                self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
+
+        # Update processing times with sliding window
+        self.processing_times.append(processing_time)
+        if len(self.processing_times) > 100:  # Keep last 100 processing times
+            self.processing_times.pop(0)
+
+        # Update average processing time
+        self.avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0.0
+
+        # Update success rate
+        self.success_rate = (
+            (self.total_processed - self.total_failed) / self.total_processed
+            if self.total_processed > 0 else 0.0
+        )
+
+        # Update peak memory usage
+        current_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
+        self.peak_memory_usage = max(self.peak_memory_usage, current_memory)
 
 @dataclass
 class QueueItem:
     """Represents a video processing task in the queue"""
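Editor's note: a quick sanity check of the new QueueMetrics.update, outside the diff. This is a minimal sketch, not part of the commit; the import path below is a guess at where the cog keeps this module, and psutil must be installed since update() samples the process RSS on every call.

# Minimal usage sketch for QueueMetrics.update (illustrative, not from the commit).
from videoarchiver.enhanced_queue import QueueMetrics  # hypothetical module path

metrics = QueueMetrics()
metrics.update(processing_time=2.5, success=True)
metrics.update(processing_time=4.0, success=False, error="DownloadError: 403")

assert metrics.total_processed == 2 and metrics.total_failed == 1
assert metrics.success_rate == 0.5
assert metrics.avg_processing_time == 3.25              # mean over the sliding window
assert metrics.errors_by_type == {"DownloadError": 1}   # keyed on the text before ':'
print(metrics.last_error, f"{metrics.peak_memory_usage:.1f} MB")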
@@ -52,38 +96,8 @@ class QueueItem:
     last_error: Optional[str] = None
     retry_count: int = 0
     last_retry: Optional[datetime] = None
-    self.processing_times: List[float] = []
-    self.last_error: Optional[str] = None
-    self.last_error_time: Optional[datetime] = None
-
-    def update_metrics(self, processing_time: float, success: bool, error: str = None):
-        """Update metrics with new processing information"""
-        self.total_processed += 1
-        if not success:
-            self.total_failed += 1
-            if error:
-                self.last_error = error
-                self.last_error_time = datetime.utcnow()
-                error_type = error.split(':')[0] if ':' in error else error
-                self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1
-
-        # Update processing times with sliding window
-        self.processing_times.append(processing_time)
-        if len(self.processing_times) > 100:  # Keep last 100 processing times
-            self.processing_times.pop(0)
-
-        # Update average processing time
-        self.avg_processing_time = sum(self.processing_times) / len(self.processing_times)
-
-        # Update success rate
-        self.success_rate = (
-            (self.total_processed - self.total_failed) / self.total_processed
-            if self.total_processed > 0 else 0.0
-        )
-
-        # Update peak memory usage
-        current_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
-        self.peak_memory_usage = max(self.peak_memory_usage, current_memory)
-
+    processing_times: List[float] = field(default_factory=list)
+    last_error_time: Optional[datetime] = None
+
 class EnhancedVideoQueueManager:
     """Enhanced queue manager with improved memory management and performance"""
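Editor's note: the removed `self....` lines sat at class scope, where `self` is undefined (a NameError the moment the module imports), and a bare `= []` default is rejected by the dataclass machinery anyway. A short sketch of the rule the new field(default_factory=list) lines satisfy:

from dataclasses import dataclass, field
from typing import List

try:
    @dataclass
    class Bad:
        values: List[float] = []  # rejected: shared mutable default
except ValueError as e:
    print(e)  # "mutable default ... is not allowed: use default_factory"

@dataclass
class Good:
    values: List[float] = field(default_factory=list)  # fresh list per instance

a, b = Good(), Good()
a.values.append(1.0)
print(b.values)  # [] -- instances do not share state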
@@ -204,6 +218,82 @@ class EnhancedVideoQueueManager:
             logger.error(f"Error adding video to queue: {traceback.format_exc()}")
             raise QueueError(f"Failed to add to queue: {str(e)}")
 
+    async def process_queue(self, processor: Callable[[QueueItem], Tuple[bool, Optional[str]]]):
+        """Process items in the queue with the provided processor function
+
+        Args:
+            processor: A callable that takes a QueueItem and returns a tuple of (success: bool, error: Optional[str])
+        """
+        while True:
+            try:
+                # Get next item from queue
+                item = None
+                async with self._queue_lock:
+                    if self._queue:
+                        item = self._queue.pop(0)
+                        self._processing[item.url] = item
+                        item.status = "processing"
+                        item.processing_time = time.time()
+
+                if not item:
+                    await asyncio.sleep(1)
+                    continue
+
+                try:
+                    # Process the item
+                    start_time = time.time()
+                    success, error = await processor(item)
+                    processing_time = time.time() - start_time
+
+                    # Update metrics
+                    self.metrics.update(processing_time, success, error)
+
+                    # Update item status
+                    async with self._processing_lock:
+                        if success:
+                            item.status = "completed"
+                            self._completed[item.url] = item
+                            logger.info(f"Successfully processed item: {item.url}")
+                        else:
+                            item.status = "failed"
+                            item.error = error
+                            item.last_error = error
+                            item.last_error_time = datetime.utcnow()
+
+                            # Handle retries
+                            if item.retry_count < self.max_retries:
+                                item.retry_count += 1
+                                item.status = "pending"
+                                item.last_retry = datetime.utcnow()
+                                self._queue.append(item)
+                                logger.warning(f"Retrying item: {item.url} (attempt {item.retry_count})")
+                            else:
+                                self._failed[item.url] = item
+                                logger.error(f"Failed to process item after {self.max_retries} attempts: {item.url}")
+
+                        self._processing.pop(item.url, None)
+
+                except Exception as e:
+                    logger.error(f"Error processing item {item.url}: {traceback.format_exc()}")
+                    async with self._processing_lock:
+                        item.status = "failed"
+                        item.error = str(e)
+                        item.last_error = str(e)
+                        item.last_error_time = datetime.utcnow()
+                        self._failed[item.url] = item
+                        self._processing.pop(item.url, None)
+
+                # Persist state after processing
+                if self.persistence_path:
+                    await self._persist_queue()
+
+            except Exception as e:
+                logger.error(f"Error in queue processor: {traceback.format_exc()}")
+                await asyncio.sleep(1)
+
+            # Small delay to prevent CPU overload
+            await asyncio.sleep(0.1)
+
     async def _periodic_backup(self):
         """Periodically backup queue state"""
         while True:
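Editor's note: one wrinkle worth flagging: process_queue awaits processor(item), so the processor must be a coroutine function even though the annotation reads Callable[[QueueItem], Tuple[bool, Optional[str]]]. Below is a sketch of a compatible processor; the "Type: detail" error format feeds the errors_by_type bucketing above, and the commented-out wiring is an assumption, not the manager's documented API.

import asyncio
from typing import Optional, Tuple

async def download_and_archive(item) -> Tuple[bool, Optional[str]]:
    """Example processor: returns (success, error) as process_queue expects."""
    try:
        await asyncio.sleep(0.1)  # stand-in for the real download/re-encode work
        return True, None
    except Exception as e:
        # "Type: detail" so QueueMetrics buckets on the text before the colon
        return False, f"{type(e).__name__}: {e}"

# Hypothetical wiring -- constructor arguments are assumptions:
# manager = EnhancedVideoQueueManager(max_retries=3, persistence_path="queue.json")
# asyncio.create_task(manager.process_queue(download_and_archive))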