From 56b47116616c9848596da1a95af93fce27f71b11 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 16:51:12 +0000 Subject: [PATCH] fixes --- videoarchiver/processor.py | 215 ++++++------------------ videoarchiver/utils/video_downloader.py | 8 +- 2 files changed, 62 insertions(+), 161 deletions(-) diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index f353fa5..35d8f93 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -91,142 +91,6 @@ class VideoProcessor: self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) logger.info("Video processing queue started successfully") - async def _cancel_active_downloads(self) -> None: - """Cancel all active downloads and requeue them""" - async with self._active_downloads_lock: - for url, task in self._active_downloads.items(): - if not task.done(): - # Cancel the task - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"Error cancelling download task for {url}: {e}") - - # Requeue the download if we're unloading - if self._unloading and url in _download_progress: - try: - # Get the original message details from progress tracking - progress = _download_progress[url] - if progress.get('message_id') and progress.get('channel_id') and progress.get('guild_id'): - await self.queue_manager.add_to_queue( - url=url, - message_id=progress['message_id'], - channel_id=progress['channel_id'], - guild_id=progress['guild_id'], - author_id=progress.get('author_id') - ) - logger.info(f"Requeued download for {url}") - except Exception as e: - logger.error(f"Failed to requeue download for {url}: {e}") - - self._active_downloads.clear() - - async def cleanup(self) -> None: - """Clean up resources with proper handling""" - try: - self._unloading = True - - # Cancel queue processing - if hasattr(self, '_queue_task') and not self._queue_task.done(): - self._queue_task.cancel() - try: - await self._queue_task - except asyncio.CancelledError: - pass - - # Cancel and requeue active downloads - await self._cancel_active_downloads() - - # Clean up progress tracking - _download_progress.clear() - _compression_progress.clear() - - except Exception as e: - logger.error(f"Error during processor cleanup: {e}") - raise - finally: - self._unloading = False - - async def force_cleanup(self) -> None: - """Force cleanup of resources when timeout occurs""" - try: - # Cancel all tasks immediately without requeuing - async with self._active_downloads_lock: - for task in self._active_downloads.values(): - if not task.done(): - task.cancel() - - # Cancel queue task - if hasattr(self, '_queue_task') and not self._queue_task.done(): - self._queue_task.cancel() - - # Clear all tracking - self._active_downloads.clear() - _download_progress.clear() - _compression_progress.clear() - - except Exception as e: - logger.error(f"Error during force cleanup: {e}") - - async def process_message(self, message: discord.Message) -> None: - """Process a message for video content""" - try: - if not message.guild or not message.guild.id in self.components: - return - - components = self.components[message.guild.id] - downloader = components.get("downloader") - if not downloader: - logger.error(f"No downloader found for guild {message.guild.id}") - return - - content = message.content.strip() - if not content or not downloader.is_supported_url(content): - return - - try: - await message.add_reaction(REACTIONS['queued']) - logger.info(f"Added queued reaction to message {message.id}") - except Exception as e: - logger.error(f"Failed to add queued reaction: {e}") - - # Track message details in progress tracking - _download_progress[content] = { - 'active': False, - 'message_id': message.id, - 'channel_id': message.channel.id, - 'guild_id': message.guild.id, - 'author_id': message.author.id, - 'start_time': None, - 'percent': 0, - 'speed': 'N/A', - 'eta': 'N/A', - 'downloaded_bytes': 0, - 'total_bytes': 0, - 'retries': 0 - } - - await self.queue_manager.add_to_queue( - url=content, - message_id=message.id, - channel_id=message.channel.id, - guild_id=message.guild.id, - author_id=message.author.id - ) - logger.info(f"Added message {message.id} to processing queue") - - queue_status = self.queue_manager.get_queue_status(message.guild.id) - queue_position = queue_status['pending'] - 1 - await self.update_queue_position_reaction(message, queue_position) - logger.info(f"Message {message.id} is at position {queue_position + 1} in queue") - - except Exception as e: - logger.error(f"Error processing message: {traceback.format_exc()}") - raise ProcessingError(f"Failed to process message: {str(e)}") - async def _process_video(self, item) -> Tuple[bool, Optional[str]]: """Process a video from the queue""" if self._unloading: @@ -263,11 +127,24 @@ class VideoProcessor: logger.error(f"Error fetching original message: {e}") original_message = None + # Create progress callback that creates tasks directly + def progress_callback(progress: float) -> None: + if original_message: + try: + # Get event loop for the current context + loop = asyncio.get_running_loop() + # Create a task to update the reaction + loop.create_task( + self.update_download_progress_reaction(original_message, progress) + ) + except Exception as e: + logger.error(f"Error in progress callback: {e}") + # Create and track download task download_task = asyncio.create_task( downloader.download_video( item.url, - progress_callback=lambda progress: self.update_download_progress_reaction(original_message, progress) if original_message else None + progress_callback=progress_callback ) ) @@ -369,19 +246,32 @@ class VideoProcessor: async def update_progress_reaction(self, message, progress): """Update progress reaction based on FFmpeg progress""" + if not message: + return + try: + # Get event loop for the current context + loop = asyncio.get_running_loop() + + # Remove old reactions in the event loop for reaction in REACTIONS['progress']: try: await message.remove_reaction(reaction, self.bot.user) - except: - pass + except Exception as e: + logger.error(f"Failed to remove progress reaction: {e}") + continue - if progress < 33: - await message.add_reaction(REACTIONS['progress'][0]) - elif progress < 66: - await message.add_reaction(REACTIONS['progress'][1]) - else: - await message.add_reaction(REACTIONS['progress'][2]) + # Add new reaction based on progress + try: + if progress < 33: + await message.add_reaction(REACTIONS['progress'][0]) + elif progress < 66: + await message.add_reaction(REACTIONS['progress'][1]) + else: + await message.add_reaction(REACTIONS['progress'][2]) + except Exception as e: + logger.error(f"Failed to add progress reaction: {e}") + except Exception as e: logger.error(f"Failed to update progress reaction: {e}") @@ -391,24 +281,31 @@ class VideoProcessor: return try: + # Remove old reactions in the event loop for reaction in REACTIONS['download']: try: await message.remove_reaction(reaction, self.bot.user) - except: - pass + except Exception as e: + logger.error(f"Failed to remove download reaction: {e}") + continue - if progress <= 20: - await message.add_reaction(REACTIONS['download'][0]) - elif progress <= 40: - await message.add_reaction(REACTIONS['download'][1]) - elif progress <= 60: - await message.add_reaction(REACTIONS['download'][2]) - elif progress <= 80: - await message.add_reaction(REACTIONS['download'][3]) - elif progress < 100: - await message.add_reaction(REACTIONS['download'][4]) - else: - await message.add_reaction(REACTIONS['download'][5]) + # Add new reaction based on progress + try: + if progress <= 20: + await message.add_reaction(REACTIONS['download'][0]) + elif progress <= 40: + await message.add_reaction(REACTIONS['download'][1]) + elif progress <= 60: + await message.add_reaction(REACTIONS['download'][2]) + elif progress <= 80: + await message.add_reaction(REACTIONS['download'][3]) + elif progress < 100: + await message.add_reaction(REACTIONS['download'][4]) + else: + await message.add_reaction(REACTIONS['download'][5]) + except Exception as e: + logger.error(f"Failed to add download reaction: {e}") + except Exception as e: logger.error(f"Failed to update download progress reaction: {e}") diff --git a/videoarchiver/utils/video_downloader.py b/videoarchiver/utils/video_downloader.py index d6f0ff3..d1eea88 100644 --- a/videoarchiver/utils/video_downloader.py +++ b/videoarchiver/utils/video_downloader.py @@ -540,6 +540,7 @@ class VideoDownloader: self._active_processes.add(process) start_time = datetime.utcnow() + loop = asyncio.get_running_loop() try: while True: @@ -571,7 +572,9 @@ class VideoDownloader: }) if progress_callback: - await progress_callback(progress) + # Call the callback directly since it now handles task creation + progress_callback(progress) + except Exception as e: logger.error(f"Error parsing FFmpeg progress: {e}") @@ -714,7 +717,8 @@ class VideoDownloader: percent = float( d.get("_percent_str", "0").replace("%", "") ) - asyncio.create_task(progress_callback(percent)) + # Call the callback directly since it now handles task creation + progress_callback(percent) except Exception as e: logger.error(f"Error in progress callback: {e}")