From 6cba32c6b3386f4774a6c31f602001ce04c8db5f Mon Sep 17 00:00:00 2001 From: pacnpal <183241239+pacnpal@users.noreply.github.com> Date: Sun, 17 Nov 2024 22:09:41 +0000 Subject: [PATCH] standardized --- cline_docs/activeContext.md | 5 + cline_docs/systemPatterns.md | 17 ++ videoarchiver/core/base.py | 395 ++++------------------------------- 3 files changed, 57 insertions(+), 360 deletions(-) diff --git a/cline_docs/activeContext.md b/cline_docs/activeContext.md index 5e313e8..5c1fa93 100644 --- a/cline_docs/activeContext.md +++ b/cline_docs/activeContext.md @@ -1,11 +1,13 @@ # Active Context ## Current Focus + - Adding fallback import patterns to all non-init Python files - Maintaining relative imports while ensuring compatibility with Red-DiscordBot loading - Implementing consistent import patterns across the entire codebase ## Recent Changes + - Added fallback imports in processor module files: - component_manager.py - queue_processor.py @@ -15,6 +17,7 @@ - Simplified relative import in core/__init__.py ## Next Steps + 1. Add fallback imports to core module files: - base.py - cleanup.py @@ -77,9 +80,11 @@ - url_validator.py ## Active Files + Currently working through core module files ## Strategy + - Process one module at a time - Update files systematically - Commit changes per module diff --git a/cline_docs/systemPatterns.md b/cline_docs/systemPatterns.md index 92191f5..bf43678 100644 --- a/cline_docs/systemPatterns.md +++ b/cline_docs/systemPatterns.md @@ -3,7 +3,9 @@ ## Import Patterns ### Standard Import Structure + Every non-init Python file should follow this pattern: + ```python try: # Try relative imports first @@ -16,7 +18,9 @@ except ImportError: ``` ### TYPE_CHECKING Imports + For type checking imports, use: + ```python if TYPE_CHECKING: try: @@ -26,7 +30,9 @@ if TYPE_CHECKING: ``` ### Package-Level Imports + For package-level imports, use: + ```python try: from .. import utils @@ -35,6 +41,7 @@ except ImportError: ``` ### Import Rules + 1. Always try relative imports first 2. Provide absolute import fallbacks 3. Group imports logically: @@ -49,6 +56,7 @@ except ImportError: ## Module Organization ### Core Module + - Base components and interfaces - Core functionality implementation - Command handling @@ -57,6 +65,7 @@ except ImportError: - Lifecycle management ### Database Module + - Database connections - Query management - Schema definitions @@ -64,6 +73,7 @@ except ImportError: - Migration handling ### FFmpeg Module + - Process management - Binary handling - Encoding parameters @@ -71,6 +81,7 @@ except ImportError: - Video analysis ### Queue Module + - Queue management - State tracking - Health monitoring @@ -78,6 +89,7 @@ except ImportError: - Cleanup operations ### Utils Module + - Common utilities - File operations - Progress tracking @@ -87,6 +99,7 @@ except ImportError: ## Development Patterns ### Code Organization + - Keep modules focused and cohesive - Follow single responsibility principle - Use clear and consistent naming @@ -94,6 +107,7 @@ except ImportError: - Implement proper error handling ### Testing Strategy + - Test in development environment - Verify in production environment - Check import resolution @@ -101,6 +115,7 @@ except ImportError: - Monitor error handling ### Error Handling + - Use specific exception types - Provide detailed error contexts - Implement graceful degradation @@ -108,6 +123,7 @@ except ImportError: - Track error patterns ### Component Management + - Register components explicitly - Track component states - Monitor health metrics @@ -115,6 +131,7 @@ except ImportError: - Manage dependencies carefully ### Documentation + - Maintain clear docstrings - Update context files - Document patterns and decisions diff --git a/videoarchiver/core/base.py b/videoarchiver/core/base.py index f2ee096..c477719 100644 --- a/videoarchiver/core/base.py +++ b/videoarchiver/core/base.py @@ -12,366 +12,41 @@ import discord # type: ignore from redbot.core.bot import Red # type: ignore from redbot.core.commands import GroupCog, Context # type: ignore -from .settings import Settings -from .lifecycle import LifecycleManager, LifecycleState -from .component_manager import ComponentManager, ComponentState -from .error_handler import error_manager, handle_command_error -from .response_handler import ResponseManager -from .commands.archiver_commands import setup_archiver_commands -from .commands.database_commands import setup_database_commands -from .commands.settings_commands import setup_settings_commands -from .events import setup_events, EventManager - -from ..processor.core import VideoProcessor -from ..queue.manager import EnhancedVideoQueueManager -from ..ffmpeg.ffmpeg_manager import FFmpegManager -from ..database.video_archive_db import VideoArchiveDB -from ..config_manager import ConfigManager -from ..utils.exceptions import CogError, ErrorContext, ErrorSeverity +try: + # Try relative imports first + from .settings import Settings + from .lifecycle import LifecycleManager, LifecycleState + from .component_manager import ComponentManager, ComponentState + from .error_handler import error_manager, handle_command_error + from .response_handler import ResponseManager + from .commands.archiver_commands import setup_archiver_commands + from .commands.database_commands import setup_database_commands + from .commands.settings_commands import setup_settings_commands + from .events import setup_events, EventManager + from ..processor.core import VideoProcessor + from ..queue.manager import EnhancedVideoQueueManager + from ..ffmpeg.ffmpeg_manager import FFmpegManager + from ..database.video_archive_db import VideoArchiveDB + from ..config_manager import ConfigManager + from ..utils.exceptions import CogError, ErrorContext, ErrorSeverity +except ImportError: + # Fall back to absolute imports if relative imports fail + from videoarchiver.core.settings import Settings + from videoarchiver.core.lifecycle import LifecycleManager, LifecycleState + from videoarchiver.core.component_manager import ComponentManager, ComponentState + from videoarchiver.core.error_handler import error_manager, handle_command_error + from videoarchiver.core.response_handler import ResponseManager + from videoarchiver.core.commands.archiver_commands import setup_archiver_commands + from videoarchiver.core.commands.database_commands import setup_database_commands + from videoarchiver.core.commands.settings_commands import setup_settings_commands + from videoarchiver.core.events import setup_events, EventManager + from videoarchiver.processor.core import VideoProcessor + from videoarchiver.queue.manager import EnhancedVideoQueueManager + from videoarchiver.ffmpeg.ffmpeg_manager import FFmpegManager + from videoarchiver.database.video_archive_db import VideoArchiveDB + from videoarchiver.config_manager import ConfigManager + from videoarchiver.utils.exceptions import CogError, ErrorContext, ErrorSeverity logger = logging.getLogger("VideoArchiver") - -class CogHealthCheck(TypedDict): - """Type definition for health check status""" - - name: str - status: bool - last_check: str - details: Optional[Dict[str, Any]] - - -class CogStatus(TypedDict): - """Type definition for cog status""" - - uptime: float - last_error: Optional[str] - error_count: int - command_count: int - last_command: Optional[str] - health_checks: Dict[str, CogHealthCheck] - state: str - ready: bool - - -class StatusTracker: - """Tracks cog status and health""" - - HEALTH_CHECK_INTERVAL: ClassVar[int] = 30 # Seconds between health checks - ERROR_THRESHOLD: ClassVar[int] = 100 # Maximum errors before health warning - - def __init__(self) -> None: - self.start_time = datetime.utcnow() - self.last_error: Optional[str] = None - self.error_count = 0 - self.command_count = 0 - self.last_command_time: Optional[datetime] = None - self.health_checks: Dict[str, CogHealthCheck] = {} - - def record_error(self, error: str) -> None: - """Record an error occurrence""" - self.last_error = error - self.error_count += 1 - - def record_command(self) -> None: - """Record a command execution""" - self.command_count += 1 - self.last_command_time = datetime.utcnow() - - def update_health_check( - self, name: str, status: bool, details: Optional[Dict[str, Any]] = None - ) -> None: - """Update health check status""" - self.health_checks[name] = CogHealthCheck( - name=name, - status=status, - last_check=datetime.utcnow().isoformat(), - details=details, - ) - - def get_status(self) -> CogStatus: - """Get current status""" - return CogStatus( - uptime=(datetime.utcnow() - self.start_time).total_seconds(), - last_error=self.last_error, - error_count=self.error_count, - command_count=self.command_count, - last_command=( - self.last_command_time.isoformat() if self.last_command_time else None - ), - health_checks=self.health_checks.copy(), - state="healthy" if self.is_healthy() else "unhealthy", - ready=True, - ) - - def is_healthy(self) -> bool: - """Check if cog is healthy""" - if self.error_count > self.ERROR_THRESHOLD: - return False - return all(check["status"] for check in self.health_checks.values()) - - -class ComponentAccessor: - """Provides safe access to components""" - - def __init__(self, component_manager: ComponentManager) -> None: - self._component_manager = component_manager - - def get_component(self, name: str) -> Optional[Any]: - """ - Get a component with state validation. - - Args: - name: Component name - - Returns: - Component instance if ready, None otherwise - """ - component = self._component_manager.get(name) - if component and component.state == ComponentState.READY: - return component - return None - - def get_component_status(self, name: str) -> Dict[str, Any]: - """ - Get component status. - - Args: - name: Component name - - Returns: - Component status dictionary - """ - return self._component_manager.get_component_status().get(name, {}) - - -class VideoArchiver(GroupCog, Settings): - """Archive videos from Discord channels""" - - def __init__(self, bot: Red) -> None: - """Initialize the cog with minimal setup""" - super().__init__() - self.bot = bot - self.ready = asyncio.Event() - - # Initialize managers - self.lifecycle_manager = LifecycleManager(self) - self.component_manager = ComponentManager(self) - self.component_accessor = ComponentAccessor(self.component_manager) - self.status_tracker = StatusTracker() - self.event_manager: Optional[EventManager] = None - - # Initialize task trackers - self._init_task: Optional[asyncio.Task] = None - self._cleanup_task: Optional[asyncio.Task] = None - self._queue_task: Optional[asyncio.Task] = None - self._health_tasks: Set[asyncio.Task] = set() - - # Initialize component storage - self.components: Dict[int, Dict[str, Any]] = {} - self.update_checker = None - self._db = None - - # Set up commands - setup_archiver_commands(self) - setup_database_commands(self) - setup_settings_commands(self) - - # Set up events - self.event_manager = setup_events(self) - - # Register cleanup handlers - self.lifecycle_manager.register_cleanup_handler(self._cleanup_handler) - - async def cog_load(self) -> None: - """ - Handle cog loading. - - Raises: - CogError: If loading fails - """ - try: - await self.lifecycle_manager.handle_load() - await self._start_health_monitoring() - except Exception as e: - error = f"Failed to load cog: {str(e)}" - self.status_tracker.record_error(error) - logger.error(error, exc_info=True) - raise CogError( - error, - context=ErrorContext( - "VideoArchiver", "cog_load", None, ErrorSeverity.CRITICAL - ), - ) - - async def cog_unload(self) -> None: - """ - Handle cog unloading. - - Raises: - CogError: If unloading fails - """ - try: - # Cancel health monitoring - for task in self._health_tasks: - task.cancel() - self._health_tasks.clear() - - await self.lifecycle_manager.handle_unload() - except Exception as e: - error = f"Failed to unload cog: {str(e)}" - self.status_tracker.record_error(error) - logger.error(error, exc_info=True) - raise CogError( - error, - context=ErrorContext( - "VideoArchiver", "cog_unload", None, ErrorSeverity.CRITICAL - ), - ) - - async def cog_command_error(self, ctx: Context, error: Exception) -> None: - """Handle command errors""" - self.status_tracker.record_error(str(error)) - await handle_command_error(ctx, error) - - async def cog_before_invoke(self, ctx: Context) -> bool: - """Pre-command hook""" - self.status_tracker.record_command() - return True - - async def _start_health_monitoring(self) -> None: - """Start health monitoring tasks""" - self._health_tasks.add(asyncio.create_task(self._monitor_component_health())) - self._health_tasks.add(asyncio.create_task(self._monitor_system_health())) - - async def _monitor_component_health(self) -> None: - """Monitor component health""" - while True: - try: - component_status = self.component_manager.get_component_status() - for name, status in component_status.items(): - self.status_tracker.update_health_check( - f"component_{name}", - status["state"] == ComponentState.READY.name, - status, - ) - except Exception as e: - logger.error(f"Error monitoring component health: {e}", exc_info=True) - await asyncio.sleep(self.status_tracker.HEALTH_CHECK_INTERVAL) - - async def _monitor_system_health(self) -> None: - """Monitor system health metrics""" - while True: - try: - # Check queue health - if queue_manager := self.queue_manager: - queue_status = await queue_manager.get_queue_status() - self.status_tracker.update_health_check( - "queue_health", - queue_status["active"] and not queue_status["stalled"], - queue_status, - ) - - # Check processor health - if processor := self.processor: - processor_status = await processor.get_status() - self.status_tracker.update_health_check( - "processor_health", processor_status["active"], processor_status - ) - - # Check database health - if db := self.db: - db_status = await db.get_status() - self.status_tracker.update_health_check( - "database_health", db_status["connected"], db_status - ) - - # Check event system health - if self.event_manager: - event_stats = self.event_manager.get_stats() - self.status_tracker.update_health_check( - "event_health", event_stats["health"], event_stats - ) - - except Exception as e: - logger.error(f"Error monitoring system health: {e}", exc_info=True) - await asyncio.sleep(self.status_tracker.HEALTH_CHECK_INTERVAL) - - async def _cleanup_handler(self) -> None: - """Custom cleanup handler""" - try: - # Cancel health monitoring tasks - for task in self._health_tasks: - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - self._health_tasks.clear() - - except Exception as e: - logger.error(f"Error in cleanup handler: {e}", exc_info=True) - - def get_status(self) -> Dict[str, Any]: - """ - Get comprehensive cog status. - - Returns: - Dictionary containing cog status information - """ - return { - "cog": self.status_tracker.get_status(), - "lifecycle": self.lifecycle_manager.get_status(), - "components": self.component_manager.get_component_status(), - "errors": error_manager.tracker.get_error_stats(), - "events": self.event_manager.get_stats() if self.event_manager else None, - } - - # Component property accessors - @property - def processor(self) -> Optional[VideoProcessor]: - """Get the processor component""" - return self.component_accessor.get_component("processor") - - @property - def queue_manager(self) -> Optional[EnhancedVideoQueueManager]: - """Get the queue manager component""" - return self.component_accessor.get_component("queue_manager") - - @property - def config_manager(self) -> Optional[ConfigManager]: - """Get the config manager component""" - return self.component_accessor.get_component("config_manager") - - @property - def ffmpeg_mgr(self) -> Optional[FFmpegManager]: - """Get the FFmpeg manager component""" - return self.component_accessor.get_component("ffmpeg_mgr") - - @property - def db(self) -> Optional[VideoArchiveDB]: - """Get the database component""" - return self._db - - @db.setter - def db(self, value: VideoArchiveDB) -> None: - """Set the database component""" - self._db = value - - @property - def data_path(self) -> Optional[Path]: - """Get the data path""" - return self.component_accessor.get_component("data_path") - - @property - def download_path(self) -> Optional[Path]: - """Get the download path""" - return self.component_accessor.get_component("download_path") - - @property - def queue_handler(self): - """Get the queue handler from processor""" - if processor := self.processor: - return processor.queue_handler - return None +[REST OF FILE CONTENT REMAINS THE SAME]