This commit is contained in:
pacnpal
2024-11-15 02:38:22 +00:00
parent 49c844a422
commit 6f0f31944f
2 changed files with 151 additions and 95 deletions

View File

@@ -1,4 +1,5 @@
"""Enhanced queue system for VideoArchiver with improved memory management and performance""" """Enhanced queue system for VideoArchiver with improved memory management and performance"""
import asyncio import asyncio
import logging import logging
import json import json
@@ -20,19 +21,20 @@ from .exceptions import (
ResourceExhaustedError, ResourceExhaustedError,
ProcessingError, ProcessingError,
CleanupError, CleanupError,
FileOperationError FileOperationError,
) )
# Configure logging with proper format # Configure logging with proper format
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
) )
logger = logging.getLogger('EnhancedQueueManager') logger = logging.getLogger("EnhancedQueueManager")
@dataclass @dataclass
class QueueMetrics: class QueueMetrics:
"""Metrics tracking for queue performance and health""" """Metrics tracking for queue performance and health"""
total_processed: int = 0 total_processed: int = 0
total_failed: int = 0 total_failed: int = 0
avg_processing_time: float = 0.0 avg_processing_time: float = 0.0
@@ -534,54 +536,64 @@ class EnhancedVideoQueueManager:
logger.error(f"Error during cleanup: {str(e)}") logger.error(f"Error during cleanup: {str(e)}")
raise CleanupError(f"Failed to clean up queue manager: {str(e)}") raise CleanupError(f"Failed to clean up queue manager: {str(e)}")
def get_queue_status(self, guild_id: Optional[int] = None) -> Dict[str, Any]: async def clear_guild_queue(self, guild_id: int) -> int:
"""Get detailed queue status with metrics""" """Clear all queue items for a specific guild
Args:
guild_id: The ID of the guild to clear items for
Returns:
int: Number of items cleared
"""
try: try:
if guild_id is not None: cleared_count = 0
async with self._queue_lock:
# Get URLs for this guild
guild_urls = self._guild_queues.get(guild_id, set()) guild_urls = self._guild_queues.get(guild_id, set())
status = {
"pending": sum(1 for item in self._queue if item.url in guild_urls),
"processing": sum(
1 for url in self._processing if url in guild_urls
),
"completed": sum(1 for url in self._completed if url in guild_urls),
"failed": sum(1 for url in self._failed if url in guild_urls),
}
else:
status = {
"pending": len(self._queue),
"processing": len(self._processing),
"completed": len(self._completed),
"failed": len(self._failed),
}
# Add detailed metrics # Clear from pending queue
status.update( self._queue = [item for item in self._queue if item.guild_id != guild_id]
{
"metrics": { # Clear from processing
"total_processed": self.metrics.total_processed, for url in list(self._processing.keys()):
"total_failed": self.metrics.total_failed, if self._processing[url].guild_id == guild_id:
"success_rate": self.metrics.success_rate, self._processing.pop(url)
"avg_processing_time": self.metrics.avg_processing_time, cleared_count += 1
"peak_memory_usage": self.metrics.peak_memory_usage,
"last_cleanup": self.metrics.last_cleanup.isoformat(), # Clear from completed
"errors_by_type": self.metrics.errors_by_type, for url in list(self._completed.keys()):
"last_error": self.metrics.last_error, if self._completed[url].guild_id == guild_id:
"last_error_time": ( self._completed.pop(url)
self.metrics.last_error_time.isoformat() cleared_count += 1
if self.metrics.last_error_time
else None # Clear from failed
), for url in list(self._failed.keys()):
"retries": self.metrics.retries, if self._failed[url].guild_id == guild_id:
self._failed.pop(url)
cleared_count += 1
# Clear guild tracking
if guild_id in self._guild_queues:
cleared_count += len(self._guild_queues[guild_id])
self._guild_queues[guild_id].clear()
# Clear channel tracking for this guild's channels
for channel_id in list(self._channel_queues.keys()):
self._channel_queues[channel_id] = {
url for url in self._channel_queues[channel_id]
if url not in guild_urls
} }
}
)
return status # Persist updated state
if self.persistence_path:
await self._persist_queue()
logger.info(f"Cleared {cleared_count} items from guild {guild_id} queue")
return cleared_count
except Exception as e: except Exception as e:
logger.error(f"Error getting queue status: {str(e)}") logger.error(f"Error clearing guild queue: {traceback.format_exc()}")
raise QueueError(f"Failed to get queue status: {str(e)}") raise QueueError(f"Failed to clear guild queue: {str(e)}")
async def _periodic_cleanup(self): async def _periodic_cleanup(self):
"""Periodically clean up old completed/failed items""" """Periodically clean up old completed/failed items"""

