This commit is contained in:
pacnpal
2024-11-17 00:23:15 +00:00
parent 3a5df65661
commit c9e24436bb

View File

@@ -7,301 +7,104 @@ from typing import List, Optional, Dict, Any, Set, Union, TypedDict, ClassVar
from datetime import datetime
import discord
from ..queue.models import QueueItem
from ..queue.manager import EnhancedVideoQueueManager
from videoarchiver.queue.models import QueueItem
from videoarchiver.queue.manager import EnhancedVideoQueueManager
from .constants import REACTIONS
from .url_extractor import URLMetadata
from ..utils.exceptions import QueueProcessingError
from videoarchiver.utils.exceptions import QueueProcessingError
# Module-wide logger shared by all VideoArchiver queue components.
logger = logging.getLogger("VideoArchiver")
class QueuePriority(Enum):
    """Priority levels for queue processing.

    Values come from auto(), so HIGH=1, NORMAL=2, LOW=3 — a smaller
    value means higher priority.
    """
    # NOTE: the diff merge had left a second, dead docstring string here; removed.
    HIGH = auto()
    NORMAL = auto()
    LOW = auto()
class ProcessingStrategy(Enum):
    """Strategies the queue processor can use to order incoming items."""

    FIFO = "fifo"          # strict arrival order
    PRIORITY = "priority"  # honour each item's own priority
    SMART = "smart"        # priority adjusted by queue health and retries
class QueueStats(TypedDict):
    """Shape of the statistics dict produced by QueueMetrics.get_stats()."""

    total_processed: int
    successful: int
    failed: int
    # The fields below are emitted by get_stats() but had been dropped
    # from this definition by a bad merge; restored to match usage.
    success_rate: float
    average_processing_time: float
    error_counts: Dict[str, int]
    last_processed: Optional[str]
class QueueMetrics(TypedDict):
    """Shape of the metrics dict produced by QueueProcessor.get_metrics().

    Fields match the only constructor calls in this module
    (total_items / processing_time / success_rate / error_rate / average_size);
    the previous field list was a merge leftover from an older revision.
    """

    total_items: int
    processing_time: float
    success_rate: float
    error_rate: float
    average_size: float
class QueueMetrics:
    """Tracks queue processing metrics (successes, failures, timings, errors)."""

    # Successful runs longer than this are logged as suspicious.
    MAX_PROCESSING_TIME: ClassVar[float] = 3600.0  # 1 hour in seconds

    def __init__(self) -> None:
        self.total_processed = 0
        self.successful = 0
        self.failed = 0
        self.processing_times: List[float] = []
        # Maps error message -> occurrence count
        self.errors: Dict[str, int] = {}
        self.last_processed: Optional[datetime] = None

    def record_success(self, processing_time: float) -> None:
        """
        Record successful processing.

        Args:
            processing_time: Time taken to process in seconds
        """
        if processing_time > self.MAX_PROCESSING_TIME:
            logger.warning(f"Unusually long processing time: {processing_time} seconds")
        self.total_processed += 1
        self.successful += 1
        self.processing_times.append(processing_time)
        # NOTE(review): datetime.utcnow() is naive and deprecated since 3.12;
        # kept as-is so isoformat() output stays unchanged for consumers.
        self.last_processed = datetime.utcnow()

    def record_failure(self, error: str) -> None:
        """
        Record processing failure.

        Args:
            error: Error message describing the failure
        """
        self.total_processed += 1
        self.failed += 1
        self.errors[error] = self.errors.get(error, 0) + 1
        self.last_processed = datetime.utcnow()

    def get_stats(self) -> "QueueStats":
        """
        Get queue metrics.

        Returns:
            Dictionary containing queue statistics
        """
        avg_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times
            else 0
        )
        # A TypedDict constructor returns a plain dict at runtime, so the
        # mapping is built directly.  (Two stray field annotations that a bad
        # merge had appended after this method were removed.)
        return {
            "total_processed": self.total_processed,
            "successful": self.successful,
            "failed": self.failed,
            "success_rate": (
                self.successful / self.total_processed
                if self.total_processed > 0
                else 0
            ),
            "average_processing_time": avg_time,
            "error_counts": self.errors.copy(),
            "last_processed": self.last_processed.isoformat() if self.last_processed else None,
        }
