"""
|
|
Admin API views for dashboard functionality.
|
|
|
|
These views provide endpoints for:
|
|
- OSM cache statistics
|
|
- Rate limiting metrics
|
|
- Database manager operations
|
|
- Celery task status
|
|
"""
|
|
|
|
import logging
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
|
|
from django.apps import apps
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.cache import cache
|
|
from django.db import transaction
|
|
from django.db.models import Count, Q
|
|
from django.utils import timezone
|
|
from rest_framework import status
|
|
from apps.core.permissions import IsAdminWithSecondFactor
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
|
|
from apps.core.utils import capture_and_log
|
|
|
|
logger = logging.getLogger(__name__)
|
|
User = get_user_model()
|
|
|
|
|
|
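
# The URL paths referenced in the view docstrings below are assumed to be wired
# up in a urls.py along these lines (the module path and URL names here are
# assumptions for illustration, not taken from this project's actual routing):
#
#     from django.urls import path
#     from . import admin_views
#
#     urlpatterns = [
#         path("admin/osm-usage-stats/", admin_views.OSMUsageStatsView.as_view()),
#         path("admin/rate-limit-metrics/", admin_views.RateLimitMetricsView.as_view()),
#         path("admin/database-manager/", admin_views.DatabaseManagerView.as_view()),
#         path("admin/tasks/status/", admin_views.CeleryTaskStatusView.as_view()),
#     ]
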
class OSMUsageStatsView(APIView):
    """
    GET /admin/osm-usage-stats/
    Return OSM cache statistics for admin dashboard.
    """

    permission_classes = [IsAdminWithSecondFactor]

    def get(self, request):
        """Return OSM/location cache usage statistics."""
        try:
            # Try to get stats from cache first
            cached_stats = cache.get("osm_usage_stats")
            if cached_stats:
                return Response(cached_stats)

            # Calculate fresh stats
            now = timezone.now()
            last_24h = now - timedelta(hours=24)

            # Get location query cache model if it exists
            try:
                LocationQueryCache = apps.get_model("maps", "LocationQueryCache")
                has_cache_model = True
            except LookupError:
                has_cache_model = False

            if has_cache_model:
                total_queries = LocationQueryCache.objects.count()
                recent_queries = LocationQueryCache.objects.filter(
                    created_at__gte=last_24h
                ).count()
                cache_hits = LocationQueryCache.objects.filter(
                    access_count__gt=1
                ).count()

                stats = {
                    "timeWindow": "24h",
                    "totalSearches": recent_queries,
                    "cacheHits": cache_hits,
                    "cacheMisses": max(0, recent_queries - cache_hits),
                    "apiCalls": max(0, recent_queries - cache_hits),
                    "errors": 0,
                    "cacheHitRate": (
                        round(cache_hits / total_queries * 100, 2)
                        if total_queries > 0
                        else 0
                    ),
                    "avgResponseTime": 0,  # Would need request logging
                    "totalCachedQueries": total_queries,
                    "totalCacheAccesses": cache_hits,
                    "hourlyData": [],
                    "apiCallsSaved": cache_hits,
                    "estimatedCost": {
                        "callsMade": max(0, recent_queries - cache_hits),
                        "callsSaved": cache_hits,
                        "savings": f"${cache_hits * 0.001:.2f}",  # Estimated
                    },
                }
            else:
                # Return empty stats if no cache model
                stats = {
                    "timeWindow": "24h",
                    "totalSearches": 0,
                    "cacheHits": 0,
                    "cacheMisses": 0,
                    "apiCalls": 0,
                    "errors": 0,
                    "cacheHitRate": 0,
                    "avgResponseTime": 0,
                    "totalCachedQueries": 0,
                    "totalCacheAccesses": 0,
                    "hourlyData": [],
                    "apiCallsSaved": 0,
                    "estimatedCost": {
                        "callsMade": 0,
                        "callsSaved": 0,
                        "savings": "$0.00",
                    },
                }

            # Cache for 5 minutes
            cache.set("osm_usage_stats", stats, 300)
            return Response(stats)

        except Exception as e:
            capture_and_log(e, "OSM usage stats - error", source="api")
            return Response(
                {"detail": "Failed to fetch OSM usage stats"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class RateLimitMetricsView(APIView):
    """
    POST /admin/rate-limit-metrics/
    Return rate limiting metrics for admin dashboard.
    """
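
    # Example request body (illustrative only; the field names follow the parsing
    # in post() below, the values are made up):
    #
    #     {"action": "stats", "timeWindow": 60000, "limit": 100}
    #
    # Supported actions: "stats", "recent", "function", "user", "ip".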

    permission_classes = [IsAdminWithSecondFactor]

    def post(self, request):
        """Return rate limit metrics based on action."""
        try:
            action = request.data.get("action", "stats")
            time_window = request.data.get("timeWindow", 60000)  # ms
            limit = request.data.get("limit", 100)

            # Convert time_window from ms to seconds
            time_window_seconds = time_window / 1000 if time_window else 60
            cutoff = timezone.now() - timedelta(seconds=time_window_seconds)

            if action == "stats":
                # Return aggregate statistics
                # In a real implementation, you'd query a rate limit log table
                stats = {
                    "totalRequests": 0,
                    "allowedRequests": 0,
                    "blockedRequests": 0,
                    "blockRate": 0,
                    "uniqueIPs": 0,
                    "uniqueUsers": 0,
                    "topBlockedIPs": [],
                    "topBlockedUsers": [],
                    "tierDistribution": {
                        "anonymous": 0,
                        "authenticated": 0,
                        "premium": 0,
                        "admin": 0,
                    },
                }
                return Response(stats)

            elif action == "recent":
                # Return recent rate limit events
                return Response([])

            elif action == "function":
                # Return metrics for a specific function
                function_name = request.data.get("functionName", "")
                return Response([])

            elif action == "user":
                # Return metrics for a specific user
                user_id = request.data.get("userId", "")
                return Response([])

            elif action == "ip":
                # Return metrics for a specific IP
                client_ip = request.data.get("clientIP", "")
                return Response([])

            return Response(
                {"detail": f"Unknown action: {action}"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        except Exception as e:
            capture_and_log(e, "Rate limit metrics - error", source="api")
            return Response(
                {"detail": "Failed to fetch rate limit metrics"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class DatabaseManagerView(APIView):
    """
    POST /admin/database-manager/
    Handle admin CRUD operations for entities.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Map entity types to Django models
    ENTITY_MODEL_MAP = {
        "parks": ("parks", "Park"),
        "rides": ("rides", "Ride"),
        "companies": ("companies", "Company"),
        "reviews": ("reviews", "Review"),
        "blog_posts": ("blog", "BlogPost"),
        "photos": ("media", "Photo"),
        "lists": ("lists", "List"),
        "profiles": ("accounts", "UserProfile"),
    }
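
    # Example request body (illustrative only; the operation names and entity
    # types come from the handlers and ENTITY_MODEL_MAP above, the id and data
    # are made up):
    #
    #     {
    #         "operation": "update",
    #         "entityType": "parks",
    #         "entityId": "123",
    #         "data": {"name": "New Park Name"},
    #         "changeReason": "Fixing a typo"
    #     }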

    def post(self, request):
        """Dispatch to appropriate handler based on operation."""
        # Defined before the try block so the except handler below can reference
        # it safely even if reading request.data raises.
        operation = None
        try:
            operation = request.data.get("operation")
            entity_type = request.data.get("entityType")
            entity_id = request.data.get("entityId")
            data = request.data.get("data", {})
            change_reason = request.data.get("changeReason", "Admin operation")

            if not operation:
                return Response(
                    {"detail": "operation is required"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            if not entity_type:
                return Response(
                    {"detail": "entityType is required"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Get the model class
            model_info = self.ENTITY_MODEL_MAP.get(entity_type)
            if not model_info:
                return Response(
                    {"detail": f"Unknown entity type: {entity_type}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            try:
                Model = apps.get_model(model_info[0], model_info[1])
            except LookupError:
                return Response(
                    {"detail": f"Model not found for {entity_type}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Dispatch to handler
            handlers = {
                "create": self._handle_create,
                "update": self._handle_update,
                "delete": self._handle_delete,
                "restore": self._handle_restore,
                "permanent-delete": self._handle_permanent_delete,
                "bulk-update-status": self._handle_bulk_update_status,
                "bulk-delete": self._handle_bulk_delete,
                "bulk-restore": self._handle_bulk_restore,
                "bulk-permanent-delete": self._handle_bulk_permanent_delete,
                "get-dependencies": self._handle_get_dependencies,
            }

            handler = handlers.get(operation)
            if not handler:
                return Response(
                    {"detail": f"Unknown operation: {operation}"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            return handler(Model, entity_type, entity_id, data, change_reason, request)

        except Exception as e:
            capture_and_log(
                e, f"Database manager - {operation} error", source="api"
            )
            return Response(
                {"detail": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def _handle_create(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Create a new entity."""
        with transaction.atomic():
            instance = Model.objects.create(**data)
            return Response(
                {
                    "success": True,
                    "data": {"id": str(instance.pk)},
                    "message": f"{entity_type} created successfully",
                }
            )

    def _handle_update(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Update an existing entity."""
        if not entity_id:
            return Response(
                {"detail": "entityId is required for update"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                instance = Model.objects.get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            for key, value in data.items():
                if hasattr(instance, key):
                    setattr(instance, key, value)
            instance.save()

            return Response(
                {
                    "success": True,
                    "data": {"id": str(instance.pk)},
                    "message": f"{entity_type} updated successfully",
                }
            )

    def _handle_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Soft delete an entity (set status to deleted)."""
        if not entity_id:
            return Response(
                {"detail": "entityId is required for delete"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                instance = Model.objects.get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            # Try soft delete first (set status)
            if hasattr(instance, "status"):
                instance.status = "deleted"
                instance.save()
            elif hasattr(instance, "is_deleted"):
                instance.is_deleted = True
                instance.save()
            elif hasattr(instance, "deleted_at"):
                instance.deleted_at = timezone.now()
                instance.save()
            else:
                # Hard delete if no soft delete field
                instance.delete()

            return Response(
                {
                    "success": True,
                    "data": {"id": str(entity_id)},
                    "message": f"{entity_type} deleted successfully",
                }
            )

    def _handle_restore(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Restore a soft-deleted entity."""
        if not entity_id:
            return Response(
                {"detail": "entityId is required for restore"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        new_status = data.get("status", "draft")

        with transaction.atomic():
            try:
                # Try to get even deleted entities; fall back to the default
                # manager if the model has no all_objects manager. The nested
                # try mirrors _handle_permanent_delete so a DoesNotExist from
                # the fallback lookup still returns a 404.
                try:
                    instance = Model.all_objects.get(pk=entity_id)
                except AttributeError:
                    instance = Model.objects.get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            if hasattr(instance, "status"):
                instance.status = new_status
                instance.save()
            elif hasattr(instance, "is_deleted"):
                instance.is_deleted = False
                instance.save()
            elif hasattr(instance, "deleted_at"):
                instance.deleted_at = None
                instance.save()

            return Response(
                {
                    "success": True,
                    "data": {"id": str(entity_id)},
                    "message": f"{entity_type} restored successfully",
                }
            )

    def _handle_permanent_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Permanently delete an entity."""
        if not entity_id:
            return Response(
                {"detail": "entityId is required for permanent-delete"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                # Try to get even deleted entities
                try:
                    instance = Model.all_objects.get(pk=entity_id)
                except AttributeError:
                    instance = Model.objects.get(pk=entity_id)
            except Model.DoesNotExist:
                return Response(
                    {"detail": f"{entity_type} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            instance.delete()

            return Response(
                {
                    "success": True,
                    "data": {"id": str(entity_id)},
                    "message": f"{entity_type} permanently deleted",
                }
            )

    def _handle_bulk_update_status(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk update status of multiple entities."""
        entity_ids = data.get("entityIds", [])
        new_status = data.get("status")

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        if not new_status:
            return Response(
                {"detail": "status is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            updated = Model.objects.filter(pk__in=entity_ids).update(status=new_status)

            return Response(
                {
                    "success": True,
                    "bulk": {
                        "successCount": updated,
                        "failedCount": len(entity_ids) - updated,
                    },
                    "message": f"Updated {updated} {entity_type}",
                }
            )

    def _handle_bulk_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk soft delete multiple entities."""
        entity_ids = data.get("entityIds", [])

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            if hasattr(Model, "status"):
                updated = Model.objects.filter(pk__in=entity_ids).update(
                    status="deleted"
                )
            else:
                updated = Model.objects.filter(pk__in=entity_ids).update(
                    is_deleted=True
                )

            return Response(
                {
                    "success": True,
                    "bulk": {
                        "successCount": updated,
                        "failedCount": len(entity_ids) - updated,
                    },
                    "message": f"Deleted {updated} {entity_type}",
                }
            )

    def _handle_bulk_restore(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk restore soft-deleted entities."""
        entity_ids = data.get("entityIds", [])
        new_status = data.get("status", "draft")

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                updated = Model.all_objects.filter(pk__in=entity_ids).update(
                    status=new_status
                )
            except AttributeError:
                updated = Model.objects.filter(pk__in=entity_ids).update(
                    status=new_status
                )

            return Response(
                {
                    "success": True,
                    "bulk": {
                        "successCount": updated,
                        "failedCount": len(entity_ids) - updated,
                    },
                    "message": f"Restored {updated} {entity_type}",
                }
            )

    def _handle_bulk_permanent_delete(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Bulk permanently delete entities."""
        entity_ids = data.get("entityIds", [])

        if not entity_ids:
            return Response(
                {"detail": "entityIds is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        with transaction.atomic():
            try:
                deleted, _ = Model.all_objects.filter(pk__in=entity_ids).delete()
            except AttributeError:
                deleted, _ = Model.objects.filter(pk__in=entity_ids).delete()

            return Response(
                {
                    "success": True,
                    "bulk": {
                        "successCount": deleted,
                        "failedCount": len(entity_ids) - deleted,
                    },
                    "message": f"Permanently deleted {deleted} {entity_type}",
                }
            )

    def _handle_get_dependencies(
        self, Model, entity_type, entity_id, data, change_reason, request
    ):
        """Get dependencies for an entity before deletion."""
        if not entity_id:
            return Response(
                {"detail": "entityId is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            instance = Model.objects.get(pk=entity_id)
        except Model.DoesNotExist:
            return Response(
                {"detail": f"{entity_type} not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        # Get related objects count
        dependencies = []
        for rel in instance._meta.get_fields():
            if rel.one_to_many or rel.one_to_one or rel.many_to_many:
                try:
                    related_name = rel.get_accessor_name()
                    related_manager = getattr(instance, related_name, None)
                    if related_manager and hasattr(related_manager, "count"):
                        count = related_manager.count()
                        if count > 0:
                            dependencies.append(
                                {
                                    "type": rel.related_model._meta.verbose_name_plural,
                                    "count": count,
                                }
                            )
                except Exception:
                    pass

        return Response(
            {
                "success": True,
                "dependencies": dependencies,
                "hasDependencies": len(dependencies) > 0,
            }
        )


class CeleryTaskStatusView(APIView):
    """
    GET /admin/tasks/status/
    Return Celery task status (read-only).
    """

    permission_classes = [IsAdminWithSecondFactor]

    # List of known scheduled tasks
    SCHEDULED_TASKS = [
        {
            "name": "process_scheduled_deletions",
            "display_name": "Process Scheduled Deletions",
            "schedule": "daily at midnight",
        },
        {
            "name": "process_closing_entities",
            "display_name": "Process Closing Entities",
            "schedule": "daily at midnight",
        },
        {
            "name": "process_expired_bans",
            "display_name": "Process Expired Bans",
            "schedule": "every 15 minutes",
        },
        {
            "name": "cleanup_orphaned_images",
            "display_name": "Cleanup Orphaned Images",
            "schedule": "weekly on Sunday",
        },
        {
            "name": "cleanup_old_versions",
            "display_name": "Cleanup Old Versions",
            "schedule": "weekly on Sunday",
        },
        {
            "name": "data_retention_cleanup",
            "display_name": "Data Retention Cleanup",
            "schedule": "daily at 3 AM",
        },
    ]

    def get(self, request):
        """Return status of all scheduled tasks."""
        try:
            task_name = request.query_params.get("task")

            tasks_status = []
            for task_info in self.SCHEDULED_TASKS:
                # Get last run info from cache
                cache_key = f"task_last_run_{task_info['name']}"
                last_run_info = cache.get(cache_key, {})

                task_status = {
                    "name": task_info["name"],
                    "displayName": task_info["display_name"],
                    "schedule": task_info["schedule"],
                    "lastRun": last_run_info.get("timestamp"),
                    "lastResult": last_run_info.get("result", "unknown"),
                    "lastDuration": last_run_info.get("duration"),
                    "status": "scheduled",
                }

                if task_name and task_name == task_info["name"]:
                    return Response(task_status)

                tasks_status.append(task_status)

            if task_name:
                return Response(
                    {"detail": f"Unknown task: {task_name}"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            return Response(
                {
                    "tasks": tasks_status,
                    "totalTasks": len(tasks_status),
                }
            )

        except Exception as e:
            capture_and_log(e, "Celery task status - error", source="api")
            return Response(
                {"detail": "Failed to fetch task status"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


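# CeleryTaskStatusView only reads "task_last_run_<name>" entries from the cache;
# nothing in this module writes them. The helper below is a minimal sketch of the
# producing side, assuming tasks call it when they finish. The function name, TTL,
# and the idea that tasks report this way are assumptions for illustration, not
# part of the existing task code; it is not called anywhere in this module.
def _record_task_last_run_sketch(task_name: str, result: str, duration_seconds: float) -> None:
    """Store last-run info in the shape CeleryTaskStatusView.get() expects."""
    cache.set(
        f"task_last_run_{task_name}",
        {
            "timestamp": timezone.now().isoformat(),  # read back as "lastRun"
            "result": result,  # e.g. "success" or "failure"; read back as "lastResult"
            "duration": duration_seconds,  # read back as "lastDuration"
        },
        60 * 60 * 24 * 7,  # keep for a week (arbitrary TTL for this sketch)
    )

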
class DetectAnomaliesView(APIView):
    """
    POST /admin/anomalies/detect/
    Detect data anomalies for admin dashboard.

    Full parity with Supabase Edge Function: detect-anomalies

    The original Edge Function implements 7 ML detection algorithms:
    1. Z-Score (standard deviation outliers)
    2. Moving Average (trend deviation)
    3. Rate of Change (sudden changes)
    4. Isolation Forest (ML-based outlier detection)
    5. Seasonal Decomposition (periodic pattern anomalies)
    6. Predictive (Holt-Winters exponential smoothing)
    7. Ensemble (combines all algorithms)

    This implementation provides:
    - Config-driven detection framework
    - Data quality checks (orphaned records, duplicates, incomplete data)
    - Auto-alerting for critical anomalies

    TODO: Implement full ML algorithms with numpy/scipy in follow-up task.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Severity score thresholds
    SEVERITY_THRESHOLDS = {
        "critical": 4.0,
        "high": 3.0,
        "medium": 2.0,
        "low": 1.0,
    }

    def post(self, request):
        """Detect and return data anomalies."""
        try:
            # ================================================================
            # Input validation with safe type handling
            # ================================================================
            raw_type = request.data.get("type", "all")
            anomaly_type = raw_type if isinstance(raw_type, str) else "all"
            anomaly_type = anomaly_type.strip().lower()[:50]

            # Validate anomaly_type against allowed values
            allowed_types = {"all", "orphaned", "duplicates", "incomplete", "data_quality"}
            if anomaly_type not in allowed_types:
                anomaly_type = "all"

            # Safe sensitivity parsing with bounds
            try:
                sensitivity = float(request.data.get("sensitivity", 2.5))
                sensitivity = max(0.1, min(sensitivity, 10.0))  # Clamp to [0.1, 10.0]
            except (ValueError, TypeError):
                sensitivity = 2.5

            # Safe lookback_minutes parsing with bounds
            try:
                lookback_minutes = int(request.data.get("lookback_minutes", 60))
                lookback_minutes = max(1, min(lookback_minutes, 10080))  # 1 min to 1 week
            except (ValueError, TypeError):
                lookback_minutes = 60

            anomalies = []

            # ================================================================
            # Data Quality Anomalies (immediate checks)
            # ================================================================

            # Check for orphaned records
            if anomaly_type in ["all", "orphaned", "data_quality"]:
                try:
                    Park = apps.get_model("parks", "Park")
                    Ride = apps.get_model("rides", "Ride")

                    # Rides without parks
                    orphaned_rides = Ride.objects.filter(park__isnull=True).count()
                    if orphaned_rides > 0:
                        severity = "high" if orphaned_rides > 10 else "medium"
                        anomalies.append({
                            "id": f"orphaned_rides_{timezone.now().timestamp()}",
                            "metric_name": "orphaned_records",
                            "metric_category": "data_quality",
                            "anomaly_type": "orphaned_rides",
                            "severity": severity,
                            "baseline_value": 0,
                            "anomaly_value": orphaned_rides,
                            "deviation_score": orphaned_rides / 5.0,  # Score based on count
                            "confidence_score": 1.0,  # 100% confidence for exact counts
                            "detection_algorithm": "rule_based",
                            "description": f"{orphaned_rides} rides without associated park",
                            "detected_at": timezone.now().isoformat(),
                        })
                except LookupError:
                    pass

            # Check for duplicate slugs
            if anomaly_type in ["all", "duplicates", "data_quality"]:
                try:
                    Park = apps.get_model("parks", "Park")
                    duplicate_slugs = (
                        Park.objects.values("slug")
                        .annotate(count=Count("id"))
                        .filter(count__gt=1)
                    )
                    dup_count = duplicate_slugs.count()
                    if dup_count > 0:
                        anomalies.append({
                            "id": f"duplicate_slugs_{timezone.now().timestamp()}",
                            "metric_name": "duplicate_slugs",
                            "metric_category": "data_quality",
                            "anomaly_type": "duplicate_values",
                            "severity": "high" if dup_count > 5 else "medium",
                            "baseline_value": 0,
                            "anomaly_value": dup_count,
                            "deviation_score": dup_count / 2.0,
                            "confidence_score": 1.0,
                            "detection_algorithm": "rule_based",
                            "description": f"{dup_count} duplicate park slugs detected",
                            "detected_at": timezone.now().isoformat(),
                        })
                except LookupError:
                    pass

            # Check for missing required fields
            if anomaly_type in ["all", "incomplete", "data_quality"]:
                try:
                    Park = apps.get_model("parks", "Park")
                    parks_no_location = Park.objects.filter(
                        Q(latitude__isnull=True) | Q(longitude__isnull=True)
                    ).count()
                    if parks_no_location > 0:
                        anomalies.append({
                            "id": f"incomplete_parks_{timezone.now().timestamp()}",
                            "metric_name": "incomplete_data",
                            "metric_category": "data_quality",
                            "anomaly_type": "missing_required_fields",
                            "severity": "low" if parks_no_location < 10 else "medium",
                            "baseline_value": 0,
                            "anomaly_value": parks_no_location,
                            "deviation_score": parks_no_location / 10.0,
                            "confidence_score": 1.0,
                            "detection_algorithm": "rule_based",
                            "description": f"{parks_no_location} parks missing location data",
                            "detected_at": timezone.now().isoformat(),
                        })
                except LookupError:
                    pass

            # ================================================================
            # TODO: Implement ML-based anomaly detection
            # ================================================================
            # The original Supabase Edge Function reads from:
            # - anomaly_detection_config table (enabled metrics, sensitivity, algorithms)
            # - metric_time_series table (historical metric data)
            #
            # Then applies these algorithms:
            # 1. z_score: (value - mean) / std_dev
            # 2. moving_average: deviation from rolling average
            # 3. rate_of_change: delta compared to previous values
            # 4. isolation_forest: sklearn-style outlier detection
            # 5. seasonal_decomposition: detect periodic pattern breaks
            # 6. predictive: Holt-Winters forecasting comparison
            # 7. ensemble: weighted combination of all methods
            #
            # For now, we return data quality anomalies. Full ML implementation
            # requires numpy/scipy dependencies and metric_time_series data.

            # ================================================================
            # Auto-create alerts for critical/high severity
            # ================================================================
            for anomaly in anomalies:
                if anomaly.get("severity") in ["critical", "high"]:
                    # Log critical anomaly (would create SystemAlert in full impl)
                    logger.warning(
                        f"Critical anomaly detected: {anomaly.get('description')}",
                        extra={
                            "anomaly_type": anomaly.get("anomaly_type"),
                            "severity": anomaly.get("severity"),
                            "deviation_score": anomaly.get("deviation_score"),
                        },
                    )

            # Calculate summary counts
            detected_count = len(anomalies)
            critical_count = sum(1 for a in anomalies if a.get("severity") in ["critical", "high"])

            return Response({
                "success": True,
                "detected_count": detected_count,
                "critical_count": critical_count,
                "anomalies": anomalies,
                "scanned_at": timezone.now().isoformat(),
                "config": {
                    "sensitivity": sensitivity,
                    "lookback_minutes": lookback_minutes,
                    "algorithms_available": [
                        "rule_based",
                        # TODO: Add when implemented
                        # "z_score", "moving_average", "rate_of_change",
                        # "isolation_forest", "seasonal", "predictive", "ensemble"
                    ],
                },
            })

        except Exception as e:
            capture_and_log(e, "Detect anomalies - error", source="api")
            return Response(
                {"detail": "Failed to detect anomalies"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


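# DetectAnomaliesView's TODO lists z_score ((value - mean) / std_dev) as the first
# algorithm still to be ported. The function below is a minimal, self-contained
# sketch of that single step for reference: it uses only the standard library, is
# not called anywhere in this module, and its name and return shape are
# illustrative assumptions rather than the Edge Function's actual implementation.
def _zscore_outliers_sketch(values, sensitivity=2.5):
    """Return (index, value, z_score) tuples for points beyond the threshold."""
    import statistics

    if len(values) < 2:
        return []

    mean = statistics.fmean(values)
    std_dev = statistics.pstdev(values)
    if std_dev == 0:
        return []  # a flat series has no z-score outliers

    outliers = []
    for index, value in enumerate(values):
        z_score = (value - mean) / std_dev
        if abs(z_score) > sensitivity:
            outliers.append((index, value, round(z_score, 3)))
    return outliers

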
class CollectMetricsView(APIView):
    """
    POST /admin/metrics/collect/
    Collect system metrics for admin dashboard.

    BULLETPROOFED: Safe input parsing with validation.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Allowed values
    ALLOWED_METRIC_TYPES = {"all", "database", "users", "moderation", "performance"}
    ALLOWED_TIME_RANGES = {"24h", "7d", "30d", "1h", "12h"}
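
    # Example request body (illustrative only; both fields fall back to defaults
    # when missing or invalid):
    #
    #     {"type": "users", "timeRange": "7d"}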

    def post(self, request):
        """Collect and return system metrics."""
        try:
            # ================================================================
            # Input validation with safe type handling
            # ================================================================
            raw_type = request.data.get("type", "all")
            metric_type = raw_type if isinstance(raw_type, str) else "all"
            metric_type = metric_type.strip().lower()[:50]
            if metric_type not in self.ALLOWED_METRIC_TYPES:
                metric_type = "all"

            raw_time_range = request.data.get("timeRange", "24h")
            time_range = raw_time_range if isinstance(raw_time_range, str) else "24h"
            time_range = time_range.strip().lower()[:10]
            if time_range not in self.ALLOWED_TIME_RANGES:
                time_range = "24h"

            # Parse time range to cutoff
            time_range_map = {
                "1h": timedelta(hours=1),
                "12h": timedelta(hours=12),
                "24h": timedelta(hours=24),
                "7d": timedelta(days=7),
                "30d": timedelta(days=30),
            }
            cutoff = timezone.now() - time_range_map.get(time_range, timedelta(hours=24))

            metrics = {
                "collectedAt": timezone.now().isoformat(),
                "timeRange": time_range,
            }

            # Database metrics
            if metric_type in ["all", "database"]:
                try:
                    Park = apps.get_model("parks", "Park")
                    Ride = apps.get_model("rides", "Ride")

                    metrics["database"] = {
                        "totalParks": Park.objects.count(),
                        "totalRides": Ride.objects.count(),
                        "recentParks": Park.objects.filter(created_at__gte=cutoff).count(),
                        "recentRides": Ride.objects.filter(created_at__gte=cutoff).count(),
                    }
                except Exception:
                    # Exception already covers LookupError from apps.get_model()
                    metrics["database"] = {
                        "error": "Could not fetch database metrics"
                    }

            # User metrics
            if metric_type in ["all", "users"]:
                try:
                    metrics["users"] = {
                        "totalUsers": User.objects.count(),
                        "activeUsers": User.objects.filter(
                            last_login__gte=cutoff
                        ).count(),
                        "newUsers": User.objects.filter(
                            date_joined__gte=cutoff
                        ).count(),
                    }
                except Exception:
                    metrics["users"] = {
                        "error": "Could not fetch user metrics"
                    }

            # Moderation metrics
            if metric_type in ["all", "moderation"]:
                try:
                    EditSubmission = apps.get_model("moderation", "EditSubmission")
                    metrics["moderation"] = {
                        "pendingSubmissions": EditSubmission.objects.filter(
                            status="PENDING"
                        ).count(),
                        "recentSubmissions": EditSubmission.objects.filter(
                            created_at__gte=cutoff
                        ).count(),
                    }
                except Exception:
                    metrics["moderation"] = {
                        "error": "Could not fetch moderation metrics"
                    }

            return Response({
                "success": True,
                "metrics": metrics,
            })

        except Exception as e:
            capture_and_log(e, "Collect metrics - error", source="api")
            return Response(
                {"detail": "Failed to collect metrics"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class PipelineIntegrityScanView(APIView):
    """
    POST /admin/pipeline/integrity-scan/
    Scan data pipeline for integrity issues.

    BULLETPROOFED: Safe input parsing with validation.
    """

    permission_classes = [IsAdminWithSecondFactor]

    # Allowed values
    ALLOWED_SCAN_TYPES = {"full", "referential", "status", "media", "submissions", "stuck", "versions"}
    MAX_HOURS_BACK = 720  # 30 days max
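
    # Example request body (illustrative only; "fix" defaults to false and
    # out-of-range values are clamped or replaced with defaults):
    #
    #     {"type": "referential", "hours_back": 72, "fix": false}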

    def post(self, request):
        """Run integrity scan on data pipeline."""
        try:
            # ================================================================
            # Input validation with safe type handling
            # ================================================================

            # Safe hours_back parsing with bounds
            try:
                hours_back = int(request.data.get("hours_back", 48))
                hours_back = max(1, min(hours_back, self.MAX_HOURS_BACK))
            except (ValueError, TypeError):
                hours_back = 48

            # Safe scan_type validation
            raw_type = request.data.get("type", "full")
            scan_type = raw_type if isinstance(raw_type, str) else "full"
            scan_type = scan_type.strip().lower()[:50]
            if scan_type not in self.ALLOWED_SCAN_TYPES:
                scan_type = "full"

            # Safe fix_issues parsing (boolean)
            raw_fix = request.data.get("fix", False)
            fix_issues = raw_fix is True or str(raw_fix).lower() in ("true", "1", "yes")

            # Calculate cutoff time based on hours_back
            cutoff_time = timezone.now() - timedelta(hours=hours_back)

            issues = []
            fixed_count = 0

            # Check for referential integrity
            if scan_type in ["full", "referential"]:
                try:
                    Ride = apps.get_model("rides", "Ride")
                    Park = apps.get_model("parks", "Park")

                    # Rides pointing to non-existent parks
                    valid_park_ids = Park.objects.values_list("id", flat=True)
                    invalid_rides = Ride.objects.exclude(
                        park_id__in=valid_park_ids
                    ).exclude(park_id__isnull=True)

                    if invalid_rides.exists():
                        for ride in invalid_rides[:10]:  # Limit to 10 examples
                            issues.append({
                                "issue_type": "broken_reference",
                                "entity_type": "ride",
                                "entity_id": str(ride.id),
                                "submission_id": "",
                                "severity": "critical",
                                "description": f"Ride '{ride.name}' has invalid park reference",
                                "detected_at": timezone.now().isoformat(),
                            })

                        if fix_issues:
                            # Set invalid park references to null; update() returns
                            # the number of rows changed (re-counting afterwards
                            # would return 0 because the filter no longer matches).
                            fixed_count += invalid_rides.update(park_id=None)
                except LookupError:
                    pass

            # Check for status consistency
            if scan_type in ["full", "status"]:
                try:
                    Park = apps.get_model("parks", "Park")

                    # Parks with invalid status values
                    valid_statuses = ["operating", "closed", "under_construction", "announced", "deleted"]
                    invalid_status_parks = Park.objects.exclude(status__in=valid_statuses)

                    for park in invalid_status_parks[:10]:  # Limit to 10 examples
                        issues.append({
                            "issue_type": "invalid_status",
                            "entity_type": "park",
                            "entity_id": str(park.id),
                            "submission_id": "",
                            "severity": "warning",
                            "description": f"Park '{park.name}' has invalid status: {park.status}",
                            "detected_at": timezone.now().isoformat(),
                        })
                except LookupError:
                    pass

            # Check for orphaned media
            if scan_type in ["full", "media"]:
                try:
                    Photo = apps.get_model("media", "Photo")

                    orphaned_photos = Photo.objects.filter(
                        entity_id__isnull=True,
                        entity_type__isnull=True,
                    )

                    for photo in orphaned_photos[:10]:  # Limit to 10 examples
                        issues.append({
                            "issue_type": "orphaned_media",
                            "entity_type": "photo",
                            "entity_id": str(photo.id),
                            "submission_id": "",
                            "severity": "info",
                            "description": "Photo without associated entity",
                            "detected_at": timezone.now().isoformat(),
                        })
                except LookupError:
                    pass

            # ================================================================
            # Check 3: Stuck submissions with expired locks (from original)
            # ================================================================
            if scan_type in ["full", "submissions", "stuck"]:
                try:
                    EditSubmission = apps.get_model("moderation", "EditSubmission")

                    # Find submissions that are claimed but claim has expired
                    # Assuming a claim expires after 30 minutes of inactivity
                    claim_expiry = timezone.now() - timedelta(minutes=30)

                    stuck_submissions = EditSubmission.objects.filter(
                        status__in=["CLAIMED", "claimed", "reviewing"],
                        claimed_at__lt=claim_expiry,
                    ).exclude(claimed_at__isnull=True)

                    for sub in stuck_submissions[:10]:  # Limit to 10 examples
                        hours_stuck = (timezone.now() - sub.claimed_at).total_seconds() / 3600
                        issues.append({
                            "issue_type": "stuck_submission",
                            "entity_type": "edit_submission",
                            "entity_id": str(sub.id),
                            "submission_id": str(sub.id),
                            "severity": "warning" if hours_stuck < 4 else "critical",
                            "description": (
                                f"Submission claimed by {sub.claimed_by.username if sub.claimed_by else 'unknown'} "
                                f"but stuck for {hours_stuck:.1f} hours"
                            ),
                            "detected_at": timezone.now().isoformat(),
                            "metadata": {
                                "claimed_at": sub.claimed_at.isoformat() if sub.claimed_at else None,
                                "claimed_by": sub.claimed_by.username if sub.claimed_by else None,
                                "hours_stuck": round(hours_stuck, 1),
                            },
                        })

                        if fix_issues:
                            # Unclaim this stuck submission so it returns to the queue
                            sub.claimed_by = None
                            sub.claimed_at = None
                            sub.status = "PENDING"
                            sub.save(update_fields=["claimed_by", "claimed_at", "status"])
                            fixed_count += 1

                except LookupError:
                    pass

            # ================================================================
            # Check: Entities with approvals but no version records (from original)
            # Uses pghistory events as proxy for version records
            # ================================================================
            if scan_type in ["full", "versions"]:
                try:
                    # Check if pghistory events exist for recently approved submissions
                    EditSubmission = apps.get_model("moderation", "EditSubmission")

                    recently_approved = EditSubmission.objects.filter(
                        status__in=["APPROVED", "approved"],
                        handled_at__gte=cutoff_time,
                    )

                    for sub in recently_approved[:10]:
                        # Check if the target object has history
                        target = sub.content_object
                        if target and hasattr(target, "events"):
                            try:
                                event_count = target.events.count()
                                if event_count == 0:
                                    issues.append({
                                        "issue_type": "missing_version_record",
                                        "entity_type": sub.content_type.model,
                                        "entity_id": str(sub.object_id),
                                        "submission_id": str(sub.id),
                                        "severity": "critical",
                                        "description": f"Approved {sub.content_type.model} has no version history",
                                        "detected_at": timezone.now().isoformat(),
                                    })
                            except Exception:
                                pass
                except LookupError:
                    pass

            # Calculate summary counts
            critical_count = sum(1 for i in issues if i.get("severity") == "critical")
            warning_count = sum(1 for i in issues if i.get("severity") == "warning")
            info_count = sum(1 for i in issues if i.get("severity") == "info")

            # Return in frontend-expected format
            return Response({
                "success": True,
                "scan_timestamp": timezone.now().isoformat(),
                "hours_scanned": hours_back,
                "issues_found": len(issues),
                "issues": issues,
                "summary": {
                    "critical": critical_count,
                    "warning": warning_count,
                    "info": info_count,
                },
            })

        except Exception as e:
            capture_and_log(e, "Pipeline integrity scan - error", source="api")
            return Response(
                {"detail": "Failed to run integrity scan"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class AdminSettingsView(APIView):
    """
    GET/POST /admin/settings/
    Simple key-value store for admin preferences.

    Settings are stored in Django cache with admin-specific keys.
    For persistent storage, a database model can be added later.
    """

    permission_classes = [IsAdminWithSecondFactor]

    def get(self, request):
        """Get all admin settings or a specific setting."""
        try:
            key = request.query_params.get("key")

            if key:
                # Get specific setting
                value = cache.get(f"admin_setting_{key}")
                if value is None:
                    return Response(
                        {"results": []},
                        status=status.HTTP_200_OK,
                    )
                return Response(
                    {"results": [{"key": key, "value": value}]},
                    status=status.HTTP_200_OK,
                )

            # Get all settings (return empty list if none exist)
            # In a real implementation, you'd query a database model
            settings_keys = cache.get("admin_settings_keys", [])
            results = []
            for k in settings_keys:
                val = cache.get(f"admin_setting_{k}")
                if val is not None:
                    results.append({"key": k, "value": val})

            return Response(
                {"results": results, "count": len(results)},
                status=status.HTTP_200_OK,
            )

        except Exception as e:
            capture_and_log(e, "Admin settings GET - error", source="api")
            return Response(
                {"detail": "Failed to fetch admin settings"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def post(self, request):
        """Create or update an admin setting."""
        try:
            key = request.data.get("key")
            value = request.data.get("value")

            if not key:
                return Response(
                    {"detail": "key is required"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Store in cache (30 days TTL)
            cache.set(f"admin_setting_{key}", value, 60 * 60 * 24 * 30)

            # Track keys
            settings_keys = cache.get("admin_settings_keys", [])
            if key not in settings_keys:
                settings_keys.append(key)
                cache.set("admin_settings_keys", settings_keys, 60 * 60 * 24 * 30)

            return Response(
                {"success": True, "key": key, "value": value},
                status=status.HTTP_200_OK,
            )

        except Exception as e:
            capture_and_log(e, "Admin settings POST - error", source="api")
            return Response(
                {"detail": "Failed to save admin setting"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
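
# AdminSettingsView's docstring notes that a database model could replace the
# cache-backed store for persistence. One possible shape is sketched below as a
# comment only: the model name and fields are assumptions, and a real model would
# belong in a models.py with a migration, not in this views module.
#
#     from django.db import models
#
#     class AdminSetting(models.Model):
#         key = models.CharField(max_length=255, unique=True)
#         value = models.JSONField(default=dict, blank=True)
#         updated_at = models.DateTimeField(auto_now=True)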