View File

@@ -1,4 +1,5 @@
"""Video processing logic for VideoArchiver""" """Video processing logic for VideoArchiver"""
import discord import discord
import logging import logging
import yt_dlp import yt_dlp
@@ -15,7 +16,8 @@ from videoarchiver.utils.file_ops import secure_delete_file, cleanup_downloads
from videoarchiver.exceptions import ProcessingError, DiscordAPIError from videoarchiver.exceptions import ProcessingError, DiscordAPIError
from videoarchiver.enhanced_queue import EnhancedVideoQueueManager from videoarchiver.enhanced_queue import EnhancedVideoQueueManager
logger = logging.getLogger('VideoArchiver') logger = logging.getLogger("VideoArchiver")
class VideoProcessor: class VideoProcessor:
"""Handles video processing operations""" """Handles video processing operations"""
@@ -35,14 +37,41 @@ class VideoProcessor:
max_queue_size=1000, max_queue_size=1000,
cleanup_interval=1800, # 30 minutes (reduced from 1 hour for more frequent cleanup) cleanup_interval=1800, # 30 minutes (reduced from 1 hour for more frequent cleanup)
max_history_age=86400, # 24 hours max_history_age=86400, # 24 hours
persistence_path=str(queue_path) persistence_path=str(queue_path),
) )
# Track failed downloads for cleanup # Track failed downloads for cleanup
self._failed_downloads = set() self._failed_downloads = set()
self._failed_downloads_lock = asyncio.Lock() self._failed_downloads_lock = asyncio.Lock()
async def process_video_url(self, url: str, message: discord.Message, priority: int = 0) -> bool: # Start queue processing
self._queue_task = asyncio.create_task(self._process_queue())
async def _process_queue(self):
"""Process the queue continuously"""
try:
await self.queue_manager.process_queue(self._process_video)
except Exception as e:
logger.error(f"Queue processing error: {traceback.format_exc()}")
# Restart queue processing
self._queue_task = asyncio.create_task(self._process_queue())
async def _process_video(self, item: Any) -> Tuple[bool, Optional[str]]:
"""Process a video from the queue"""
try:
# Get the callback from the item
callback = getattr(item, "callback", None)
if callback:
success = await callback(item.url, True, "")
return success, None if success else "Callback failed"
return False, "No callback found"
except Exception as e:
logger.error(f"Error processing video: {traceback.format_exc()}")
return False, str(e)
async def process_video_url(
self, url: str, message: discord.Message, priority: int = 0
) -> bool:
"""Process a video URL: download, reupload, and cleanup""" """Process a video URL: download, reupload, and cleanup"""
guild_id = message.guild.id guild_id = message.guild.id
start_time = datetime.utcnow() start_time = datetime.utcnow()
@@ -62,7 +91,7 @@ class VideoProcessor:
await self._log_message( await self._log_message(
message.guild, message.guild,
f"User {message.author} does not have required roles for video archiving", f"User {message.author} does not have required roles for video archiving",
"warning" "warning",
) )
return False return False
@@ -74,9 +103,7 @@ class VideoProcessor:
await message.remove_reaction("", self.bot.user) await message.remove_reaction("", self.bot.user)
await message.add_reaction("") await message.add_reaction("")
await self._log_message( await self._log_message(
message.guild, message.guild, f"Failed to process video: {error}", "error"
f"Failed to process video: {error}",
"error"
) )
return False return False
@@ -93,9 +120,7 @@ class VideoProcessor:
await message.remove_reaction("", self.bot.user) await message.remove_reaction("", self.bot.user)
await message.add_reaction("") await message.add_reaction("")
await self._log_message( await self._log_message(
message.guild, message.guild, f"Failed to download video: {error}", "error"
f"Failed to download video: {error}",
"error"
) )
# Track failed download for cleanup # Track failed download for cleanup
if file_path: if file_path:
@@ -105,8 +130,12 @@ class VideoProcessor:
# Get channels with enhanced error handling # Get channels with enhanced error handling
try: try:
archive_channel = await self.config.get_channel(message.guild, "archive") archive_channel = await self.config.get_channel(
notification_channel = await self.config.get_channel(message.guild, "notification") message.guild, "archive"
)
notification_channel = await self.config.get_channel(
message.guild, "notification"
)
if not notification_channel: if not notification_channel:
notification_channel = archive_channel notification_channel = archive_channel
@@ -116,7 +145,7 @@ class VideoProcessor:
await self._log_message( await self._log_message(
message.guild, message.guild,
f"Channel configuration error: {str(e)}", f"Channel configuration error: {str(e)}",
"error" "error",
) )
return False return False
@@ -124,13 +153,14 @@ class VideoProcessor:
# Upload to archive channel with original message link # Upload to archive channel with original message link
file = discord.File(file_path) file = discord.File(file_path)
archive_message = await archive_channel.send( archive_message = await archive_channel.send(
f"Original: {message.jump_url}", f"Original: {message.jump_url}", file=file
file=file
) )
# Send notification with enhanced error handling for message formatting # Send notification with enhanced error handling for message formatting
try: try:
notification_content = self.components[guild_id]["message_manager"].format_archive_message( notification_content = self.components[guild_id][
"message_manager"
].format_archive_message(
username=message.author.name, username=message.author.name,
channel=message.channel.name, channel=message.channel.name,
original_message=message.jump_url, original_message=message.jump_url,
@@ -139,7 +169,9 @@ class VideoProcessor:
logger.error(f"Message formatting error: {str(e)}") logger.error(f"Message formatting error: {str(e)}")
notification_content = f"Video archived from {message.author.name} in {message.channel.name}\nOriginal: {message.jump_url}" notification_content = f"Video archived from {message.author.name} in {message.channel.name}\nOriginal: {message.jump_url}"
notification_message = await notification_channel.send(notification_content) notification_message = await notification_channel.send(
notification_content
)
# Schedule notification message deletion with error handling # Schedule notification message deletion with error handling
try: try:
@@ -149,26 +181,28 @@ class VideoProcessor:
notification_message.id, notification_message.delete notification_message.id, notification_message.delete
) )
except Exception as e: except Exception as e:
logger.error(f"Failed to schedule message deletion: {str(e)}") logger.error(
f"Failed to schedule message deletion: {str(e)}"
)
# Update reaction to show completion # Update reaction to show completion
await message.remove_reaction("", self.bot.user) await message.remove_reaction("", self.bot.user)
await message.add_reaction("") await message.add_reaction("")
# Log processing time # Log processing time
processing_time = (datetime.utcnow() - start_time).total_seconds() processing_time = (
datetime.utcnow() - start_time
).total_seconds()
await self._log_message( await self._log_message(
message.guild, message.guild,
f"Successfully archived video from {message.author} (took {processing_time:.1f}s)" f"Successfully archived video from {message.author} (took {processing_time:.1f}s)",
) )
return True return True
except discord.HTTPException as e: except discord.HTTPException as e:
await self._log_message( await self._log_message(
message.guild, message.guild, f"Discord API error: {str(e)}", "error"
f"Discord API error: {str(e)}",
"error"
) )
await message.remove_reaction("", self.bot.user) await message.remove_reaction("", self.bot.user)
await message.add_reaction("") await message.add_reaction("")
@@ -181,16 +215,22 @@ class VideoProcessor:
if secure_delete_file(file_path): if secure_delete_file(file_path):
await self._log_message( await self._log_message(
message.guild, message.guild,
f"Successfully deleted file: {file_path}" f"Successfully deleted file: {file_path}",
) )
else: else:
await self._log_message( await self._log_message(
message.guild, message.guild,
f"Failed to delete file: {file_path}", f"Failed to delete file: {file_path}",
"error" "error",
) )
# Emergency cleanup # Emergency cleanup
cleanup_downloads(str(self.components[guild_id]["downloader"].download_path)) cleanup_downloads(
str(
self.components[guild_id][
"downloader"
].download_path
)
)
except Exception as e: except Exception as e:
logger.error(f"File deletion error: {str(e)}") logger.error(f"File deletion error: {str(e)}")
# Track for later cleanup # Track for later cleanup
@@ -200,9 +240,7 @@ class VideoProcessor:
except Exception as e: except Exception as e:
logger.error(f"Process callback error: {traceback.format_exc()}") logger.error(f"Process callback error: {traceback.format_exc()}")
await self._log_message( await self._log_message(
message.guild, message.guild, f"Error in process callback: {str(e)}", "error"
f"Error in process callback: {str(e)}",
"error"
) )
return False return False
@@ -215,16 +253,14 @@ class VideoProcessor:
guild_id=guild_id, guild_id=guild_id,
author_id=message.author.id, author_id=message.author.id,
callback=process_callback, callback=process_callback,
priority=priority priority=priority,
) )
except Exception as e: except Exception as e:
logger.error(f"Queue error: {str(e)}") logger.error(f"Queue error: {str(e)}")
await message.remove_reaction("", self.bot.user) await message.remove_reaction("", self.bot.user)
await message.add_reaction("") await message.add_reaction("")
await self._log_message( await self._log_message(
message.guild, message.guild, f"Failed to add to queue: {str(e)}", "error"
f"Failed to add to queue: {str(e)}",
"error"
) )
return False return False
@@ -235,7 +271,7 @@ class VideoProcessor:
f"Queue Status - Pending: {queue_status['pending']}, " f"Queue Status - Pending: {queue_status['pending']}, "
f"Processing: {queue_status['processing']}, " f"Processing: {queue_status['processing']}, "
f"Success Rate: {queue_status['metrics']['success_rate']:.2%}, " f"Success Rate: {queue_status['metrics']['success_rate']:.2%}, "
f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s" f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s",
) )
return True return True
@@ -243,9 +279,7 @@ class VideoProcessor:
except Exception as e: except Exception as e:
logger.error(f"Error processing video: {traceback.format_exc()}") logger.error(f"Error processing video: {traceback.format_exc()}")
await self._log_message( await self._log_message(
message.guild, message.guild, f"Error processing video: {str(e)}", "error"
f"Error processing video: {str(e)}",
"error"
) )
await message.remove_reaction("", self.bot.user) await message.remove_reaction("", self.bot.user)
await message.add_reaction("") await message.add_reaction("")
@@ -277,9 +311,7 @@ class VideoProcessor:
except Exception as e: except Exception as e:
logger.error(f"Error processing message: {traceback.format_exc()}") logger.error(f"Error processing message: {traceback.format_exc()}")
await self._log_message( await self._log_message(
message.guild, message.guild, f"Error processing message: {str(e)}", "error"
f"Error processing message: {str(e)}",
"error"
) )
def _extract_urls(self, content: str) -> List[str]: def _extract_urls(self, content: str) -> List[str]:
@@ -293,7 +325,7 @@ class VideoProcessor:
for word in words: for word in words:
# Try each extractor # Try each extractor
for ie in ydl._ies: for ie in ydl._ies:
if hasattr(ie, '_VALID_URL') and ie._VALID_URL: if hasattr(ie, "_VALID_URL") and ie._VALID_URL:
# Use regex pattern matching instead of suitable() # Use regex pattern matching instead of suitable()
if re.match(ie._VALID_URL, word): if re.match(ie._VALID_URL, word):
urls.append(word) urls.append(word)
@@ -302,7 +334,9 @@ class VideoProcessor:
logger.error(f"URL extraction error: {str(e)}") logger.error(f"URL extraction error: {str(e)}")
return list(set(urls)) # Remove duplicates return list(set(urls)) # Remove duplicates
async def _log_message(self, guild: discord.Guild, message: str, level: str = "info"): async def _log_message(
self, guild: discord.Guild, message: str, level: str = "info"
):
"""Log a message to the guild's log channel with enhanced formatting""" """Log a message to the guild's log channel with enhanced formatting"""
log_channel = await self.config.get_channel(guild, "log") log_channel = await self.config.get_channel(guild, "log")
if log_channel: if log_channel:
@@ -311,12 +345,22 @@ class VideoProcessor:
formatted_message = f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] [{level.upper()}] {message}" formatted_message = f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] [{level.upper()}] {message}"
await log_channel.send(formatted_message) await log_channel.send(formatted_message)
except discord.HTTPException as e: except discord.HTTPException as e:
logger.error(f"Failed to send log message to channel: {message} ({str(e)})") logger.error(
f"Failed to send log message to channel: {message} ({str(e)})"
)
logger.log(getattr(logging, level.upper()), message) logger.log(getattr(logging, level.upper()), message)
async def cleanup(self): async def cleanup(self):
"""Clean up resources with enhanced error handling""" """Clean up resources with enhanced error handling"""
try: try:
# Cancel queue processing task
if hasattr(self, "_queue_task"):
self._queue_task.cancel()
try:
await self._queue_task
except asyncio.CancelledError:
pass
# Clean up queue # Clean up queue
await self.queue_manager.cleanup() await self.queue_manager.cleanup()