""" Health check API views for ThrillWiki API v1. This module contains health check and monitoring endpoints for system status, performance metrics, and database analysis. """ import time from django.utils import timezone from django.conf import settings from rest_framework.views import APIView from rest_framework.request import Request from rest_framework.response import Response from rest_framework.permissions import AllowAny from health_check.views import MainView from drf_spectacular.utils import extend_schema, extend_schema_view # Import serializers from ..serializers import ( HealthCheckOutputSerializer, PerformanceMetricsOutputSerializer, SimpleHealthOutputSerializer, ) # Handle optional dependencies with fallback classes class FallbackCacheMonitor: """Fallback class if CacheMonitor is not available.""" def get_cache_stats(self): return {"error": "Cache monitoring not available"} class FallbackIndexAnalyzer: """Fallback class if IndexAnalyzer is not available.""" @staticmethod def analyze_slow_queries(threshold): return {"error": "Query analysis not available"} # Try to import the real classes, use fallbacks if not available try: from apps.core.services.enhanced_cache_service import CacheMonitor except ImportError: CacheMonitor = FallbackCacheMonitor try: from apps.core.utils.query_optimization import IndexAnalyzer except ImportError: IndexAnalyzer = FallbackIndexAnalyzer @extend_schema_view( get=extend_schema( summary="Health check", description=( "Get comprehensive health check information including system metrics." ), responses={ 200: HealthCheckOutputSerializer, 503: HealthCheckOutputSerializer, }, tags=["Health"], ), ) class HealthCheckAPIView(APIView): """Enhanced API endpoint for health checks with detailed JSON response.""" permission_classes = [AllowAny] serializer_class = HealthCheckOutputSerializer def get(self, request: Request) -> Response: """Return comprehensive health check information.""" start_time = time.time() # Get basic health check results main_view = MainView() main_view.request = request._request # type: ignore[attr-defined] plugins = main_view.plugins errors = main_view.errors # Collect additional performance metrics try: cache_monitor = CacheMonitor() cache_stats = cache_monitor.get_cache_stats() except Exception: cache_stats = {"error": "Cache monitoring unavailable"} # Build comprehensive health data health_data = { "status": "healthy" if not errors else "unhealthy", "timestamp": timezone.now(), "version": getattr(settings, "VERSION", "1.0.0"), "environment": getattr(settings, "ENVIRONMENT", "development"), "response_time_ms": 0, # Will be calculated at the end "checks": {}, "metrics": { "cache": cache_stats, "database": self._get_database_metrics(), "system": self._get_system_metrics(), }, } # Process individual health checks for plugin in plugins: # Handle both plugin objects and strings if hasattr(plugin, "identifier"): plugin_name = plugin.identifier() plugin_class_name = plugin.__class__.__name__ critical_service = getattr(plugin, "critical_service", False) response_time = getattr(plugin, "_response_time", None) else: # If plugin is a string, use it directly plugin_name = str(plugin) plugin_class_name = plugin_name critical_service = False response_time = None plugin_errors = ( errors.get(plugin_class_name, []) if isinstance(errors, dict) else [] ) health_data["checks"][plugin_name] = { "status": "healthy" if not plugin_errors else "unhealthy", "critical": critical_service, "errors": [str(error) for error in plugin_errors], "response_time_ms": response_time, } # Calculate total response time health_data["response_time_ms"] = round((time.time() - start_time) * 1000, 2) # Determine HTTP status code status_code = 200 if errors: # Check if any critical services are failing critical_errors = any( getattr(plugin, "critical_service", False) for plugin in plugins if isinstance(errors, dict) and errors.get(plugin.__class__.__name__) ) status_code = 503 if critical_errors else 200 serializer = HealthCheckOutputSerializer(health_data) return Response(serializer.data, status=status_code) def _get_database_metrics(self) -> dict: """Get database performance metrics.""" try: from django.db import connection from typing import Any # Get basic connection info metrics: dict[str, Any] = { "vendor": connection.vendor, "connection_status": "connected", } # Test query performance start_time = time.time() with connection.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() query_time = (time.time() - start_time) * 1000 metrics["test_query_time_ms"] = round(query_time, 2) # PostgreSQL specific metrics if connection.vendor == "postgresql": try: with connection.cursor() as cursor: cursor.execute( """ SELECT numbackends as active_connections, xact_commit as transactions_committed, xact_rollback as transactions_rolled_back, blks_read as blocks_read, blks_hit as blocks_hit FROM pg_stat_database WHERE datname = current_database() """ ) row = cursor.fetchone() if row: metrics.update( { # type: ignore[arg-type] "active_connections": row[0], "transactions_committed": row[1], "transactions_rolled_back": row[2], "cache_hit_ratio": ( round((row[4] / (row[3] + row[4])) * 100, 2) if (row[3] + row[4]) > 0 else 0 ), } ) except Exception: pass # Skip advanced metrics if not available return metrics except Exception as e: return {"connection_status": "error", "error": str(e)} def _get_system_metrics(self) -> dict: """Get system performance metrics.""" from typing import Any metrics: dict[str, Any] = { "debug_mode": settings.DEBUG, "allowed_hosts": (settings.ALLOWED_HOSTS if settings.DEBUG else ["hidden"]), } try: import psutil # Memory metrics memory = psutil.virtual_memory() metrics["memory"] = { "total_mb": round(memory.total / 1024 / 1024, 2), "available_mb": round(memory.available / 1024 / 1024, 2), "percent_used": memory.percent, } # CPU metrics metrics["cpu"] = { "percent_used": psutil.cpu_percent(interval=0.1), "core_count": psutil.cpu_count(), } # Disk metrics disk = psutil.disk_usage("/") metrics["disk"] = { "total_gb": round(disk.total / 1024 / 1024 / 1024, 2), "free_gb": round(disk.free / 1024 / 1024 / 1024, 2), "percent_used": round((disk.used / disk.total) * 100, 2), } except ImportError: metrics["system_monitoring"] = "psutil not available" except Exception as e: metrics["system_error"] = str(e) return metrics @extend_schema_view( get=extend_schema( summary="Performance metrics", description="Get performance metrics and database analysis (debug mode only).", responses={ 200: PerformanceMetricsOutputSerializer, 403: "Forbidden", }, tags=["Health"], ), ) class PerformanceMetricsAPIView(APIView): """API view for performance metrics and database analysis.""" permission_classes = [AllowAny] if settings.DEBUG else [] serializer_class = PerformanceMetricsOutputSerializer def get(self, request: Request) -> Response: """Return performance metrics and analysis.""" if not settings.DEBUG: return Response({"error": "Only available in debug mode"}, status=403) metrics = { "timestamp": timezone.now(), "database_analysis": self._get_database_analysis(), "cache_performance": self._get_cache_performance(), "recent_slow_queries": self._get_slow_queries(), } serializer = PerformanceMetricsOutputSerializer(metrics) return Response(serializer.data) def _get_database_analysis(self): """Analyze database performance.""" try: from django.db import connection analysis = { "total_queries": len(connection.queries), "query_analysis": IndexAnalyzer.analyze_slow_queries(0.05), } if connection.queries: query_times = [float(q.get("time", 0)) for q in connection.queries] analysis.update( { "total_query_time": sum(query_times), "average_query_time": sum(query_times) / len(query_times), "slowest_query_time": max(query_times), "fastest_query_time": min(query_times), } ) return analysis except Exception as e: return {"error": str(e)} def _get_cache_performance(self): """Get cache performance metrics.""" try: cache_monitor = CacheMonitor() return cache_monitor.get_cache_stats() except Exception as e: return {"error": str(e)} def _get_slow_queries(self): """Get recent slow queries.""" try: return IndexAnalyzer.analyze_slow_queries(0.1) # 100ms threshold except Exception as e: return {"error": str(e)} @extend_schema_view( get=extend_schema( summary="Simple health check", description="Simple health check endpoint for load balancers.", responses={ 200: SimpleHealthOutputSerializer, 503: SimpleHealthOutputSerializer, }, tags=["Health"], ), options=extend_schema( summary="CORS preflight for simple health check", description=( "Handle CORS preflight requests for the simple health check endpoint." ), responses={ 200: SimpleHealthOutputSerializer, }, tags=["Health"], ), ) class SimpleHealthAPIView(APIView): """Simple health check endpoint for load balancers.""" permission_classes = [AllowAny] serializer_class = SimpleHealthOutputSerializer def get(self, request: Request) -> Response: """Return simple OK status.""" try: # Basic database connectivity test from django.db import connection with connection.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() response_data = { "status": "ok", "timestamp": timezone.now(), } serializer = SimpleHealthOutputSerializer(response_data) return Response(serializer.data, status=200) except Exception as e: response_data = { "status": "error", "error": str(e), "timestamp": timezone.now(), } serializer = SimpleHealthOutputSerializer(response_data) return Response(serializer.data, status=503) def options(self, request: Request) -> Response: """Handle OPTIONS requests for CORS preflight.""" response_data = { "status": "ok", "timestamp": timezone.now(), } serializer = SimpleHealthOutputSerializer(response_data) return Response(serializer.data)