class QueueProcessor:
    """Adds extracted video URLs to the processing queue and processes queue items."""

    # NOTE(review): class-level set shared by ALL instances — confirm that
    # cross-instance de-duplication of active item ids is intended.
    _active_items: ClassVar[Set[int]] = set()

    def __init__(
        self,
        queue_manager: EnhancedVideoQueueManager,
        strategy: ProcessingStrategy = ProcessingStrategy.SMART,
        max_retries: int = 3,
    ) -> None:
        """
        Args:
            queue_manager: Queue manager that receives enqueued items
            strategy: Strategy used when adding items to the queue
            max_retries: Maximum retry attempts reported in metrics
        """
        # The diff merge had fused two different __init__ definitions here;
        # reconciled into one constructor that sets every attribute the
        # methods of this class read.
        self.queue_manager = queue_manager
        self.strategy = strategy
        self.max_retries = max_retries
        self.metrics = QueueMetrics()
        # URLs currently being enqueued (guarded by _processing_lock)
        self._processing: Set[str] = set()
        self._processing_lock = asyncio.Lock()
        # Aggregate counters consumed by _update_metrics()/get_metrics()
        self._metrics: Dict[str, Any] = {
            'processed_count': 0,
            'error_count': 0,
            'total_size': 0,
            'total_time': 0
        }
async def process_urls(
self,
message: discord.Message,
urls: Union[List[str], Set[str], List[URLMetadata]],
priority: QueuePriority = QueuePriority.NORMAL
) -> None:
"""
Process extracted URLs by adding them to the queue.
Args:
message: Discord message containing the URLs
urls: List or set of URLs or URLMetadata objects to process
priority: Priority level for queue processing
Raises:
QueueProcessingError: If there's an error adding URLs to the queue
"""
processed_urls: Set[str] = set()
for url_data in urls:
url = url_data.url if isinstance(url_data, URLMetadata) else url_data
if url in processed_urls:
logger.debug(f"Skipping duplicate URL: {url}")
continue
try:
logger.info(f"Adding URL to queue: {url}")
await message.add_reaction(REACTIONS['queued'])
# Create queue item
item = QueueItem(
url=url,
message_id=message.id,
channel_id=message.channel.id,
guild_id=message.guild.id,
author_id=message.author.id,
priority=priority.value,
added_at=datetime.utcnow()
)
# Add to queue with appropriate strategy
await self._add_to_queue(item)
processed_urls.add(url)
logger.info(f"Successfully added video to queue: {url}")
except Exception as e:
logger.error(f"Failed to add video to queue: {str(e)}", exc_info=True)
await message.add_reaction(REACTIONS['error'])
raise QueueProcessingError(f"Failed to add URL to queue: {str(e)}")
async def _add_to_queue(self, item: QueueItem) -> None:
"""
Add item to queue using current strategy.
Args:
item: Queue item to add
Raises:
QueueProcessingError: If there's an error adding the item
"""
async with self._processing_lock:
if item.url in self._processing:
logger.debug(f"URL already being processed: {item.url}")
return
self._processing.add(item.url)
try:
# Apply processing strategy
if self.strategy == ProcessingStrategy.PRIORITY:
await self._add_with_priority(item)
elif self.strategy == ProcessingStrategy.SMART:
await self._add_with_smart_strategy(item)
else: # FIFO
await self._add_fifo(item)
except Exception as e:
logger.error(f"Error adding item to queue: {e}", exc_info=True)
raise QueueProcessingError(f"Failed to add item to queue: {str(e)}")
finally:
async with self._processing_lock:
self._processing.remove(item.url)
async def _add_with_priority(self, item: QueueItem) -> None:
"""Add item with priority handling"""
await self.queue_manager.add_to_queue(
url=item.url,
message_id=item.message_id,
channel_id=item.channel_id,
guild_id=item.guild_id,
author_id=item.author_id,
priority=item.priority
)
async def _add_with_smart_strategy(self, item: QueueItem) -> None:
"""Add item using smart processing strategy"""
priority = await self._calculate_smart_priority(item)
await self.queue_manager.add_to_queue(
url=item.url,
message_id=item.message_id,
channel_id=item.channel_id,
guild_id=item.guild_id,
author_id=item.author_id,
priority=priority
)
async def _add_fifo(self, item: QueueItem) -> None:
"""Add item using FIFO strategy"""
await self.queue_manager.add_to_queue(
url=item.url,
message_id=item.message_id,
channel_id=item.channel_id,
guild_id=item.guild_id,
author_id=item.author_id,
priority=QueuePriority.NORMAL.value
)
async def _calculate_smart_priority(self, item: QueueItem) -> int:
"""
Calculate priority using smart strategy.
Args:
item: Queue item to calculate priority for
Returns:
Calculated priority value
"""
base_priority = item.priority
# Adjust based on queue metrics
stats = self.metrics.get_stats()
if stats["total_processed"] > 0:
# Boost priority if queue is processing efficiently
if stats["success_rate"] > 0.9: # 90% success rate
base_priority -= 1
# Lower priority if having issues
elif stats["success_rate"] < 0.5: # 50% success rate
base_priority += 1
# Adjust based on retries
if item.retry_count > 0:
base_priority += item.retry_count
# Ensure priority stays in valid range
return max(0, min(base_priority, len(QueuePriority) - 1))
async def format_archive_message(
self,
author: Optional[discord.Member],
channel: discord.TextChannel,
url: str
) -> str:
"""
Format message for archive channel.
Args:
author: Optional message author
channel: Channel the message was posted in
url: URL being archived
Returns:
Formatted message string
"""
author_mention = author.mention if author else "Unknown User"
channel_mention = channel.mention if channel else "Unknown Channel"
return (
f"Video archived from {author_mention} in {channel_mention}\n"
f"Original URL: {url}"
)
def get_metrics(self) -> Dict[str, Any]:
"""
Get queue processing metrics.
Returns:
Dictionary containing queue metrics and status
"""
return {
"metrics": self.metrics.get_stats(),
"strategy": self.strategy.value,
"active_processing": len(self._processing),
"max_retries": self.max_retries
self._metrics: Dict[str, Any] = {
'processed_count': 0,
'error_count': 0,
'total_size': 0,
'total_time': 0
}
    async def process_item(self, item: QueueItem) -> bool:
        """
        Process a single queue item.

        Args:
            item: Queue item to process

        Returns:
            bool: Success status (False when the item is already in flight
            or when processing raised; the exception is swallowed and logged)
        """
        # De-duplicate: _active_items is declared as a ClassVar, so this
        # guard appears to span ALL processor instances — confirm intended.
        if item.id in self._active_items:
            logger.warning(f"Item {item.id} is already being processed")
            return False
        try:
            self._active_items.add(item.id)
            # NOTE(review): wall-clock timing; a monotonic clock would be
            # immune to system clock changes — TODO confirm acceptable.
            start_time = datetime.now()
            # Process item logic here
            # Placeholder for actual video processing
            await asyncio.sleep(1)
            processing_time = (datetime.now() - start_time).total_seconds()
            # Record a successful attempt with its duration and payload size.
            self._update_metrics(processing_time, True, item.size)
            return True
        except Exception as e:
            logger.error(f"Error processing item {item.id}: {str(e)}")
            # Failures are counted with zero time/size.
            self._update_metrics(0, False, 0)
            return False
        finally:
            # The id was added inside the try, so it is always present here.
            self._active_items.remove(item.id)
def _update_metrics(self, processing_time: float, success: bool, size: int) -> None:
"""Update processing metrics"""
self._metrics['processed_count'] += 1
self._metrics['total_time'] += processing_time
if not success:
self._metrics['error_count'] += 1
if size > 0:
self._metrics['total_size'] += size
def get_metrics(self) -> QueueMetrics:
"""Get current processing metrics"""
total = self._metrics['processed_count']
if total == 0:
return QueueMetrics(
total_items=0,
processing_time=0,
success_rate=0,
error_rate=0,
average_size=0
)
return QueueMetrics(
total_items=total,
processing_time=self._metrics['total_time'],
success_rate=(total - self._metrics['error_count']) / total,
error_rate=self._metrics['error_count'] / total,
average_size=self._metrics['total_size'] / total
)