From 39169b3edbf3e1147b18e71e7ecdf01a9b298e85 Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Fri, 15 Nov 2024 17:32:12 +0000 Subject: [PATCH] Fixed the TypeError in _periodic_cleanup by adding runtime type checking and conversion: Added checks to ensure added_at is always a datetime object before comparison Converts string timestamps to datetime objects when needed Implemented proper datetime serialization/deserialization in QueueItem: to_dict method properly converts datetime objects to ISO format strings from_dict method properly converts ISO format strings back to datetime objects Added datetime handling for all datetime fields: added_at last_retry last_error_time --- videoarchiver/enhanced_queue.py | 6 +++ videoarchiver/processor.py | 85 +++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/videoarchiver/enhanced_queue.py b/videoarchiver/enhanced_queue.py index 5582c55..5ad04b2 100644 --- a/videoarchiver/enhanced_queue.py +++ b/videoarchiver/enhanced_queue.py @@ -655,12 +655,18 @@ class EnhancedVideoQueueManager: # Clean up completed items for url in list(self._completed.keys()): item = self._completed[url] + # Ensure added_at is a datetime object + if isinstance(item.added_at, str): + item.added_at = datetime.fromisoformat(item.added_at) if item.added_at < cleanup_cutoff: self._completed.pop(url) # Clean up failed items for url in list(self._failed.keys()): item = self._failed[url] + # Ensure added_at is a datetime object + if isinstance(item.added_at, str): + item.added_at = datetime.fromisoformat(item.added_at) if item.added_at < cleanup_cutoff: self._failed.pop(url) diff --git a/videoarchiver/processor.py b/videoarchiver/processor.py index 3ecf8d0..07d1d93 100644 --- a/videoarchiver/processor.py +++ b/videoarchiver/processor.py @@ -91,6 +91,91 @@ class VideoProcessor: self._queue_task = asyncio.create_task(self.queue_manager.process_queue(self._process_video)) logger.info("Video processing queue started successfully") + async def cleanup(self): + """Clean up resources and stop processing""" + try: + logger.info("Starting VideoProcessor cleanup...") + self._unloading = True + + # Cancel all active downloads + async with self._active_downloads_lock: + for url, task in list(self._active_downloads.items()): + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling download task for {url}: {e}") + self._active_downloads.clear() + + # Clean up queue manager + if hasattr(self, 'queue_manager'): + try: + await self.queue_manager.cleanup() + except Exception as e: + logger.error(f"Error cleaning up queue manager: {e}") + + # Clean up FFmpeg manager + if self.ffmpeg_mgr: + try: + self.ffmpeg_mgr.kill_all_processes() + except Exception as e: + logger.error(f"Error cleaning up FFmpeg manager: {e}") + + # Cancel queue processing task + if hasattr(self, '_queue_task') and not self._queue_task.done(): + self._queue_task.cancel() + try: + await self._queue_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error cancelling queue task: {e}") + + logger.info("VideoProcessor cleanup completed successfully") + + except Exception as e: + logger.error(f"Error during VideoProcessor cleanup: {traceback.format_exc()}") + raise ProcessingError(f"Cleanup failed: {str(e)}") + + async def force_cleanup(self): + """Force cleanup of resources when normal cleanup fails or times out""" + try: + logger.info("Starting force cleanup of VideoProcessor...") + self._unloading = True + + # Force cancel all active downloads + for url, task in list(self._active_downloads.items()): + if not task.done(): + task.cancel() + self._active_downloads.clear() + + # Force cleanup queue manager + if hasattr(self, 'queue_manager'): + try: + self.queue_manager.force_stop() + except Exception as e: + logger.error(f"Error force stopping queue manager: {e}") + + # Force cleanup FFmpeg + if self.ffmpeg_mgr: + try: + self.ffmpeg_mgr.kill_all_processes() + except Exception as e: + logger.error(f"Error force cleaning FFmpeg manager: {e}") + + # Force cancel queue task + if hasattr(self, '_queue_task') and not self._queue_task.done(): + self._queue_task.cancel() + + logger.info("VideoProcessor force cleanup completed") + + except Exception as e: + logger.error(f"Error during VideoProcessor force cleanup: {traceback.format_exc()}") + # Don't raise here as this is the last resort cleanup + async def process_message(self, message: discord.Message) -> None: """Process a message for video content""" try: