"""
Admin API views for dashboard functionality.

These views provide endpoints for:
- OSM cache statistics
- Rate limiting metrics
- Database manager operations
- Celery task status
- Data anomaly detection
- System metrics collection
- Pipeline integrity scans
- Admin settings (cache-backed key/value store)
"""

import logging
from datetime import timedelta
from typing import Any

from django.apps import apps
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.db import transaction
from django.db.models import Count, Q
from django.utils import timezone
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from apps.core.permissions import IsAdminWithSecondFactor
from apps.core.utils import capture_and_log

logger = logging.getLogger(__name__)
User = get_user_model()


class OSMUsageStatsView(APIView):
    """
    GET /admin/osm-usage-stats/
    Return OSM cache statistics for admin dashboard.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Cache key and lifetime (seconds) of the computed statistics payload.
    STATS_CACHE_KEY = "osm_usage_stats"
    STATS_CACHE_TTL = 300

    def get(self, request):
        """Return OSM/location cache usage statistics.

        A previously computed payload is served from the Django cache when
        present; otherwise fresh statistics are computed and cached for
        ``STATS_CACHE_TTL`` seconds. Any failure is reported through
        ``capture_and_log`` and answered with a generic HTTP 500.
        """
        try:
            cached_stats = cache.get(self.STATS_CACHE_KEY)
            if cached_stats:
                return Response(cached_stats)

            stats = self._build_stats()
            cache.set(self.STATS_CACHE_KEY, stats, self.STATS_CACHE_TTL)
            return Response(stats)
        except Exception as e:
            capture_and_log(e, "OSM usage stats - error", source="api")
            return Response(
                {"detail": "Failed to fetch OSM usage stats"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    @staticmethod
    def _build_stats() -> dict:
        """Compute the statistics payload from ``maps.LocationQueryCache``.

        When the model is not installed every counter is zero, which yields
        the same all-zero payload the dashboard receives for an empty cache.
        """
        try:
            LocationQueryCache = apps.get_model("maps", "LocationQueryCache")
        except LookupError:
            total_queries = recent_queries = cache_hits = 0
        else:
            last_24h = timezone.now() - timedelta(hours=24)
            entries = LocationQueryCache.objects
            total_queries = entries.count()
            recent_queries = entries.filter(created_at__gte=last_24h).count()
            # An entry accessed more than once counts as a cache hit.
            cache_hits = entries.filter(access_count__gt=1).count()

        api_calls = max(0, recent_queries - cache_hits)
        if total_queries > 0:
            hit_rate = round(cache_hits / total_queries * 100, 2)
        else:
            hit_rate = 0

        return {
            "timeWindow": "24h",
            "totalSearches": recent_queries,
            "cacheHits": cache_hits,
            "cacheMisses": api_calls,
            "apiCalls": api_calls,
            "errors": 0,
            "cacheHitRate": hit_rate,
            "avgResponseTime": 0,  # Would need request logging
            "totalCachedQueries": total_queries,
            "totalCacheAccesses": cache_hits,
            "hourlyData": [],
            "apiCallsSaved": cache_hits,
            "estimatedCost": {
                "callsMade": api_calls,
                "callsSaved": cache_hits,
                "savings": f"${cache_hits * 0.001:.2f}",  # Estimated
            },
        }


class RateLimitMetricsView(APIView):
    """
    POST /admin/rate-limit-metrics/
    Return rate limiting metrics for admin dashboard.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Actions that return a list of events. There is no rate limit log to
    # query yet, so every one of them currently yields an empty list.
    LIST_ACTIONS = ("recent", "function", "user", "ip")

    def post(self, request):
        """Return rate limit metrics based on the ``action`` field.

        ``stats`` (the default) returns zeroed aggregate statistics; the
        actions in ``LIST_ACTIONS`` return an empty list; anything else is
        answered with HTTP 400. The ``timeWindow``, ``limit``,
        ``functionName``, ``userId`` and ``clientIP`` request fields are not
        read until a rate limit log table exists, so malformed values in
        them can no longer fail the request.
        """
        try:
            action = request.data.get("action", "stats")

            if action == "stats":
                # In a real implementation, you'd query a rate limit log table
                return Response(
                    {
                        "totalRequests": 0,
                        "allowedRequests": 0,
                        "blockedRequests": 0,
                        "blockRate": 0,
                        "uniqueIPs": 0,
                        "uniqueUsers": 0,
                        "topBlockedIPs": [],
                        "topBlockedUsers": [],
                        "tierDistribution": {
                            "anonymous": 0,
                            "authenticated": 0,
                            "premium": 0,
                            "admin": 0,
                        },
                    }
                )

            # Tuple membership compares with ==, so unhashable JSON values
            # (lists, objects) fall through to the 400 below.
            if action in self.LIST_ACTIONS:
                return Response([])

            return Response(
                {"detail": f"Unknown action: {action}"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        except Exception as e:
            capture_and_log(e, "Rate limit metrics - error", source="api")
            return Response(
                {"detail": "Failed to fetch rate limit metrics"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class DatabaseManagerView(APIView):
    """
    POST /admin/database-manager/
    Handle admin CRUD operations for entities.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Map entity types to Django models
    ENTITY_MODEL_MAP = {
        "parks": ("parks", "Park"),
        "rides": ("rides", "Ride"),
        "companies": ("companies", "Company"),
        "reviews": ("reviews", "Review"),
        "blog_posts": ("blog", "BlogPost"),
        "photos": ("media", "Photo"),
        "lists": ("lists", "List"),
        "profiles": ("accounts", "UserProfile"),
    }

    def post(self, request):
        """Dispatch to the appropriate handler based on ``operation``.

        Reads ``operation``, ``entityType``, ``entityId``, ``data`` and
        ``changeReason`` from the request body. Missing or unknown
        operation/entity values and a non-object ``data`` payload are
        answered with HTTP 400. Any exception raised by a handler is
        reported through ``capture_and_log`` and answered with HTTP 500
        carrying the exception text.
        """
        # Bound before the try block so the error handler below can always
        # reference it, even when reading request.data itself fails.
        operation = None
        try:
            operation = request.data.get("operation")
            entity_type = request.data.get("entityType")
            entity_id = request.data.get("entityId")
            data = request.data.get("data", {})
            change_reason = request.data.get("changeReason", "Admin operation")

            if not operation:
                return Response(
                    {"detail": "operation is required"},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            if not entity_type:
                return Response(
                    {"detail": "entityType is required"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Get the model class
            model_info = self.ENTITY_MODEL_MAP.get(entity_type)
            if not model_info:
                return Response(
                    {"detail": f"Unknown entity type: {entity_type}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            try:
                Model = apps.get_model(model_info[0], model_info[1])
            except LookupError:
                return Response(
                    {"detail": f"Model not found for {entity_type}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Dispatch to handler
            handlers = {
                "create": self._handle_create,
                "update": self._handle_update,
                "delete": self._handle_delete,
                "restore": self._handle_restore,
                "permanent-delete": self._handle_permanent_delete,
                "bulk-update-status": self._handle_bulk_update_status,
                "bulk-delete": self._handle_bulk_delete,
                "bulk-restore": self._handle_bulk_restore,
                "bulk-permanent-delete": self._handle_bulk_permanent_delete,
                "get-dependencies": self._handle_get_dependencies,
            }
            handler = handlers.get(operation)
            if not handler:
                return Response(
                    {"detail": f"Unknown operation: {operation}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Handlers call data.get()/data.items()/**data. An empty
            # non-object payload (null, [], "") is treated as "no data";
            # anything else that is not an object is a client error.
            if not isinstance(data, dict):
                if data:
                    return Response(
                        {"detail": "data must be an object"},
                        status=status.HTTP_400_BAD_REQUEST,
                    )
                data = {}

            return handler(Model, entity_type, entity_id, data, change_reason, request)
        except Exception as e:
            capture_and_log(
                e, f"Database manager - {operation} error", source="api"
            )
            # NOTE(review): unlike the sibling views this returns the raw
            # exception text to the client - confirm that is intended.
            return Response(
                {"detail": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    @staticmethod
    def _unfiltered_manager(Model):
        """Return ``Model.all_objects`` when the model defines it, else ``Model.objects``.

        Presumably ``all_objects`` is a manager that also returns
        soft-deleted rows - verify against the model definitions.
        """
        return getattr(Model, "all_objects", Model.objects)

    def _handle_create(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Create a new entity from ``data`` and return its primary key."""
        with transaction.atomic():
            instance = Model.objects.create(**data)
        return Response(
            {
                "success": True,
                "data": {"id": str(instance.pk)},
                "message": f"{entity_type} created successfully",
            }
        )

    def _handle_update(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Update an existing entity with the attributes given in ``data``.

        Returns HTTP 400 without ``entity_id`` and HTTP 404 when no row
        matches it. Keys that are not attributes of the instance are ignored.
        """
        if not entity_id:
            return Response(
                {"detail": "entityId is required for update"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                instance = Model.objects.get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            # NOTE(review): hasattr() also matches methods and non-field
            # attributes; consider restricting to concrete model fields.
            for key, value in data.items():
                if hasattr(instance, key):
                    setattr(instance, key, value)
            instance.save()

        return Response(
            {
                "success": True,
                "data": {"id": str(instance.pk)},
                "message": f"{entity_type} updated successfully",
            }
        )

    def _handle_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Soft delete an entity, falling back to a hard delete.

        The first available soft-delete attribute is used, in this order:
        ``status`` (set to ``"deleted"``), ``is_deleted``, ``deleted_at``.
        A model with none of them is deleted for real.
        """
        if not entity_id:
            return Response(
                {"detail": "entityId is required for delete"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                instance = Model.objects.get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            if hasattr(instance, "status"):
                instance.status = "deleted"
                instance.save()
            elif hasattr(instance, "is_deleted"):
                instance.is_deleted = True
                instance.save()
            elif hasattr(instance, "deleted_at"):
                instance.deleted_at = timezone.now()
                instance.save()
            else:
                # Hard delete if no soft delete field
                instance.delete()

        return Response(
            {
                "success": True,
                "data": {"id": str(entity_id)},
                "message": f"{entity_type} deleted successfully",
            }
        )

    def _handle_restore(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Restore a soft-deleted entity.

        ``data["status"]`` (default ``"draft"``) is the status given to
        models that have a ``status`` attribute. Returns HTTP 404 when the
        entity does not exist, whichever manager performed the lookup.
        """
        if not entity_id:
            return Response(
                {"detail": "entityId is required for restore"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        new_status = data.get("status", "draft")

        with transaction.atomic():
            try:
                instance = self._unfiltered_manager(Model).get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            if hasattr(instance, "status"):
                instance.status = new_status
                instance.save()
            elif hasattr(instance, "is_deleted"):
                instance.is_deleted = False
                instance.save()
            elif hasattr(instance, "deleted_at"):
                instance.deleted_at = None
                instance.save()

        return Response(
            {
                "success": True,
                "data": {"id": str(entity_id)},
                "message": f"{entity_type} restored successfully",
            }
        )

    def _handle_permanent_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Permanently delete an entity (hard delete)."""
        if not entity_id:
            return Response(
                {"detail": "entityId is required for permanent-delete"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                instance = self._unfiltered_manager(Model).get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )
            instance.delete()

        return Response(
            {
                "success": True,
                "data": {"id": str(entity_id)},
                "message": f"{entity_type} permanently deleted",
            }
        )

    def _handle_bulk_update_status(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Set ``data["status"]`` on every entity listed in ``data["entityIds"]``."""
        entity_ids = data.get("entityIds", [])
        new_status = data.get("status")

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if not new_status:
            return Response(
                {"detail": "status is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            updated = Model.objects.filter(pk__in=entity_ids).update(status=new_status)

        return Response(
            {
                "success": True,
                "bulk": {
                    "successCount": updated,
                    "failedCount": len(entity_ids) - updated,
                },
                "message": f"Updated {updated} {entity_type}",
            }
        )

    def _handle_bulk_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk soft delete the entities listed in ``data["entityIds"]``.

        Models with a ``status`` attribute get ``status="deleted"``; all
        others are updated with ``is_deleted=True``.
        """
        entity_ids = data.get("entityIds", [])

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # NOTE(review): unlike the single-entity delete there is no
        # deleted_at / hard-delete fallback here - confirm that is intended.
        if hasattr(Model, "status"):
            soft_delete_fields = {"status": "deleted"}
        else:
            soft_delete_fields = {"is_deleted": True}

        with transaction.atomic():
            updated = Model.objects.filter(pk__in=entity_ids).update(
                **soft_delete_fields
            )

        return Response(
            {
                "success": True,
                "bulk": {
                    "successCount": updated,
                    "failedCount": len(entity_ids) - updated,
                },
                "message": f"Deleted {updated} {entity_type}",
            }
        )

    def _handle_bulk_restore(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk restore entities by setting their status (default ``"draft"``)."""
        entity_ids = data.get("entityIds", [])
        new_status = data.get("status", "draft")

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            updated = (
                self._unfiltered_manager(Model)
                .filter(pk__in=entity_ids)
                .update(status=new_status)
            )

        return Response(
            {
                "success": True,
                "bulk": {
                    "successCount": updated,
                    "failedCount": len(entity_ids) - updated,
                },
                "message": f"Restored {updated} {entity_type}",
            }
        )

    def _handle_bulk_permanent_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk permanently delete the entities listed in ``data["entityIds"]``."""
        entity_ids = data.get("entityIds", [])

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            deleted, _ = (
                self._unfiltered_manager(Model).filter(pk__in=entity_ids).delete()
            )

        return Response(
            {
                "success": True,
                "bulk": {
                    "successCount": deleted,
                    "failedCount": len(entity_ids) - deleted,
                },
                "message": f"Permanently deleted {deleted} {entity_type}",
            }
        )

    def _handle_get_dependencies(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Report how many related objects reference an entity.

        Walks the model's one-to-many, one-to-one and many-to-many relations
        and lists every one with a non-zero count. Relations that cannot be
        inspected are skipped (best effort) and logged at debug level.
        """
        if not entity_id:
            return Response(
                {"detail": "entityId is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            instance = Model.objects.get(pk=entity_id)
        except Model.DoesNotExist:
            return Response(
                {"detail": f"{entity_type} not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        # Get related objects count
        dependencies = []
        for rel in instance._meta.get_fields():
            if not (rel.one_to_many or rel.one_to_one or rel.many_to_many):
                continue
            try:
                related_name = rel.get_accessor_name()
                related_manager = getattr(instance, related_name, None)
                if related_manager and hasattr(related_manager, "count"):
                    count = related_manager.count()
                    if count > 0:
                        dependencies.append(
                            {
                                "type": rel.related_model._meta.verbose_name_plural,
                                "count": count,
                            }
                        )
            except Exception:
                logger.debug(
                    "Skipping relation %r while collecting dependencies for %s",
                    rel,
                    entity_type,
                    exc_info=True,
                )

        return Response(
            {
                "success": True,
                "dependencies": dependencies,
                "hasDependencies": len(dependencies) > 0,
            }
        )


class CeleryTaskStatusView(APIView):
    """
    GET /admin/tasks/status/
    Return Celery task status (read-only).
    """

    permission_classes = [IsAdminWithSecondFactor]

    # List of known scheduled tasks
    SCHEDULED_TASKS = [
        {
            "name": "process_scheduled_deletions",
            "display_name": "Process Scheduled Deletions",
            "schedule": "daily at midnight",
        },
        {
            "name": "process_closing_entities",
            "display_name": "Process Closing Entities",
            "schedule": "daily at midnight",
        },
        {
            "name": "process_expired_bans",
            "display_name": "Process Expired Bans",
            "schedule": "every 15 minutes",
        },
        {
            "name": "cleanup_orphaned_images",
            "display_name": "Cleanup Orphaned Images",
            "schedule": "weekly on Sunday",
        },
        {
            "name": "cleanup_old_versions",
            "display_name": "Cleanup Old Versions",
            "schedule": "weekly on Sunday",
        },
        {
            "name": "data_retention_cleanup",
            "display_name": "Data Retention Cleanup",
            "schedule": "daily at 3 AM",
        },
    ]

    def get(self, request):
        """Return the status of all scheduled tasks, or of one task.

        With a ``task`` query parameter only that task is returned (HTTP 404
        when the name is not in ``SCHEDULED_TASKS``). Last-run details are
        read from the cache key ``task_last_run_<name>``.
        """
        try:
            task_name = request.query_params.get("task")
            tasks_status = []

            for task_info in self.SCHEDULED_TASKS:
                # Get last run info from cache; a missing or empty entry
                # means the task has no recorded run.
                last_run_info = cache.get(f"task_last_run_{task_info['name']}") or {}

                task_status = {
                    "name": task_info["name"],
                    "displayName": task_info["display_name"],
                    "schedule": task_info["schedule"],
                    "lastRun": last_run_info.get("timestamp"),
                    "lastResult": last_run_info.get("result", "unknown"),
                    "lastDuration": last_run_info.get("duration"),
                    "status": "scheduled",
                }

                if task_name and task_name == task_info["name"]:
                    return Response(task_status)
                tasks_status.append(task_status)

            if task_name:
                return Response(
                    {"detail": f"Unknown task: {task_name}"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            return Response(
                {
                    "tasks": tasks_status,
                    "totalTasks": len(tasks_status),
                }
            )
        except Exception as e:
            capture_and_log(e, "Celery task status - error", source="api")
            return Response(
                {"detail": "Failed to fetch task status"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class DetectAnomaliesView(APIView):
    """
    POST /admin/anomalies/detect/
    Detect data anomalies for admin dashboard.

    Full parity with Supabase Edge Function: detect-anomalies

    The original Edge Function implements 7 ML detection algorithms:
    1. Z-Score (standard deviation outliers)
    2. Moving Average (trend deviation)
    3. Rate of Change (sudden changes)
    4. Isolation Forest (ML-based outlier detection)
    5. Seasonal Decomposition (periodic pattern anomalies)
    6. Predictive (Holt-Winters exponential smoothing)
    7. Ensemble (combines all algorithms)

    This implementation provides:
    - Config-driven detection framework
    - Data quality checks (orphaned records, duplicates, incomplete data)
    - Auto-alerting for critical anomalies

    TODO: Implement full ML algorithms with numpy/scipy in follow-up task.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Severity score thresholds
    SEVERITY_THRESHOLDS = {
        "critical": 4.0,
        "high": 3.0,
        "medium": 2.0,
        "low": 1.0,
    }

    # Accepted values of the "type" request field; anything else means "all".
    ALLOWED_ANOMALY_TYPES = {"all", "orphaned", "duplicates", "incomplete", "data_quality"}

    def post(self, request):
        """Detect and return data anomalies.

        Request fields (all optional, invalid values fall back to defaults):
        ``type`` (one of ``ALLOWED_ANOMALY_TYPES``, default ``"all"``),
        ``sensitivity`` (float clamped to [0.1, 10.0], default 2.5) and
        ``lookback_minutes`` (int clamped to [1, 10080], default 60).
        Only the rule-based data quality checks are run; ``sensitivity`` and
        ``lookback_minutes`` are echoed back in ``config``.
        """
        try:
            # ================================================================
            # Input validation with safe type handling
            # ================================================================
            raw_type = request.data.get("type", "all")
            anomaly_type = raw_type if isinstance(raw_type, str) else "all"
            anomaly_type = anomaly_type.strip().lower()[:50]
            if anomaly_type not in self.ALLOWED_ANOMALY_TYPES:
                anomaly_type = "all"

            # OverflowError covers integers too large for float().
            try:
                sensitivity = float(request.data.get("sensitivity", 2.5))
                sensitivity = max(0.1, min(sensitivity, 10.0))  # Clamp to [0.1, 10.0]
            except (ValueError, TypeError, OverflowError):
                sensitivity = 2.5

            # OverflowError covers int(inf), e.g. a JSON value of 1e999.
            try:
                lookback_minutes = int(request.data.get("lookback_minutes", 60))
                lookback_minutes = max(1, min(lookback_minutes, 10080))  # 1 min to 1 week
            except (ValueError, TypeError, OverflowError):
                lookback_minutes = 60

            # ================================================================
            # Data Quality Anomalies (immediate checks)
            # ================================================================
            checks = (
                (("all", "orphaned", "data_quality"), self._check_orphaned_rides),
                (("all", "duplicates", "data_quality"), self._check_duplicate_slugs),
                (("all", "incomplete", "data_quality"), self._check_incomplete_parks),
            )
            anomalies = []
            for applicable_types, check in checks:
                if anomaly_type not in applicable_types:
                    continue
                try:
                    anomaly = check()
                except LookupError:
                    # The model backing this check is not installed.
                    continue
                if anomaly is not None:
                    anomalies.append(anomaly)

            # ================================================================
            # TODO: Implement ML-based anomaly detection
            # ================================================================
            # The original Supabase Edge Function reads from:
            # - anomaly_detection_config table (enabled metrics, sensitivity, algorithms)
            # - metric_time_series table (historical metric data)
            #
            # Then applies these algorithms:
            # 1. z_score: (value - mean) / std_dev
            # 2. moving_average: deviation from rolling average
            # 3. rate_of_change: delta compared to previous values
            # 4. isolation_forest: sklearn-style outlier detection
            # 5. seasonal_decomposition: detect periodic pattern breaks
            # 6. predictive: Holt-Winters forecasting comparison
            # 7. ensemble: weighted combination of all methods
            #
            # For now, we return data quality anomalies. Full ML implementation
            # requires numpy/scipy dependencies and metric_time_series data.

            # ================================================================
            # Auto-create alerts for critical/high severity
            # ================================================================
            urgent = [a for a in anomalies if a.get("severity") in ["critical", "high"]]
            for anomaly in urgent:
                # Log critical anomaly (would create SystemAlert in full impl)
                logger.warning(
                    "Critical anomaly detected: %s",
                    anomaly.get("description"),
                    extra={
                        "anomaly_type": anomaly.get("anomaly_type"),
                        "severity": anomaly.get("severity"),
                        "deviation_score": anomaly.get("deviation_score"),
                    },
                )

            return Response({
                "success": True,
                "detected_count": len(anomalies),
                "critical_count": len(urgent),
                "anomalies": anomalies,
                "scanned_at": timezone.now().isoformat(),
                "config": {
                    "sensitivity": sensitivity,
                    "lookback_minutes": lookback_minutes,
                    "algorithms_available": [
                        "rule_based",
                        # TODO: Add when implemented
                        # "z_score", "moving_average", "rate_of_change",
                        # "isolation_forest", "seasonal", "predictive", "ensemble"
                    ],
                },
            })
        except Exception as e:
            capture_and_log(e, "Detect anomalies - error", source="api")
            return Response(
                {"detail": "Failed to detect anomalies"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    @staticmethod
    def _rule_based_anomaly(
        id_prefix, metric_name, anomaly_type, severity, value, deviation_score, description
    ):
        """Build the anomaly payload shared by all rule-based checks."""
        now = timezone.now()
        return {
            "id": f"{id_prefix}_{now.timestamp()}",
            "metric_name": metric_name,
            "metric_category": "data_quality",
            "anomaly_type": anomaly_type,
            "severity": severity,
            "baseline_value": 0,
            "anomaly_value": value,
            "deviation_score": deviation_score,
            "confidence_score": 1.0,  # 100% confidence for exact counts
            "detection_algorithm": "rule_based",
            "description": description,
            "detected_at": now.isoformat(),
        }

    def _check_orphaned_rides(self):
        """Return an anomaly for rides without a park, or ``None``.

        Raises ``LookupError`` when the parks or rides model is unavailable.
        """
        # Looked up only so the check is skipped when the parks app is missing.
        apps.get_model("parks", "Park")
        Ride = apps.get_model("rides", "Ride")

        orphaned_rides = Ride.objects.filter(park__isnull=True).count()
        if orphaned_rides <= 0:
            return None
        return self._rule_based_anomaly(
            "orphaned_rides",
            "orphaned_records",
            "orphaned_rides",
            "high" if orphaned_rides > 10 else "medium",
            orphaned_rides,
            orphaned_rides / 5.0,  # Score based on count
            f"{orphaned_rides} rides without associated park",
        )

    def _check_duplicate_slugs(self):
        """Return an anomaly for park slugs used more than once, or ``None``.

        Raises ``LookupError`` when the parks model is unavailable.
        """
        Park = apps.get_model("parks", "Park")

        dup_count = (
            Park.objects.values("slug")
            .annotate(count=Count("id"))
            .filter(count__gt=1)
            .count()
        )
        if dup_count <= 0:
            return None
        return self._rule_based_anomaly(
            "duplicate_slugs",
            "duplicate_slugs",
            "duplicate_values",
            "high" if dup_count > 5 else "medium",
            dup_count,
            dup_count / 2.0,
            f"{dup_count} duplicate park slugs detected",
        )

    def _check_incomplete_parks(self):
        """Return an anomaly for parks missing latitude/longitude, or ``None``.

        Raises ``LookupError`` when the parks model is unavailable.
        """
        Park = apps.get_model("parks", "Park")

        parks_no_location = Park.objects.filter(
            Q(latitude__isnull=True) | Q(longitude__isnull=True)
        ).count()
        if parks_no_location <= 0:
            return None
        return self._rule_based_anomaly(
            "incomplete_parks",
            "incomplete_data",
            "missing_required_fields",
            "low" if parks_no_location < 10 else "medium",
            parks_no_location,
            parks_no_location / 10.0,
            f"{parks_no_location} parks missing location data",
        )


class CollectMetricsView(APIView):
    """
    POST /admin/metrics/collect/
    Collect system metrics for admin dashboard.

    BULLETPROOFED: Safe input parsing with validation.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Allowed values
    ALLOWED_METRIC_TYPES = {"all", "database", "users", "moderation", "performance"}
    ALLOWED_TIME_RANGES = {"24h", "7d", "30d", "1h", "12h"}

    # Look-back window for each allowed time range.
    TIME_RANGE_DELTAS = {
        "1h": timedelta(hours=1),
        "12h": timedelta(hours=12),
        "24h": timedelta(hours=24),
        "7d": timedelta(days=7),
        "30d": timedelta(days=30),
    }

    def post(self, request):
        """Collect and return system metrics.

        Request fields: ``type`` (one of ``ALLOWED_METRIC_TYPES``, default
        ``"all"``) and ``timeRange`` (one of ``ALLOWED_TIME_RANGES``, default
        ``"24h"``). A section that fails to collect is replaced by an
        ``{"error": ...}`` entry so the other sections are still returned.
        """
        try:
            # ================================================================
            # Input validation with safe type handling
            # ================================================================
            raw_type = request.data.get("type", "all")
            metric_type = raw_type if isinstance(raw_type, str) else "all"
            metric_type = metric_type.strip().lower()[:50]
            if metric_type not in self.ALLOWED_METRIC_TYPES:
                metric_type = "all"

            raw_time_range = request.data.get("timeRange", "24h")
            time_range = raw_time_range if isinstance(raw_time_range, str) else "24h"
            time_range = time_range.strip().lower()[:10]
            if time_range not in self.ALLOWED_TIME_RANGES:
                time_range = "24h"

            # Parse time range to cutoff
            window = self.TIME_RANGE_DELTAS.get(time_range, timedelta(hours=24))
            cutoff = timezone.now() - window

            metrics = {
                "collectedAt": timezone.now().isoformat(),
                "timeRange": time_range,
            }

            # Database metrics
            if metric_type in ["all", "database"]:
                try:
                    Park = apps.get_model("parks", "Park")
                    Ride = apps.get_model("rides", "Ride")
                    metrics["database"] = {
                        "totalParks": Park.objects.count(),
                        "totalRides": Ride.objects.count(),
                        "recentParks": Park.objects.filter(created_at__gte=cutoff).count(),
                        "recentRides": Ride.objects.filter(created_at__gte=cutoff).count(),
                    }
                except Exception:
                    logger.exception("Could not fetch database metrics")
                    metrics["database"] = {
                        "error": "Could not fetch database metrics"
                    }

            # User metrics
            if metric_type in ["all", "users"]:
                try:
                    metrics["users"] = {
                        "totalUsers": User.objects.count(),
                        "activeUsers": User.objects.filter(
                            last_login__gte=cutoff
                        ).count(),
                        "newUsers": User.objects.filter(
                            date_joined__gte=cutoff
                        ).count(),
                    }
                except Exception:
                    logger.exception("Could not fetch user metrics")
                    metrics["users"] = {
                        "error": "Could not fetch user metrics"
                    }

            # Moderation metrics
            if metric_type in ["all", "moderation"]:
                try:
                    EditSubmission = apps.get_model("moderation", "EditSubmission")
                    metrics["moderation"] = {
                        "pendingSubmissions": EditSubmission.objects.filter(
                            status="PENDING"
                        ).count(),
                        "recentSubmissions": EditSubmission.objects.filter(
                            created_at__gte=cutoff
                        ).count(),
                    }
                except Exception:
                    logger.exception("Could not fetch moderation metrics")
                    metrics["moderation"] = {
                        "error": "Could not fetch moderation metrics"
                    }

            return Response({
                "success": True,
                "metrics": metrics,
            })
        except Exception as e:
            capture_and_log(e, "Collect metrics - error", source="api")
            return Response(
                {"detail": "Failed to collect metrics"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class PipelineIntegrityScanView(APIView):
    """
    POST /admin/pipeline/integrity-scan/
    Scan data pipeline for integrity issues.

    BULLETPROOFED: Safe input parsing with validation.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Allowed values
    ALLOWED_SCAN_TYPES = {"full", "referential", "status", "media", "submissions", "stuck", "versions"}
    MAX_HOURS_BACK = 720  # 30 days max

    # Maximum number of example issues reported per check.
    MAX_EXAMPLES = 10

    def post(self, request):
        """Run the integrity scan on the data pipeline.

        Request fields (all optional, invalid values fall back to defaults):
        ``hours_back`` (int clamped to [1, ``MAX_HOURS_BACK``], default 48),
        ``type`` (one of ``ALLOWED_SCAN_TYPES``, default ``"full"``) and
        ``fix`` (boolean; repairs broken park references and releases stuck
        submissions). The response lists up to ``MAX_EXAMPLES`` issues per
        check, a per-severity summary, and ``fixed_count``.
        """
        try:
            # ================================================================
            # Input validation with safe type handling
            # ================================================================
            # OverflowError covers int(inf), e.g. a JSON value of 1e999.
            try:
                hours_back = int(request.data.get("hours_back", 48))
                hours_back = max(1, min(hours_back, self.MAX_HOURS_BACK))
            except (ValueError, TypeError, OverflowError):
                hours_back = 48

            raw_type = request.data.get("type", "full")
            scan_type = raw_type if isinstance(raw_type, str) else "full"
            scan_type = scan_type.strip().lower()[:50]
            if scan_type not in self.ALLOWED_SCAN_TYPES:
                scan_type = "full"

            # Safe fix_issues parsing (boolean)
            raw_fix = request.data.get("fix", False)
            fix_issues = raw_fix is True or str(raw_fix).lower() in ("true", "1", "yes")

            # Calculate cutoff time based on hours_back
            cutoff_time = timezone.now() - timedelta(hours=hours_back)

            scans = (
                (("full", "referential"), self._scan_broken_references),
                (("full", "status"), self._scan_invalid_statuses),
                (("full", "media"), self._scan_orphaned_media),
                (("full", "submissions", "stuck"), self._scan_stuck_submissions),
                (("full", "versions"), self._scan_missing_versions),
            )
            issues = []
            fixed_count = 0
            for applicable_types, scan in scans:
                if scan_type not in applicable_types:
                    continue
                try:
                    found, fixed = scan(fix_issues, cutoff_time)
                except LookupError:
                    # The model backing this check is not installed.
                    continue
                issues.extend(found)
                fixed_count += fixed

            # Calculate summary counts
            critical_count = sum(1 for i in issues if i.get("severity") == "critical")
            warning_count = sum(1 for i in issues if i.get("severity") == "warning")
            info_count = sum(1 for i in issues if i.get("severity") == "info")

            # Return in frontend-expected format (fixed_count is additive)
            return Response({
                "success": True,
                "scan_timestamp": timezone.now().isoformat(),
                "hours_scanned": hours_back,
                "issues_found": len(issues),
                "issues": issues,
                "fixed_count": fixed_count,
                "summary": {
                    "critical": critical_count,
                    "warning": warning_count,
                    "info": info_count,
                },
            })
        except Exception as e:
            capture_and_log(e, "Pipeline integrity scan - error", source="api")
            return Response(
                {"detail": "Failed to run integrity scan"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def _scan_broken_references(self, fix_issues, cutoff_time):
        """Find rides whose ``park_id`` matches no existing park.

        Returns ``(issues, fixed)``. With ``fix_issues`` every such ride
        (not only the reported examples) has its ``park_id`` set to NULL.
        """
        Ride = apps.get_model("rides", "Ride")
        Park = apps.get_model("parks", "Park")

        # Rides pointing to non-existent parks
        valid_park_ids = Park.objects.values_list("id", flat=True)
        invalid_rides = Ride.objects.exclude(
            park_id__in=valid_park_ids
        ).exclude(park_id__isnull=True)

        issues = [
            {
                "issue_type": "broken_reference",
                "entity_type": "ride",
                "entity_id": str(ride.id),
                "submission_id": "",
                "severity": "critical",
                "description": f"Ride '{ride.name}' has invalid park reference",
                "detected_at": timezone.now().isoformat(),
            }
            for ride in invalid_rides[: self.MAX_EXAMPLES]
        ]

        fixed = 0
        if issues and fix_issues:
            # update() returns the number of rows it changed. Counting the
            # queryset afterwards would give 0, because the repaired rows
            # no longer match its filter.
            fixed = invalid_rides.update(park_id=None)
        return issues, fixed

    def _scan_invalid_statuses(self, fix_issues, cutoff_time):
        """Find parks whose status is outside the known set. Never fixes."""
        Park = apps.get_model("parks", "Park")

        # Parks with invalid status values
        valid_statuses = ["operating", "closed", "under_construction", "announced", "deleted"]
        invalid_status_parks = Park.objects.exclude(status__in=valid_statuses)

        issues = [
            {
                "issue_type": "invalid_status",
                "entity_type": "park",
                "entity_id": str(park.id),
                "submission_id": "",
                "severity": "warning",
                "description": f"Park '{park.name}' has invalid status: {park.status}",
                "detected_at": timezone.now().isoformat(),
            }
            for park in invalid_status_parks[: self.MAX_EXAMPLES]
        ]
        return issues, 0

    def _scan_orphaned_media(self, fix_issues, cutoff_time):
        """Find photos with neither ``entity_id`` nor ``entity_type``. Never fixes."""
        Photo = apps.get_model("media", "Photo")

        orphaned_photos = Photo.objects.filter(
            entity_id__isnull=True,
            entity_type__isnull=True,
        )

        issues = [
            {
                "issue_type": "orphaned_media",
                "entity_type": "photo",
                "entity_id": str(photo.id),
                "submission_id": "",
                "severity": "info",
                "description": "Photo without associated entity",
                "detected_at": timezone.now().isoformat(),
            }
            for photo in orphaned_photos[: self.MAX_EXAMPLES]
        ]
        return issues, 0

    def _scan_stuck_submissions(self, fix_issues, cutoff_time):
        """Find claimed submissions whose claim is older than 30 minutes.

        Returns ``(issues, fixed)``. With ``fix_issues`` each reported
        submission is released and reset to ``PENDING``.
        """
        EditSubmission = apps.get_model("moderation", "EditSubmission")

        # Find submissions that are claimed but claim has expired
        # Assuming a claim expires after 30 minutes of inactivity
        claim_expiry = timezone.now() - timedelta(minutes=30)
        stuck_submissions = EditSubmission.objects.filter(
            status__in=["CLAIMED", "claimed", "reviewing"],
            claimed_at__lt=claim_expiry,
        ).exclude(claimed_at__isnull=True)

        issues = []
        fixed = 0
        for sub in stuck_submissions[: self.MAX_EXAMPLES]:
            hours_stuck = (timezone.now() - sub.claimed_at).total_seconds() / 3600
            issues.append({
                "issue_type": "stuck_submission",
                "entity_type": "edit_submission",
                "entity_id": str(sub.id),
                "submission_id": str(sub.id),
                "severity": "warning" if hours_stuck < 4 else "critical",
                "description": (
                    f"Submission claimed by {sub.claimed_by.username if sub.claimed_by else 'unknown'} "
                    f"but stuck for {hours_stuck:.1f} hours"
                ),
                "detected_at": timezone.now().isoformat(),
                "metadata": {
                    "claimed_at": sub.claimed_at.isoformat() if sub.claimed_at else None,
                    "claimed_by": sub.claimed_by.username if sub.claimed_by else None,
                    "hours_stuck": round(hours_stuck, 1),
                },
            })

            if fix_issues:
                # Unclaim stuck submissions
                sub.claimed_by = None
                sub.claimed_at = None
                sub.status = "PENDING"
                sub.save(update_fields=["claimed_by", "claimed_at", "status"])
                fixed += 1
        return issues, fixed

    def _scan_missing_versions(self, fix_issues, cutoff_time):
        """Find recently approved submissions whose target has no history.

        Uses the target's ``events`` relation (pghistory events) as a proxy
        for version records. Only submissions handled since ``cutoff_time``
        are inspected. Never fixes.
        """
        EditSubmission = apps.get_model("moderation", "EditSubmission")

        recently_approved = EditSubmission.objects.filter(
            status__in=["APPROVED", "approved"],
            handled_at__gte=cutoff_time,
        )

        issues = []
        for sub in recently_approved[: self.MAX_EXAMPLES]:
            # Check if the target object has history
            target = sub.content_object
            if not (target and hasattr(target, "events")):
                continue
            try:
                if target.events.count() == 0:
                    issues.append({
                        "issue_type": "missing_version_record",
                        "entity_type": sub.content_type.model,
                        "entity_id": str(sub.object_id),
                        "submission_id": str(sub.id),
                        "severity": "critical",
                        "description": f"Approved {sub.content_type.model} has no version history",
                        "detected_at": timezone.now().isoformat(),
                    })
            except Exception:
                # Best effort: one unreadable history must not abort the scan.
                logger.debug(
                    "Could not inspect version history for submission %s",
                    sub.id,
                    exc_info=True,
                )
        return issues, 0


class AdminSettingsView(APIView):
    """
    GET/POST /admin/settings/
    Simple key-value store for admin preferences.

    Settings are stored in Django cache with admin-specific keys.
    For persistent storage, a database model can be added later.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Cache key of the list of known setting names, and the lifetime
    # (30 days, in seconds) of both that index and each stored setting.
    KEYS_INDEX = "admin_settings_keys"
    SETTING_TTL = 60 * 60 * 24 * 30

    def get(self, request):
        """Get all admin settings or a specific setting.

        With a ``key`` query parameter the response holds that single
        setting, or an empty ``results`` list when it is not set. Without
        it, every setting recorded in the key index is returned.
        """
        try:
            key = request.query_params.get("key")

            if key:
                # Get specific setting
                value = cache.get(f"admin_setting_{key}")
                results = [] if value is None else [{"key": key, "value": value}]
                return Response({"results": results}, status=status.HTTP_200_OK)

            # Get all settings (return empty list if none exist)
            # In a real implementation, you'd query a database model
            results = []
            for name in cache.get(self.KEYS_INDEX, []):
                value = cache.get(f"admin_setting_{name}")
                if value is not None:
                    results.append({"key": name, "value": value})

            return Response(
                {"results": results, "count": len(results)},
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            capture_and_log(e, "Admin settings GET - error", source="api")
            return Response(
                {"detail": "Failed to fetch admin settings"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def post(self, request):
        """Create or update an admin setting.

        Requires ``key`` in the request body (HTTP 400 otherwise) and stores
        ``value`` under it for ``SETTING_TTL`` seconds.
        """
        try:
            key = request.data.get("key")
            value = request.data.get("value")

            if not key:
                return Response(
                    {"detail": "key is required"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Store in cache (30 days TTL)
            cache.set(f"admin_setting_{key}", value, self.SETTING_TTL)

            # Track keys. The index is written back on every save so its TTL
            # is refreshed together with the setting's; otherwise the index
            # could expire first and live settings would vanish from the
            # "get all" listing.
            settings_keys = cache.get(self.KEYS_INDEX, [])
            if key not in settings_keys:
                settings_keys.append(key)
            cache.set(self.KEYS_INDEX, settings_keys, self.SETTING_TTL)

            return Response(
                {"success": True, "key": key, "value": value},
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            capture_and_log(e, "Admin settings POST - error", source="api")
            return Response(
                {"detail": "Failed to save admin setting"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )