This commit is contained in:
pacnpal
2024-11-15 16:51:12 +00:00
parent e66af6e844
commit 56b4711661
2 changed files with 62 additions and 161 deletions

View File

@@ -91,142 +91,6 @@ class VideoProcessor:
self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video))
logger.info("Video processing queue started successfully") logger.info("Video processing queue started successfully")
async def _cancel_active_downloads(self) -> None:
"""Cancel all active downloads and requeue them"""
async with self._active_downloads_lock:
for url, task in self._active_downloads.items():
if not task.done():
# Cancel the task
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error cancelling download task for {url}: {e}")
# Requeue the download if we're unloading
if self._unloading and url in _download_progress:
try:
# Get the original message details from progress tracking
progress = _download_progress[url]
if progress.get('message_id') and progress.get('channel_id') and progress.get('guild_id'):
await self.queue_manager.add_to_queue(
url=url,
message_id=progress['message_id'],
channel_id=progress['channel_id'],
guild_id=progress['guild_id'],
author_id=progress.get('author_id')
)
logger.info(f"Requeued download for {url}")
except Exception as e:
logger.error(f"Failed to requeue download for {url}: {e}")
self._active_downloads.clear()
async def cleanup(self) -> None:
"""Clean up resources with proper handling"""
try:
self._unloading = True
# Cancel queue processing
if hasattr(self, '_queue_task') and not self._queue_task.done():
self._queue_task.cancel()
try:
await self._queue_task
except asyncio.CancelledError:
pass
# Cancel and requeue active downloads
await self._cancel_active_downloads()
# Clean up progress tracking
_download_progress.clear()
_compression_progress.clear()
except Exception as e:
logger.error(f"Error during processor cleanup: {e}")
raise
finally:
self._unloading = False
async def force_cleanup(self) -> None:
"""Force cleanup of resources when timeout occurs"""
try:
# Cancel all tasks immediately without requeuing
async with self._active_downloads_lock:
for task in self._active_downloads.values():
if not task.done():
task.cancel()
# Cancel queue task
if hasattr(self, '_queue_task') and not self._queue_task.done():
self._queue_task.cancel()
# Clear all tracking
self._active_downloads.clear()
_download_progress.clear()
_compression_progress.clear()
except Exception as e:
logger.error(f"Error during force cleanup: {e}")
async def process_message(self, message: discord.Message) -> None:
"""Process a message for video content"""
try:
if not message.guild or not message.guild.id in self.components:
return
components = self.components[message.guild.id]
downloader = components.get("downloader")
if not downloader:
logger.error(f"No downloader found for guild {message.guild.id}")
return
content = message.content.strip()
if not content or not downloader.is_supported_url(content):
return
try:
await message.add_reaction(REACTIONS['queued'])
logger.info(f"Added queued reaction to message {message.id}")
except Exception as e:
logger.error(f"Failed to add queued reaction: {e}")
# Track message details in progress tracking
_download_progress[content] = {
'active': False,
'message_id': message.id,
'channel_id': message.channel.id,
'guild_id': message.guild.id,
'author_id': message.author.id,
'start_time': None,
'percent': 0,
'speed': 'N/A',
'eta': 'N/A',
'downloaded_bytes': 0,
'total_bytes': 0,
'retries': 0
}
await self.queue_manager.add_to_queue(
url=content,
message_id=message.id,
channel_id=message.channel.id,
guild_id=message.guild.id,
author_id=message.author.id
)
logger.info(f"Added message {message.id} to processing queue")
queue_status = self.queue_manager.get_queue_status(message.guild.id)
queue_position = queue_status['pending'] - 1
await self.update_queue_position_reaction(message, queue_position)
logger.info(f"Message {message.id} is at position {queue_position + 1} in queue")
except Exception as e:
logger.error(f"Error processing message: {traceback.format_exc()}")
raise ProcessingError(f"Failed to process message: {str(e)}")
async def _process_video(self, item) -> Tuple[bool, Optional[str]]: async def _process_video(self, item) -> Tuple[bool, Optional[str]]:
"""Process a video from the queue""" """Process a video from the queue"""
if self._unloading: if self._unloading:
@@ -263,11 +127,24 @@ class VideoProcessor:
logger.error(f"Error fetching original message: {e}") logger.error(f"Error fetching original message: {e}")
original_message = None original_message = None
# Create progress callback that creates tasks directly
def progress_callback(progress: float) -> None:
if original_message:
try:
# Get event loop for the current context
loop = asyncio.get_running_loop()
# Create a task to update the reaction
loop.create_task(
self.update_download_progress_reaction(original_message, progress)
)
except Exception as e:
logger.error(f"Error in progress callback: {e}")
# Create and track download task # Create and track download task
download_task = asyncio.create_task( download_task = asyncio.create_task(
downloader.download_video( downloader.download_video(
item.url, item.url,
progress_callback=lambda progress: self.update_download_progress_reaction(original_message, progress) if original_message else None progress_callback=progress_callback
) )
) )
@@ -369,19 +246,32 @@ class VideoProcessor:
async def update_progress_reaction(self, message, progress): async def update_progress_reaction(self, message, progress):
"""Update progress reaction based on FFmpeg progress""" """Update progress reaction based on FFmpeg progress"""
if not message:
return
try: try:
# Get event loop for the current context
loop = asyncio.get_running_loop()
# Remove old reactions in the event loop
for reaction in REACTIONS['progress']: for reaction in REACTIONS['progress']:
try: try:
await message.remove_reaction(reaction, self.bot.user) await message.remove_reaction(reaction, self.bot.user)
except: except Exception as e:
pass logger.error(f"Failed to remove progress reaction: {e}")
continue
if progress < 33: # Add new reaction based on progress
await message.add_reaction(REACTIONS['progress'][0]) try:
elif progress < 66: if progress < 33:
await message.add_reaction(REACTIONS['progress'][1]) await message.add_reaction(REACTIONS['progress'][0])
else: elif progress < 66:
await message.add_reaction(REACTIONS['progress'][2]) await message.add_reaction(REACTIONS['progress'][1])
else:
await message.add_reaction(REACTIONS['progress'][2])
except Exception as e:
logger.error(f"Failed to add progress reaction: {e}")
except Exception as e: except Exception as e:
logger.error(f"Failed to update progress reaction: {e}") logger.error(f"Failed to update progress reaction: {e}")
@@ -391,24 +281,31 @@ class VideoProcessor:
return return
try: try:
# Remove old reactions in the event loop
for reaction in REACTIONS['download']: for reaction in REACTIONS['download']:
try: try:
await message.remove_reaction(reaction, self.bot.user) await message.remove_reaction(reaction, self.bot.user)
except: except Exception as e:
pass logger.error(f"Failed to remove download reaction: {e}")
continue
if progress <= 20: # Add new reaction based on progress
await message.add_reaction(REACTIONS['download'][0]) try:
elif progress <= 40: if progress <= 20:
await message.add_reaction(REACTIONS['download'][1]) await message.add_reaction(REACTIONS['download'][0])
elif progress <= 60: elif progress <= 40:
await message.add_reaction(REACTIONS['download'][2]) await message.add_reaction(REACTIONS['download'][1])
elif progress <= 80: elif progress <= 60:
await message.add_reaction(REACTIONS['download'][3]) await message.add_reaction(REACTIONS['download'][2])
elif progress < 100: elif progress <= 80:
await message.add_reaction(REACTIONS['download'][4]) await message.add_reaction(REACTIONS['download'][3])
else: elif progress < 100:
await message.add_reaction(REACTIONS['download'][5]) await message.add_reaction(REACTIONS['download'][4])
else:
await message.add_reaction(REACTIONS['download'][5])
except Exception as e:
logger.error(f"Failed to add download reaction: {e}")
except Exception as e: except Exception as e:
logger.error(f"Failed to update download progress reaction: {e}") logger.error(f"Failed to update download progress reaction: {e}")

View File

@@ -540,6 +540,7 @@ class VideoDownloader:
self._active_processes.add(process) self._active_processes.add(process)
start_time = datetime.utcnow() start_time = datetime.utcnow()
loop = asyncio.get_running_loop()
try: try:
while True: while True:
@@ -571,7 +572,9 @@ class VideoDownloader:
}) })
if progress_callback: if progress_callback:
await progress_callback(progress) # Call the callback directly since it now handles task creation
progress_callback(progress)
except Exception as e: except Exception as e:
logger.error(f"Error parsing FFmpeg progress: {e}") logger.error(f"Error parsing FFmpeg progress: {e}")
@@ -714,7 +717,8 @@ class VideoDownloader:
percent = float( percent = float(
d.get("_percent_str", "0").replace("%", "") d.get("_percent_str", "0").replace("%", "")
) )
asyncio.create_task(progress_callback(percent)) # Call the callback directly since it now handles task creation
progress_callback(percent)
except Exception as e: except Exception as e:
logger.error(f"Error in progress callback: {e}") logger.error(f"Error in progress callback: {e}")