Mirror of https://github.com/pacnpal/Pac-cogs.git (synced 2025-12-20 02:41:06 -05:00)
- Accept all URLs if no sites are enabled (default behavior)
- Only filter URLs against enabled_sites if specific sites are set
- Add detailed logging for URL detection process
- Log guild settings and enabled sites

This fixes the issue where URLs weren't being detected when no sites were explicitly enabled.
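In code, the acceptance rule described above reduces to the following check (a minimal sketch reusing the names from the file below, where `word` is one whitespace-separated token of the message content):

    enabled_sites = settings.get("enabled_sites", [])
    # An empty list means no site filter is configured, so every URL is accepted
    if not enabled_sites or any(site in word.lower() for site in enabled_sites):
        urls.append(word)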
598 lines
24 KiB
Python
"""Video processing logic for VideoArchiver"""
|
||
|
||
import os
|
||
import logging
|
||
import asyncio
|
||
import discord
|
||
from discord.ext import commands
|
||
from discord import app_commands
|
||
from pathlib import Path
|
||
from typing import Dict, Any, Optional, Tuple, Set
|
||
import traceback
|
||
from datetime import datetime
|
||
|
||
from videoarchiver.queue import EnhancedVideoQueueManager # Updated import
|
||
from videoarchiver.utils.exceptions import (
|
||
ProcessingError,
|
||
ConfigurationError,
|
||
VideoVerificationError,
|
||
QueueError,
|
||
FileOperationError
|
||
)
|
||
|
||
logger = logging.getLogger("VideoArchiver")
|
||
|
||
# Reaction emojis
REACTIONS = {
    'queued': '📹',
    'processing': '⚙️',
    'success': '✅',
    'error': '❌',
    'numbers': ['1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣'],
    'progress': ['⬛', '🟨', '🟩'],
    'download': ['0️⃣', '2️⃣', '4️⃣', '6️⃣', '8️⃣', '🔟']
}
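# 'numbers' marks queue positions 1-5, 'progress' is a three-step FFmpeg
# progress scale, and 'download' marks roughly 0/20/40/60/80/100% (see the
# threshold mapping in update_download_progress_reaction below).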

# Global queue manager instance to persist across reloads
_global_queue_manager = None

# Track detailed progress information
_download_progress: Dict[str, Dict[str, Any]] = {}
_compression_progress: Dict[str, Dict[str, Any]] = {}

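# NOTE: _global_queue_manager relies on module-level state to keep a single
# queue across cog reloads (per the comment above). The two progress dicts are
# only read in this file (by _show_queue_details); they are presumably
# populated by the downloader/compression components elsewhere in the cog.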
class VideoProcessor:
    """Handles video processing operations"""

    def __init__(
        self,
        bot,
        config_manager,
        components,
        queue_manager=None,
        ffmpeg_mgr=None
    ):
        self.bot = bot
        self.config = config_manager
        self.components = components
        self.ffmpeg_mgr = ffmpeg_mgr

        # Track active downloads and their tasks
        self._active_downloads: Dict[str, asyncio.Task] = {}
        self._active_downloads_lock = asyncio.Lock()
        self._unloading = False

        # Use global queue manager if available
        global _global_queue_manager
        if _global_queue_manager is not None:
            self.queue_manager = _global_queue_manager
            logger.info("Using existing global queue manager")
        elif queue_manager:
            self.queue_manager = queue_manager
            _global_queue_manager = queue_manager
            logger.info("Using provided queue manager and setting as global")
        else:
            data_dir = Path(os.path.dirname(__file__)) / "data"
            data_dir.mkdir(parents=True, exist_ok=True)
            queue_path = data_dir / "queue_state.json"

            self.queue_manager = EnhancedVideoQueueManager(
                max_retries=3,
                retry_delay=5,
                max_queue_size=1000,
                cleanup_interval=1800,
                max_history_age=86400,
                persistence_path=str(queue_path)
            )
            _global_queue_manager = self.queue_manager
            logger.info("Created new queue manager and set as global")

        # Start queue processing
        logger.info("Starting video processing queue...")
        self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video))
        logger.info("Video processing queue started successfully")

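    # NOTE: _queue_task runs for the lifetime of the processor; cleanup() and
    # force_cleanup() below are responsible for cancelling it along with any
    # in-flight downloads.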
    async def cleanup(self):
        """Clean up resources and stop processing"""
        try:
            logger.info("Starting VideoProcessor cleanup...")
            self._unloading = True

            # Cancel all active downloads
            async with self._active_downloads_lock:
                for url, task in list(self._active_downloads.items()):
                    if not task.done():
                        task.cancel()
                        try:
                            await task
                        except asyncio.CancelledError:
                            pass
                        except Exception as e:
                            logger.error(f"Error cancelling download task for {url}: {e}")
                self._active_downloads.clear()

            # Clean up queue manager
            if hasattr(self, 'queue_manager'):
                try:
                    await self.queue_manager.cleanup()
                except Exception as e:
                    logger.error(f"Error cleaning up queue manager: {e}")

            # Clean up FFmpeg manager
            if self.ffmpeg_mgr:
                try:
                    self.ffmpeg_mgr.kill_all_processes()
                except Exception as e:
                    logger.error(f"Error cleaning up FFmpeg manager: {e}")

            # Cancel queue processing task
            if hasattr(self, '_queue_task') and not self._queue_task.done():
                self._queue_task.cancel()
                try:
                    await self._queue_task
                except asyncio.CancelledError:
                    pass
                except Exception as e:
                    logger.error(f"Error cancelling queue task: {e}")

            logger.info("VideoProcessor cleanup completed successfully")

        except Exception as e:
            logger.error(f"Error during VideoProcessor cleanup: {traceback.format_exc()}")
            raise ProcessingError(f"Cleanup failed: {str(e)}")

    async def force_cleanup(self):
        """Force cleanup of resources when normal cleanup fails or times out"""
        try:
            logger.info("Starting force cleanup of VideoProcessor...")
            self._unloading = True

            # Force cancel all active downloads (skips the lock so a stuck
            # holder cannot block this last-resort path)
            for url, task in list(self._active_downloads.items()):
                if not task.done():
                    task.cancel()
            self._active_downloads.clear()

            # Force cleanup queue manager
            if hasattr(self, 'queue_manager'):
                try:
                    self.queue_manager.force_stop()
                except Exception as e:
                    logger.error(f"Error force stopping queue manager: {e}")

            # Force cleanup FFmpeg
            if self.ffmpeg_mgr:
                try:
                    self.ffmpeg_mgr.kill_all_processes()
                except Exception as e:
                    logger.error(f"Error force cleaning FFmpeg manager: {e}")

            # Force cancel queue task
            if hasattr(self, '_queue_task') and not self._queue_task.done():
                self._queue_task.cancel()

            logger.info("VideoProcessor force cleanup completed")

        except Exception as e:
            logger.error(f"Error during VideoProcessor force cleanup: {traceback.format_exc()}")
            # Don't raise here as this is the last resort cleanup

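    # process_message is presumably wired to the cog's on_message listener:
    # it filters by channel/site settings, extracts URLs, and enqueues them
    # for _process_video below.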
    async def process_message(self, message: discord.Message) -> None:
        """Process a message for video content"""
        try:
            # Check if message contains any video URLs
            if not message.content and not message.attachments:
                logger.debug(f"No content or attachments in message {message.id}")
                return

            # Ignore messages outside a guild; settings lookups below are per-guild
            if message.guild is None:
                return

            # Get guild settings
            settings = await self.config.get_guild_settings(message.guild.id)
            if not settings:
                logger.warning(f"No settings found for guild {message.guild.id}")
                return

            # Log settings for debugging
            logger.debug(f"Guild {message.guild.id} settings: {settings}")

            # Check if channel is enabled
            enabled_channels = settings.get("enabled_channels", [])
            if enabled_channels and message.channel.id not in enabled_channels:
                logger.debug(f"Channel {message.channel.id} not in enabled channels: {enabled_channels}")
                return

            # Extract URLs from message content and attachments
            urls = []
            if message.content:
                # Log message content for debugging
                logger.debug(f"Processing message content: {message.content}")
                enabled_sites = settings.get("enabled_sites", [])
                logger.debug(f"Enabled sites: {enabled_sites}")

                # Add URLs from message content
                for word in message.content.split():
                    # Log each word being checked
                    logger.debug(f"Checking word: {word}")
                    # If no sites are enabled, accept all URLs. Otherwise,
                    # check whether the URL contains any enabled site
                    # (a substring match, e.g. "youtube.com" in the word)
                    if not enabled_sites or any(site in word.lower() for site in enabled_sites):
                        logger.debug(f"Found matching URL: {word}")
                        urls.append(word)

            # Add attachment URLs
            for attachment in message.attachments:
                logger.debug(f"Checking attachment: {attachment.filename}")
                if any(attachment.filename.lower().endswith(ext) for ext in ['.mp4', '.mov', '.avi', '.webm']):
                    logger.debug(f"Found video attachment: {attachment.url}")
                    urls.append(attachment.url)

            if not urls:
                logger.debug("No valid URLs found in message")
                return

            # Add each URL to the queue
            for url in urls:
                try:
                    logger.info(f"Adding URL to queue: {url}")
                    await message.add_reaction(REACTIONS['queued'])
                    await self.queue_manager.add_to_queue(
                        url=url,
                        message_id=message.id,
                        channel_id=message.channel.id,
                        guild_id=message.guild.id,
                        author_id=message.author.id,
                        priority=0
                    )
                    logger.info(f"Successfully added video to queue: {url}")
                except QueueError as e:
                    logger.error(f"Failed to add video to queue: {str(e)}")
                    await message.add_reaction(REACTIONS['error'])
                    continue

        except Exception as e:
            logger.error(f"Error processing message: {traceback.format_exc()}")
            try:
                await message.add_reaction(REACTIONS['error'])
            except Exception:
                pass

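    # Queue items handed to _process_video come from EnhancedVideoQueueManager;
    # judging by the attributes used below, an item carries at least url,
    # message_id, channel_id, and guild_id (plus the author_id/priority passed
    # to add_to_queue above).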
    async def _process_video(self, item) -> Tuple[bool, Optional[str]]:
        """Process a video from the queue"""
        if self._unloading:
            return False, "Processor is unloading"

        file_path = None
        original_message = None
        download_task = None
        channel = None

        try:
            guild_id = item.guild_id
            if guild_id not in self.components:
                return False, f"No components found for guild {guild_id}"

            components = self.components[guild_id]
            downloader = components.get("downloader")
            message_manager = components.get("message_manager")

            if not downloader or not message_manager:
                return False, f"Missing required components for guild {guild_id}"

            try:
                channel = self.bot.get_channel(item.channel_id)
                if not channel:
                    return False, f"Channel {item.channel_id} not found"
                original_message = await channel.fetch_message(item.message_id)

                await original_message.remove_reaction(REACTIONS['queued'], self.bot.user)
                await original_message.add_reaction(REACTIONS['processing'])
                logger.info(f"Started processing message {item.message_id}")
            except discord.NotFound:
                original_message = None
            except Exception as e:
                logger.error(f"Error fetching original message: {e}")
                original_message = None

            # Progress callback; the downloader may invoke it from a worker
            # thread, so it must not touch the event loop directly
            def progress_callback(progress: float) -> None:
                if original_message:
                    try:
                        # Try to get the current event loop
                        try:
                            loop = asyncio.get_running_loop()
                        except RuntimeError:
                            # If no event loop is running in this thread,
                            # we'll use the bot's loop which we know exists
                            loop = self.bot.loop

                        if not loop.is_running():
                            logger.warning("Event loop is not running, skipping progress update")
                            return

                        # Schedule the reaction update on the loop; this is
                        # safe to call from a non-async thread
                        asyncio.run_coroutine_threadsafe(
                            self.update_download_progress_reaction(original_message, progress),
                            loop
                        )
                    except Exception as e:
                        logger.error(f"Error in progress callback: {e}")

            # Create and track download task
            download_task = asyncio.create_task(
                downloader.download_video(
                    item.url,
                    progress_callback=progress_callback
                )
            )

            async with self._active_downloads_lock:
                self._active_downloads[item.url] = download_task

            try:
                success, file_path, error = await download_task
                if not success:
                    if original_message:
                        await original_message.add_reaction(REACTIONS['error'])
                    logger.error(f"Download failed for message {item.message_id}: {error}")
                    return False, f"Failed to download video: {error}"
            except asyncio.CancelledError:
                logger.info(f"Download cancelled for {item.url}")
                return False, "Download cancelled"
            except Exception as e:
                if original_message:
                    await original_message.add_reaction(REACTIONS['error'])
                logger.error(f"Download error for message {item.message_id}: {str(e)}")
                return False, f"Download error: {str(e)}"
            finally:
                async with self._active_downloads_lock:
                    self._active_downloads.pop(item.url, None)

            # Get archive channel
            guild = self.bot.get_guild(guild_id)
            if not guild:
                return False, f"Guild {guild_id} not found"

            archive_channel = await self.config.get_channel(guild, "archive")
            if not archive_channel:
                return False, "Archive channel not configured"

            # Format message
            try:
                author = original_message.author if original_message else None
                message = await message_manager.format_message(
                    author=author,
                    channel=channel,
                    url=item.url
                )
            except Exception as e:
                return False, f"Failed to format message: {str(e)}"

            # Upload to archive channel
            try:
                if not os.path.exists(file_path):
                    return False, "Processed file not found"

                await archive_channel.send(
                    content=message,
                    file=discord.File(file_path)
                )

                if original_message:
                    await original_message.remove_reaction(REACTIONS['processing'], self.bot.user)
                    await original_message.add_reaction(REACTIONS['success'])
                    logger.info(f"Successfully processed message {item.message_id}")

                return True, None

            except discord.HTTPException as e:
                if original_message:
                    await original_message.add_reaction(REACTIONS['error'])
                logger.error(f"Failed to upload to Discord for message {item.message_id}: {str(e)}")
                return False, f"Failed to upload to Discord: {str(e)}"
            except Exception as e:
                if original_message:
                    await original_message.add_reaction(REACTIONS['error'])
                logger.error(f"Failed to archive video for message {item.message_id}: {str(e)}")
                return False, f"Failed to archive video: {str(e)}"

        except Exception as e:
            logger.error(f"Error processing video: {traceback.format_exc()}")
            return False, str(e)
        finally:
            # Clean up downloaded file
            if file_path and os.path.exists(file_path):
                try:
                    os.unlink(file_path)
                except Exception as e:
                    logger.error(f"Failed to clean up file {file_path}: {e}")

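    # Reaction lifecycle on the original message: 📹 queued -> ⚙️ processing ->
    # ✅ success / ❌ error, with the numbered and percentage emojis below
    # layered on top for queue position and download progress.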
    async def update_queue_position_reaction(self, message, position):
        """Update queue position reaction"""
        try:
            for reaction in REACTIONS['numbers']:
                try:
                    await message.remove_reaction(reaction, self.bot.user)
                except Exception:
                    pass

            if 0 <= position < len(REACTIONS['numbers']):
                await message.add_reaction(REACTIONS['numbers'][position])
                logger.info(f"Updated queue position reaction to {position + 1} for message {message.id}")
        except Exception as e:
            logger.error(f"Failed to update queue position reaction: {e}")

    async def update_progress_reaction(self, message, progress):
        """Update progress reaction based on FFmpeg progress"""
        if not message:
            return

        try:
            # Remove old reactions in the event loop
            for reaction in REACTIONS['progress']:
                try:
                    await message.remove_reaction(reaction, self.bot.user)
                except Exception as e:
                    logger.error(f"Failed to remove progress reaction: {e}")
                    continue

            # Add new reaction based on progress (thirds: <33%, <66%, rest)
            try:
                if progress < 33:
                    await message.add_reaction(REACTIONS['progress'][0])
                elif progress < 66:
                    await message.add_reaction(REACTIONS['progress'][1])
                else:
                    await message.add_reaction(REACTIONS['progress'][2])
            except Exception as e:
                logger.error(f"Failed to add progress reaction: {e}")

        except Exception as e:
            logger.error(f"Failed to update progress reaction: {e}")

    async def update_download_progress_reaction(self, message, progress):
        """Update download progress reaction"""
        if not message:
            return

        try:
            # Remove old reactions in the event loop
            for reaction in REACTIONS['download']:
                try:
                    await message.remove_reaction(reaction, self.bot.user)
                except Exception as e:
                    logger.error(f"Failed to remove download reaction: {e}")
                    continue

            # Add new reaction based on progress (20% buckets, 0️⃣ through 🔟)
            try:
                if progress <= 20:
                    await message.add_reaction(REACTIONS['download'][0])
                elif progress <= 40:
                    await message.add_reaction(REACTIONS['download'][1])
                elif progress <= 60:
                    await message.add_reaction(REACTIONS['download'][2])
                elif progress <= 80:
                    await message.add_reaction(REACTIONS['download'][3])
                elif progress < 100:
                    await message.add_reaction(REACTIONS['download'][4])
                else:
                    await message.add_reaction(REACTIONS['download'][5])
            except Exception as e:
                logger.error(f"Failed to add download reaction: {e}")

        except Exception as e:
            logger.error(f"Failed to update download progress reaction: {e}")

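    # _show_queue_details assumes get_queue_status returns the keys used
    # below: pending/processing/completed/failed plus a metrics dict with
    # success_rate, avg_processing_time, errors_by_type, and the hardware
    # failure counters.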
    async def _show_queue_details(self, ctx):
        """Display detailed queue status and progress information"""
        try:
            # Get queue status
            queue_status = self.queue_manager.get_queue_status(ctx.guild.id)

            # Create embed for queue overview
            embed = discord.Embed(
                title="Queue Status Details",
                color=discord.Color.blue(),
                timestamp=datetime.utcnow()
            )

            # Queue statistics
            embed.add_field(
                name="Queue Statistics",
                value=f"```\n"
                      f"Pending: {queue_status['pending']}\n"
                      f"Processing: {queue_status['processing']}\n"
                      f"Completed: {queue_status['completed']}\n"
                      f"Failed: {queue_status['failed']}\n"
                      f"Success Rate: {queue_status['metrics']['success_rate']:.1%}\n"
                      f"Avg Processing Time: {queue_status['metrics']['avg_processing_time']:.1f}s\n"
                      f"```",
                inline=False
            )

            # Active downloads
            active_downloads = ""
            for url, progress in _download_progress.items():
                if progress.get('active', False):
                    active_downloads += (
                        f"URL: {url[:50]}...\n"
                        f"Progress: {progress.get('percent', 0):.1f}%\n"
                        f"Speed: {progress.get('speed', 'N/A')}\n"
                        f"ETA: {progress.get('eta', 'N/A')}\n"
                        f"Size: {progress.get('downloaded_bytes', 0)}/{progress.get('total_bytes', 0)} bytes\n"
                        f"Started: {progress.get('start_time', 'N/A')}\n"
                        f"Retries: {progress.get('retries', 0)}\n"
                        f"-------------------\n"
                    )

            if active_downloads:
                embed.add_field(
                    name="Active Downloads",
                    value=f"```\n{active_downloads}```",
                    inline=False
                )
            else:
                embed.add_field(
                    name="Active Downloads",
                    value="```\nNo active downloads```",
                    inline=False
                )

            # Active compressions
            active_compressions = ""
            for url, progress in _compression_progress.items():
                if progress.get('active', False):
                    active_compressions += (
                        f"File: {progress.get('filename', 'Unknown')}\n"
                        f"Progress: {progress.get('percent', 0):.1f}%\n"
                        f"Time Elapsed: {progress.get('elapsed_time', 'N/A')}\n"
                        f"Input Size: {progress.get('input_size', 0)} bytes\n"
                        f"Current Size: {progress.get('current_size', 0)} bytes\n"
                        f"Target Size: {progress.get('target_size', 0)} bytes\n"
                        f"Codec: {progress.get('codec', 'Unknown')}\n"
                        f"Hardware Accel: {progress.get('hardware_accel', False)}\n"
                        f"-------------------\n"
                    )

            if active_compressions:
                embed.add_field(
                    name="Active Compressions",
                    value=f"```\n{active_compressions}```",
                    inline=False
                )
            else:
                embed.add_field(
                    name="Active Compressions",
                    value="```\nNo active compressions```",
                    inline=False
                )

            # Error statistics
            if queue_status['metrics']['errors_by_type']:
                error_stats = "\n".join(
                    f"{error_type}: {count}"
                    for error_type, count in queue_status['metrics']['errors_by_type'].items()
                )
                embed.add_field(
                    name="Error Statistics",
                    value=f"```\n{error_stats}```",
                    inline=False
                )

            # Hardware acceleration statistics
            embed.add_field(
                name="Hardware Statistics",
                value=f"```\n"
                      f"Hardware Accel Failures: {queue_status['metrics']['hardware_accel_failures']}\n"
                      f"Compression Failures: {queue_status['metrics']['compression_failures']}\n"
                      f"Peak Memory Usage: {queue_status['metrics']['peak_memory_usage']:.1f}MB\n"
                      f"```",
                inline=False
            )

            await ctx.send(embed=embed)

        except Exception as e:
            logger.error(f"Error showing queue details: {traceback.format_exc()}")
            await ctx.send(f"Error getting queue details: {str(e)}")