feat: Implement centralized error capture and handling with new middleware, services, and API endpoints, and add new admin and statistics API views.

This commit is contained in:
pacnpal
2026-01-02 15:55:42 -05:00
parent 1adba1b804
commit 95700c7d7b
43 changed files with 2477 additions and 158 deletions

View File

@@ -0,0 +1,126 @@
"""
Serializers for the error monitoring API.
"""
from rest_framework import serializers
from apps.core.models import ApplicationError
class ApplicationErrorSerializer(serializers.ModelSerializer):
    """Complete, read-only view of a stored error for the admin dashboard.

    On top of the model columns it exposes three computed fields: the email
    and username of the user attached to the error, and the username of the
    user recorded as having resolved it.
    """

    user_email = serializers.SerializerMethodField()
    user_username = serializers.SerializerMethodField()
    resolved_by_username = serializers.SerializerMethodField()

    class Meta:
        model = ApplicationError
        fields = [
            # Identity
            "id",
            "error_id",
            "request_id",
            # Error information
            "error_type",
            "error_message",
            "error_stack",
            "error_code",
            "severity",
            "source",
            # Request context
            "endpoint",
            "http_method",
            "http_status",
            "user_agent",
            # User context
            "user",
            "user_email",
            "user_username",
            "ip_address_hash",
            # Extended metadata
            "metadata",
            "environment",
            # Timestamps and resolution
            "created_at",
            "resolved",
            "resolved_at",
            "resolved_by",
            "resolved_by_username",
            "resolution_notes",
        ]
        # Nothing is writable through this serializer.
        read_only_fields = fields

    def get_user_email(self, obj: ApplicationError) -> str | None:
        """Return the email of the attached user, or None when there is none."""
        return obj.user.email if obj.user else None

    def get_user_username(self, obj: ApplicationError) -> str | None:
        """Return the username of the attached user, or None when there is none."""
        return obj.user.username if obj.user else None

    def get_resolved_by_username(self, obj: ApplicationError) -> str | None:
        """Return the username of the resolving user, or None if unresolved."""
        return obj.resolved_by.username if obj.resolved_by else None
class ApplicationErrorListSerializer(serializers.ModelSerializer):
    """Compact, read-only error representation used by list views.

    Leaves out the stack trace, request context and resolution details that
    the full serializer carries.
    """

    class Meta:
        model = ApplicationError
        fields = [
            "id",
            "error_id",
            "error_type",
            "error_message",
            "severity",
            "source",
            "endpoint",
            "created_at",
            "resolved",
        ]
        # Nothing is writable through this serializer.
        read_only_fields = fields
class ErrorReportSerializer(serializers.Serializer):
    """Validates an error report submitted by the frontend.

    Only ``error_type`` and ``error_message`` are mandatory; every other
    field is optional and ``severity`` falls back to ``"medium"``.
    """

    # Optional client-chosen identifier for the report.
    error_id = serializers.UUIDField(required=False, allow_null=True)

    # What went wrong.
    error_type = serializers.CharField(max_length=100)
    error_message = serializers.CharField(max_length=5000)
    error_stack = serializers.CharField(
        required=False,
        allow_blank=True,
        max_length=10000,
    )
    error_code = serializers.CharField(
        required=False,
        allow_blank=True,
        max_length=50,
    )
    severity = serializers.ChoiceField(
        choices=["critical", "high", "medium", "low"],
        default="medium",
    )

    # Where it happened and any extra context supplied by the client.
    endpoint = serializers.CharField(
        required=False,
        allow_blank=True,
        max_length=500,
    )
    metadata = serializers.JSONField(required=False, default=dict)
    environment = serializers.JSONField(required=False, default=dict)
class ErrorStatisticsSerializer(serializers.Serializer):
    """Shape of the statistics payload returned to the dashboard."""

    # Headline number for the reporting period.
    total_errors = serializers.IntegerField()

    # Breakdowns.
    errors_by_severity = serializers.DictField()
    errors_by_source = serializers.DictField()
    errors_by_type = serializers.ListField()
    errors_over_time = serializers.ListField()

    # Quick stats.
    resolution_rate = serializers.FloatField()
    critical_count = serializers.IntegerField()
    unresolved_count = serializers.IntegerField()

    # Length of the reporting period, in days.
    period_days = serializers.IntegerField()
class ErrorResolveSerializer(serializers.Serializer):
    """Payload accepted when marking an error as resolved."""

    # Free-form resolution notes; may be omitted or left blank.
    notes = serializers.CharField(
        required=False,
        allow_blank=True,
        max_length=1000,
    )
class ErrorCleanupSerializer(serializers.Serializer):
    """Payload accepted by the old-error cleanup endpoint."""

    # Age threshold in days: between 1 and 365, defaulting to 30.
    days = serializers.IntegerField(
        min_value=1,
        max_value=365,
        default=30,
    )

View File

@@ -0,0 +1,286 @@
"""
API views for error monitoring and reporting.
Provides endpoints for:
- Frontend error reporting (public, rate-limited)
- Error listing and filtering (admin only)
- Error detail view (admin only)
- Error resolution (admin only)
- Error statistics (admin only)
- Old error cleanup (superuser only)
"""
import logging
from django.db.models import Q
from rest_framework import status
from rest_framework.permissions import AllowAny, IsAdminUser
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle
from rest_framework.views import APIView
from apps.core.api.error_serializers import (
ApplicationErrorListSerializer,
ApplicationErrorSerializer,
ErrorCleanupSerializer,
ErrorReportSerializer,
ErrorResolveSerializer,
ErrorStatisticsSerializer,
)
from apps.core.models import ApplicationError
from apps.core.services import ErrorService
logger = logging.getLogger(__name__)
class ErrorReportThrottle(AnonRateThrottle):
    """Rate limit for error reporting - 10 requests per minute per IP.

    Differs from the stock ``AnonRateThrottle`` in two ways:

    - It uses its own ``scope`` so its request history is stored under a
      separate cache key instead of being mixed with the "anon" scope.
    - It throttles every client by network identity. The parent class
      returns no cache key for authenticated users, which would leave
      logged-in clients completely unthrottled on this public endpoint.
    """

    scope = "error_report"
    # Set directly on the class so no DEFAULT_THROTTLE_RATES entry is needed.
    rate = "10/min"

    def get_cache_key(self, request, view):
        """Return a per-client cache key, whether or not the user is logged in."""
        return self.cache_format % {
            "scope": self.scope,
            "ident": self.get_ident(request),
        }
class ErrorReportView(APIView):
    """
    POST /api/v1/errors/report/
    Accept error reports from the frontend.
    Public endpoint, rate-limited to prevent abuse.
    """

    permission_classes = [AllowAny]
    throttle_classes = [ErrorReportThrottle]

    def post(self, request):
        """Record an error report from the frontend.

        Returns 400 with the serializer errors when the payload is invalid,
        otherwise 201 with the ``error_id`` of the stored record.
        """
        serializer = ErrorReportSerializer(data=request.data)
        # is_valid() is the only validation entry point; calling
        # serializer.validate() directly on raw input skips field validation
        # and has no effect on the serializer's state.
        if not serializer.is_valid():
            return Response(
                {"error": "Invalid error report", "details": serializer.errors},
                status=status.HTTP_400_BAD_REQUEST,
            )

        data = serializer.validated_data

        # Capture the error
        error = ErrorService.capture_error(
            error=data["error_message"],
            source="frontend",
            request=request,
            severity=data.get("severity", "medium"),
            metadata=data.get("metadata", {}),
            environment=data.get("environment", {}),
            error_id=data.get("error_id"),
        )

        # capture_error derives these from the error it is given, which here
        # is just the message string; overwrite them with the submitted values.
        error.error_type = data["error_type"]
        error.error_stack = data.get("error_stack", "")
        error.error_code = data.get("error_code", "")
        error.endpoint = data.get("endpoint", "")
        error.save(update_fields=["error_type", "error_stack", "error_code", "endpoint"])

        return Response(
            {
                "status": "success",
                "error_id": str(error.error_id),
                "message": "Error report received",
            },
            status=status.HTTP_201_CREATED,
        )
class ErrorListView(APIView):
    """
    GET /api/v1/errors/
    List errors with filtering. Admin only.
    """

    permission_classes = [IsAdminUser]

    def get(self, request):
        """List errors with optional filters and limit/offset pagination.

        Query parameters:
            severity, source: exact-match filters.
            error_type: case-insensitive substring match.
            resolved: "true", "1" or "yes" selects resolved errors; any
                other supplied value selects unresolved ones.
            search: case-insensitive substring match across error_id,
                error_message, endpoint and error_type.
            date_range: one of "1h", "24h" (default), "7d", "30d"; any
                other value disables the date filter.
            limit: page size, default 50, clamped to the range 0-100.
            offset: rows to skip, default 0; negative values become 0.

        Returns 400 when ``limit`` or ``offset`` is not an integer.
        """
        from datetime import timedelta

        from django.utils import timezone

        params = request.query_params

        # Validate pagination up front: a non-numeric value used to raise
        # ValueError (HTTP 500) and a negative one made the slice raise.
        try:
            limit = int(params.get("limit", 50))
            offset = int(params.get("offset", 0))
        except (TypeError, ValueError):
            return Response(
                {
                    "error": "Invalid request",
                    "details": {"pagination": ["limit and offset must be integers."]},
                },
                status=status.HTTP_400_BAD_REQUEST,
            )
        limit = max(0, min(limit, 100))
        offset = max(offset, 0)

        queryset = ApplicationError.objects.all()

        # Apply filters
        severity = params.get("severity")
        if severity:
            queryset = queryset.filter(severity=severity)

        source = params.get("source")
        if source:
            queryset = queryset.filter(source=source)

        error_type = params.get("error_type")
        if error_type:
            queryset = queryset.filter(error_type__icontains=error_type)

        resolved = params.get("resolved")
        if resolved is not None:
            resolved_bool = resolved.lower() in ("true", "1", "yes")
            queryset = queryset.filter(resolved=resolved_bool)

        search = params.get("search")
        if search:
            queryset = queryset.filter(
                Q(error_id__icontains=search)
                | Q(error_message__icontains=search)
                | Q(endpoint__icontains=search)
                | Q(error_type__icontains=search)
            )

        # Date range filter
        range_map = {
            "1h": timedelta(hours=1),
            "24h": timedelta(hours=24),
            "7d": timedelta(days=7),
            "30d": timedelta(days=30),
        }
        window = range_map.get(params.get("date_range", "24h"))
        if window is not None:
            queryset = queryset.filter(created_at__gte=timezone.now() - window)

        # Pagination
        total = queryset.count()
        errors = queryset[offset : offset + limit]
        serializer = ApplicationErrorListSerializer(errors, many=True)

        return Response(
            {
                "status": "success",
                "data": serializer.data,
                "total": total,
                "limit": limit,
                "offset": offset,
            }
        )
class ErrorDetailView(APIView):
    """
    GET /api/v1/errors/{id}/
    Get full error details. Admin only.
    """

    permission_classes = [IsAdminUser]

    def get(self, request, pk):
        """Return the full serialized error with primary key ``pk``, or 404."""
        try:
            record = ApplicationError.objects.get(pk=pk)
        except ApplicationError.DoesNotExist:
            not_found = {"error": "Error not found"}
            return Response(not_found, status=status.HTTP_404_NOT_FOUND)

        payload = ApplicationErrorSerializer(record).data
        return Response({"status": "success", "data": payload})
class ErrorResolveView(APIView):
    """
    POST /api/v1/errors/{id}/resolve/
    Mark an error as resolved. Admin only.
    """

    permission_classes = [IsAdminUser]

    def post(self, request, pk):
        """Resolve the error identified by ``pk`` on behalf of the caller.

        Responds with 400 for an invalid payload, 404 when the service
        raises ``ApplicationError.DoesNotExist``, and a success body otherwise.
        """
        resolve_request = ErrorResolveSerializer(data=request.data)
        if not resolve_request.is_valid():
            return Response(
                {"error": "Invalid request", "details": resolve_request.errors},
                status=status.HTTP_400_BAD_REQUEST,
            )

        notes = resolve_request.validated_data.get("notes", "")
        try:
            resolved_error = ErrorService.resolve_error(
                error_id=pk,
                user=request.user,
                notes=notes,
            )
        except ApplicationError.DoesNotExist:
            return Response(
                {"error": "Error not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        body = {
            "status": "success",
            "message": "Error marked as resolved",
            "error_id": str(resolved_error.error_id),
        }
        return Response(body)
class ErrorStatisticsView(APIView):
    """
    GET /api/v1/errors/statistics/
    Get error statistics for dashboard. Admin only.
    """

    permission_classes = [IsAdminUser]

    def get(self, request):
        """Get error statistics.

        Query parameters:
            days: size of the reporting window, default 7, clamped to the
                range 1-90.

        Returns 400 when ``days`` is not an integer.
        """
        # A non-numeric value used to raise ValueError and surface as a 500.
        try:
            days = int(request.query_params.get("days", 7))
        except (TypeError, ValueError):
            return Response(
                {
                    "error": "Invalid request",
                    "details": {"days": ["A valid integer is required."]},
                },
                status=status.HTTP_400_BAD_REQUEST,
            )
        days = min(max(days, 1), 90)  # Clamp between 1 and 90

        stats = ErrorService.get_error_statistics(days=days)
        serializer = ErrorStatisticsSerializer(stats)
        return Response({"status": "success", "data": serializer.data})
class ErrorCleanupView(APIView):
    """
    POST /api/v1/errors/cleanup/
    Clean up old errors. Superuser only.
    """

    permission_classes = [IsAdminUser]

    def post(self, request):
        """Delete old errors through ``ErrorService.cleanup_old_errors``.

        Responds with 403 for callers who are not superusers, 400 for an
        invalid payload, and otherwise the number of deleted records.
        """
        # The permission class lets the request in; deletion additionally
        # requires the superuser flag.
        if not request.user.is_superuser:
            return Response(
                {"error": "Superuser access required"},
                status=status.HTTP_403_FORBIDDEN,
            )

        cleanup_request = ErrorCleanupSerializer(data=request.data)
        if cleanup_request.is_valid():
            days = cleanup_request.validated_data.get("days", 30)
            deleted_count = ErrorService.cleanup_old_errors(days=days)
            return Response(
                {
                    "status": "success",
                    "message": f"Deleted {deleted_count} errors older than {days} days",
                    "deleted_count": deleted_count,
                }
            )

        return Response(
            {"error": "Invalid request", "details": cleanup_request.errors},
            status=status.HTTP_400_BAD_REQUEST,
        )

View File

@@ -0,0 +1,170 @@
"""
ErrorCaptureMiddleware - Automatically capture backend exceptions.
This middleware intercepts unhandled exceptions in Django and logs them
to the ApplicationError model for display in the admin error dashboard.
"""
import logging
from django.http import HttpRequest, HttpResponse
from apps.core.services import ErrorService
logger = logging.getLogger(__name__)
class ErrorCaptureMiddleware:
    """
    Middleware that captures unhandled exceptions and 5xx responses.

    Two hooks feed ErrorService.capture_error:

    - process_exception() records an exception raised by a view and returns
      None so Django continues with its default exception handling.
    - __call__() records any response whose status code is 500 or above,
      unless an exception was already recorded for the same request (the
      500 page Django builds for that exception would otherwise be stored
      as a second, duplicate error).

    A failure while recording is logged and never breaks the request.

    Usage:
        Add to settings.MIDDLEWARE after SecurityMiddleware but before
        any middleware that might swallow exceptions:

        MIDDLEWARE = [
            'django.middleware.security.SecurityMiddleware',
            'apps.core.middleware.error_capture.ErrorCaptureMiddleware',
            ...
        ]
    """

    # Request attribute set once an exception from this request was stored.
    _CAPTURED_FLAG = "_error_capture_recorded"

    # Critical: Database, system, and security errors
    _CRITICAL_TYPES = frozenset(
        {
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "PermissionError",
            "SystemError",
            "MemoryError",
        }
    )

    # Medium: Expected application errors
    _MEDIUM_TYPES = frozenset(
        {
            "ValidationError",
            "Http404",
            "NotFound",
            "PermissionDenied",
            "AuthenticationFailed",
        }
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request: HttpRequest) -> HttpResponse:
        response = self.get_response(request)

        # Capture 5xx errors that made it through (server errors), skipping
        # requests whose exception process_exception() already stored.
        already_captured = getattr(request, self._CAPTURED_FLAG, False)
        if response.status_code >= 500 and not already_captured:
            try:
                self._capture_response_error(request, response)
            except Exception as e:
                # Storing the error may itself fail (e.g. the database is
                # down); the client must still receive the response.
                logger.exception("Failed to capture error response: %s", e)

        return response

    def process_exception(self, request: HttpRequest, exception: Exception) -> None:
        """
        Capture exception details before Django handles it.

        This method is called when a view raises an exception.
        It logs the error to the database but returns None to allow
        Django to continue with its default exception handling.

        Args:
            request: The HTTP request that caused the exception
            exception: The exception that was raised

        Returns:
            None - let Django continue with default handling
        """
        try:
            self._capture_exception(request, exception)
        except Exception as e:
            # Don't let error capture failures break the request
            logger.exception("Failed to capture exception: %s", e)
        else:
            # Tell __call__ not to store the resulting 5xx response again.
            setattr(request, self._CAPTURED_FLAG, True)
        return None  # Let Django continue with default handling

    def _capture_exception(
        self, request: HttpRequest, exception: Exception
    ) -> None:
        """Capture an exception to the database."""
        ErrorService.capture_error(
            error=exception,
            source=self._source_for(request),
            request=request,
            severity=self._classify_severity(exception),
            metadata={
                "view": self._get_view_name(request),
                "query_params": dict(request.GET),
            },
        )

    def _capture_response_error(
        self, request: HttpRequest, response: HttpResponse
    ) -> None:
        """Capture a 5xx response error to the database."""
        reason = getattr(response, "reason_phrase", "Server Error")

        # Responses without a ``content`` attribute contribute no excerpt.
        content = ""
        if hasattr(response, "content"):
            try:
                content = response.content.decode("utf-8")[:200]
            except (UnicodeDecodeError, AttributeError):
                pass

        error_message = f"HTTP {response.status_code}: {reason}"
        if content:
            error_message += f" - {content}"

        severity = "critical" if response.status_code >= 503 else "high"

        ErrorService.capture_error(
            error=error_message,
            source=self._source_for(request),
            request=request,
            severity=severity,
            metadata={
                "status_code": response.status_code,
                "reason_phrase": reason,
                "view": self._get_view_name(request),
            },
        )

    @staticmethod
    def _source_for(request: HttpRequest) -> str:
        """Return "api" for paths containing "/api/", otherwise "backend"."""
        return "api" if "/api/" in request.path else "backend"

    def _classify_severity(self, exception: Exception) -> str:
        """Classify exception severity based on the exception class name."""
        exception_type = type(exception).__name__
        if exception_type in self._CRITICAL_TYPES:
            return "critical"
        if exception_type in self._MEDIUM_TYPES:
            return "medium"
        # Unexpected runtime errors (RuntimeError, TypeError, ValueError,
        # AttributeError, KeyError, IndexError) and anything unknown.
        return "high"

    def _get_view_name(self, request: HttpRequest) -> str:
        """Get the name of the view that handled the request."""
        match = getattr(request, "resolver_match", None)
        if match:
            return match.view_name or ""
        return ""

View File

@@ -17,6 +17,7 @@ from django.http import HttpRequest, HttpResponse
from django.utils import timezone
from apps.core.analytics import PageView
from apps.core.utils import capture_and_log
from apps.parks.models import Park
from apps.rides.models import Ride
@@ -65,8 +66,8 @@ class ViewTrackingMiddleware:
try:
self._track_view_if_applicable(request)
except Exception as e:
# Log error but don't break the request
self.logger.error(f"Error tracking view: {e}", exc_info=True)
# Capture error but don't break the request
capture_and_log(e, f'Track view for {request.path}', source='middleware', severity='low')
return response
@@ -137,7 +138,7 @@ class ViewTrackingMiddleware:
self.logger.debug(f"Recorded view for {content_type} {slug} from {client_ip}")
except Exception as e:
self.logger.error(f"Failed to record page view for {content_type} {slug}: {e}")
capture_and_log(e, f'Record page view for {content_type} {slug}', source='middleware', severity='low')
def _get_content_object(self, content_type: str, slug: str) -> ContentObject | None:
"""Get the content object by type and slug."""
@@ -156,7 +157,7 @@ class ViewTrackingMiddleware:
except Park.DoesNotExist:
return None
except Exception as e:
self.logger.error(f"Error getting {content_type} with slug {slug}: {e}")
capture_and_log(e, f'Get {content_type} with slug {slug}', source='middleware', severity='low')
return None
def _is_duplicate_view(self, content_obj: ContentObject, client_ip: str) -> bool:
@@ -298,5 +299,5 @@ def get_view_stats_for_content(content_obj: ContentObject, hours: int = 24) -> d
}
except Exception as e:
logger.error(f"Error getting view stats: {e}")
capture_and_log(e, f'Get view stats for content', source='service', severity='low')
return {"total_views": 0, "unique_views": 0, "hours": hours, "error": str(e)}

View File

@@ -0,0 +1,152 @@
# Generated by Django 5.2.9 on 2026-01-02 16:18
import django.db.models.deletion
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
    # Creates the table behind the ApplicationError model together with its
    # four composite indexes. Auto-generated by Django; do not edit by hand.

    dependencies = [
        ("core", "0004_alter_slughistory_options_and_more"),
        # Needed because two foreign keys point at the configured user model.
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
    ]

    operations = [
        migrations.CreateModel(
            name="ApplicationError",
            fields=[
                ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
                # Identity
                (
                    "error_id",
                    models.UUIDField(
                        db_index=True,
                        default=uuid.uuid4,
                        help_text="Unique identifier for this error instance",
                        unique=True,
                    ),
                ),
                (
                    "request_id",
                    models.CharField(
                        blank=True, db_index=True, help_text="Request correlation ID if available", max_length=255
                    ),
                ),
                # Error information
                (
                    "error_type",
                    models.CharField(
                        db_index=True,
                        help_text="Type/class of the error (e.g., 'ValidationError', 'TypeError')",
                        max_length=100,
                    ),
                ),
                ("error_message", models.TextField(help_text="Human-readable error message")),
                ("error_stack", models.TextField(blank=True, help_text="Stack trace if available")),
                (
                    "error_code",
                    models.CharField(
                        blank=True, db_index=True, help_text="Application-specific error code", max_length=50
                    ),
                ),
                (
                    "severity",
                    models.CharField(
                        choices=[("critical", "Critical"), ("high", "High"), ("medium", "Medium"), ("low", "Low")],
                        db_index=True,
                        default="medium",
                        help_text="Error severity level",
                        max_length=20,
                    ),
                ),
                (
                    "source",
                    models.CharField(
                        choices=[("frontend", "Frontend"), ("backend", "Backend"), ("api", "API")],
                        db_index=True,
                        help_text="Where the error originated",
                        max_length=20,
                    ),
                ),
                # Request context
                (
                    "endpoint",
                    models.CharField(blank=True, help_text="URL/endpoint where the error occurred", max_length=500),
                ),
                ("http_method", models.CharField(blank=True, help_text="HTTP method of the request", max_length=10)),
                (
                    "http_status",
                    models.PositiveIntegerField(blank=True, help_text="HTTP status code returned", null=True),
                ),
                ("user_agent", models.TextField(blank=True, help_text="User agent string from the client")),
                (
                    "ip_address_hash",
                    models.CharField(
                        blank=True,
                        db_index=True,
                        help_text="Hashed IP address for rate limiting (privacy-preserving)",
                        max_length=64,
                    ),
                ),
                # Extended metadata
                (
                    "metadata",
                    models.JSONField(
                        blank=True, default=dict, help_text="Additional context (action, entity info, etc.)"
                    ),
                ),
                (
                    "environment",
                    models.JSONField(
                        blank=True, default=dict, help_text="Client environment info (viewport, browser, etc.)"
                    ),
                ),
                # Timestamps and resolution
                (
                    "created_at",
                    models.DateTimeField(auto_now_add=True, db_index=True, help_text="When the error was recorded"),
                ),
                (
                    "resolved",
                    models.BooleanField(
                        db_index=True, default=False, help_text="Whether this error has been addressed"
                    ),
                ),
                (
                    "resolved_at",
                    models.DateTimeField(blank=True, help_text="When the error was marked resolved", null=True),
                ),
                ("resolution_notes", models.TextField(blank=True, help_text="Notes about how the error was resolved")),
                # Both user links are nulled out, not cascaded, when the user is deleted.
                (
                    "resolved_by",
                    models.ForeignKey(
                        blank=True,
                        help_text="Admin who resolved this error",
                        null=True,
                        on_delete=django.db.models.deletion.SET_NULL,
                        related_name="resolved_errors",
                        to=settings.AUTH_USER_MODEL,
                    ),
                ),
                (
                    "user",
                    models.ForeignKey(
                        blank=True,
                        help_text="User who encountered the error",
                        null=True,
                        on_delete=django.db.models.deletion.SET_NULL,
                        related_name="application_errors",
                        to=settings.AUTH_USER_MODEL,
                    ),
                ),
            ],
            options={
                "verbose_name": "Application Error",
                "verbose_name_plural": "Application Errors",
                # Newest errors first by default.
                "ordering": ["-created_at"],
                # Composite indexes pairing each common filter column with created_at.
                "indexes": [
                    models.Index(fields=["severity", "created_at"], name="core_applic_severit_6eeb93_idx"),
                    models.Index(fields=["source", "created_at"], name="core_applic_source_31a37f_idx"),
                    models.Index(fields=["error_type", "created_at"], name="core_applic_error_t_e787f5_idx"),
                    models.Index(fields=["resolved", "created_at"], name="core_applic_resolve_2b0297_idx"),
                ],
            },
        ),
    ]

View File

@@ -1,4 +1,8 @@
import hashlib
import uuid
import pghistory
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models
@@ -122,3 +126,175 @@ class SluggedModel(TrackedModel):
)
raise cls.DoesNotExist(f"{cls.__name__} with slug '{slug}' does not exist") from None
class ApplicationError(models.Model):
    """
    A single recorded application error.

    Records originate either from backend middleware, which captures
    errors automatically, or from the frontend, which reports them through
    the API. They are shown in the admin dashboard.
    """

    class Severity(models.TextChoices):
        # Allowed values for the ``severity`` column.
        CRITICAL = "critical", "Critical"
        HIGH = "high", "High"
        MEDIUM = "medium", "Medium"
        LOW = "low", "Low"

    class Source(models.TextChoices):
        # Allowed values for the ``source`` column.
        FRONTEND = "frontend", "Frontend"
        BACKEND = "backend", "Backend"
        API = "api", "API"

    # --- Identity ---------------------------------------------------------
    error_id = models.UUIDField(
        unique=True,
        default=uuid.uuid4,
        db_index=True,
        help_text="Unique identifier for this error instance",
    )
    request_id = models.CharField(
        max_length=255,
        blank=True,
        db_index=True,
        help_text="Request correlation ID if available",
    )

    # --- Error information --------------------------------------------------
    error_type = models.CharField(
        max_length=100,
        db_index=True,
        help_text="Type/class of the error (e.g., 'ValidationError', 'TypeError')",
    )
    error_message = models.TextField(
        help_text="Human-readable error message",
    )
    error_stack = models.TextField(
        blank=True,
        help_text="Stack trace if available",
    )
    error_code = models.CharField(
        max_length=50,
        blank=True,
        db_index=True,
        help_text="Application-specific error code",
    )
    severity = models.CharField(
        max_length=20,
        choices=Severity.choices,
        default=Severity.MEDIUM,
        db_index=True,
        help_text="Error severity level",
    )
    source = models.CharField(
        max_length=20,
        choices=Source.choices,
        db_index=True,
        help_text="Where the error originated",
    )

    # --- Request context ----------------------------------------------------
    endpoint = models.CharField(
        max_length=500,
        blank=True,
        help_text="URL/endpoint where the error occurred",
    )
    http_method = models.CharField(
        max_length=10,
        blank=True,
        help_text="HTTP method of the request",
    )
    http_status = models.PositiveIntegerField(
        null=True,
        blank=True,
        help_text="HTTP status code returned",
    )
    user_agent = models.TextField(
        blank=True,
        help_text="User agent string from the client",
    )

    # --- User context -------------------------------------------------------
    # Deleting the user keeps the error row and clears the link.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="application_errors",
        help_text="User who encountered the error",
    )
    ip_address_hash = models.CharField(
        max_length=64,
        blank=True,
        db_index=True,
        help_text="Hashed IP address for rate limiting (privacy-preserving)",
    )

    # --- Extended metadata --------------------------------------------------
    metadata = models.JSONField(
        default=dict,
        blank=True,
        help_text="Additional context (action, entity info, etc.)",
    )
    environment = models.JSONField(
        default=dict,
        blank=True,
        help_text="Client environment info (viewport, browser, etc.)",
    )

    # --- Timestamps and resolution ------------------------------------------
    created_at = models.DateTimeField(
        auto_now_add=True,
        db_index=True,
        help_text="When the error was recorded",
    )
    resolved = models.BooleanField(
        default=False,
        db_index=True,
        help_text="Whether this error has been addressed",
    )
    resolved_at = models.DateTimeField(
        null=True,
        blank=True,
        help_text="When the error was marked resolved",
    )
    resolved_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="resolved_errors",
        help_text="Admin who resolved this error",
    )
    resolution_notes = models.TextField(
        blank=True,
        help_text="Notes about how the error was resolved",
    )

    class Meta:
        # Newest first; each composite index pairs a filter column with the
        # creation timestamp.
        ordering = ["-created_at"]
        verbose_name = "Application Error"
        verbose_name_plural = "Application Errors"
        indexes = [
            models.Index(fields=["severity", "created_at"]),
            models.Index(fields=["source", "created_at"]),
            models.Index(fields=["error_type", "created_at"]),
            models.Index(fields=["resolved", "created_at"]),
        ]

    def __str__(self) -> str:
        """Return "[SEVERITY] type: message" with the message cut to 50 characters."""
        label = self.severity.upper()
        preview = self.error_message[:50]
        return f"[{label}] {self.error_type}: {preview}"

    @staticmethod
    def hash_ip(ip_address: str) -> str:
        """Return the salted SHA-256 hex digest of an IP address.

        The salt is the first 16 characters of ``settings.SECRET_KEY`` (empty
        when the setting is absent). A falsy address yields an empty string.
        """
        if not ip_address:
            return ""
        salt = getattr(settings, "SECRET_KEY", "")[:16]
        salted = f"{salt}{ip_address}".encode()
        return hashlib.sha256(salted).hexdigest()

    @property
    def short_error_id(self) -> str:
        """Return the first 8 characters of ``error_id`` for display."""
        full_id = str(self.error_id)
        return full_id[:8]

View File

@@ -3,6 +3,7 @@ Core services for ThrillWiki unified map functionality.
"""
from .clustering_service import ClusteringService
from .error_service import ErrorService
from .data_structures import (
ClusterData,
GeoBounds,
@@ -17,6 +18,7 @@ from .map_service import UnifiedMapService
__all__ = [
"UnifiedMapService",
"ClusteringService",
"ErrorService",
"MapCacheService",
"UnifiedLocation",
"LocationType",

View File

@@ -12,6 +12,8 @@ from typing import Any
from django.core.cache import caches
from apps.core.utils import capture_and_log
logger = logging.getLogger(__name__)
@@ -122,7 +124,7 @@ class EnhancedCacheService:
else:
logger.warning(f"Cache backend does not support pattern deletion for pattern '{pattern}'")
except Exception as e:
logger.error(f"Error invalidating cache pattern '{pattern}': {e}")
capture_and_log(e, f"Invalidate cache pattern '{pattern}'", source='service', severity='low')
def invalidate_model_cache(self, model_name: str, instance_id: int | None = None):
"""Invalidate cache keys related to a specific model"""
@@ -144,7 +146,7 @@ class EnhancedCacheService:
self.default_cache.set(cache_key, data, timeout)
logger.info(f"Warmed cache for key '{cache_key}'")
except Exception as e:
logger.error(f"Error warming cache for key '{cache_key}': {e}")
capture_and_log(e, f"Warm cache for key '{cache_key}'", source='service', severity='low')
def _generate_api_cache_key(self, view_name: str, params: dict) -> str:
"""Generate consistent cache keys for API responses"""
@@ -250,7 +252,7 @@ class CacheWarmer:
try:
self.cache_service.warm_cache(**operation)
except Exception as e:
logger.error(f"Error warming cache for {operation['cache_key']}: {e}")
capture_and_log(e, f"Warm cache for {operation['cache_key']}", source='service', severity='low')
# Cache statistics and monitoring

View File

@@ -0,0 +1,319 @@
"""
ErrorService - Centralized error capture and management.
Provides methods for:
- Capturing errors from frontend and backend
- Querying errors with filtering
- Generating statistics for dashboard
- Resolving errors
- Cleaning up old errors
"""
import logging
import traceback
from datetime import timedelta
from typing import Any
from uuid import UUID
from django.db.models import Count, Q
from django.db.models.functions import TruncDate
from django.http import HttpRequest
from django.utils import timezone
from apps.core.models import ApplicationError
logger = logging.getLogger(__name__)
class ErrorService:
"""Service for error capture and management."""
@staticmethod
def capture_error(
    error: Exception | str,
    source: str,
    request: HttpRequest | None = None,
    user: Any | None = None,
    severity: str = "medium",
    metadata: dict | None = None,
    environment: dict | None = None,
    error_id: UUID | None = None,
) -> ApplicationError:
    """
    Capture and store an error.

    Args:
        error: The exception or error message
        source: One of 'frontend', 'backend', 'api'
        request: Optional HTTP request for context
        user: Optional user who encountered the error; defaults to the
            authenticated user on ``request`` when one is present
        severity: Error severity level
        metadata: Additional context data; the collected request context is
            stored alongside it under the "request_context" key
        environment: Client environment info
        error_id: Optional identifier to store; when omitted the model's
            default generates one

    Returns:
        The created ApplicationError instance
    """
    # Extract error details
    if isinstance(error, Exception):
        error_type = type(error).__name__
        error_message = str(error)
        # Format from the exception's own traceback rather than the
        # "currently handled" exception, so the stack is correct even when
        # this is called outside an ``except`` block.
        error_stack = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        error_code = getattr(error, "error_code", "") or ""
    else:
        error_type = "Error"
        error_message = str(error)
        error_stack = ""
        error_code = ""

    # Extract request details
    endpoint = ""
    http_method = ""
    user_agent = ""
    ip_address_hash = ""

    # Build request_context for additional debugging info
    request_context: dict[str, Any] = {}

    if request:
        endpoint = request.path
        http_method = request.method
        user_agent = request.META.get("HTTP_USER_AGENT", "")

        # Hash IP for privacy
        ip = ErrorService._get_client_ip(request)
        ip_address_hash = ApplicationError.hash_ip(ip)

        # Use request user if not provided
        if user is None and hasattr(request, "user") and request.user.is_authenticated:
            user = request.user

        # Capture additional request context for debugging
        request_context = {
            "content_type": request.content_type,
            "query_string": request.META.get("QUERY_STRING", "")[:500],
            "request_id": request.META.get("HTTP_X_REQUEST_ID", ""),
            "accept_language": request.META.get("HTTP_ACCEPT_LANGUAGE", ""),
            "referer": request.META.get("HTTP_REFERER", ""),
            "origin": request.META.get("HTTP_ORIGIN", ""),
        }

        # Capture request body snippet for POST/PUT/PATCH (sanitized)
        if http_method in ("POST", "PUT", "PATCH"):
            try:
                body_snippet = request.body.decode("utf-8")[:1000] if request.body else ""
                # Drop the whole snippet if it mentions anything that looks
                # like a credential; partial masking is not attempted.
                lowered = body_snippet.lower()
                if any(
                    field in lowered
                    for field in ("password", "token", "secret", "key", "auth")
                ):
                    body_snippet = "[REDACTED - contains sensitive data]"
                request_context["body_snippet"] = body_snippet
            except Exception:
                # Best effort only: the body may be unreadable or undecodable.
                request_context["body_snippet"] = "[Could not decode body]"

    # Extract exception chain for comprehensive debugging
    if isinstance(error, Exception):
        cause_chain = []
        current_cause = error.__cause__
        # Bounded walk so a cyclic or very deep chain cannot run away.
        while current_cause and len(cause_chain) < 5:
            cause_chain.append({
                "type": type(current_cause).__name__,
                "message": str(current_cause)[:500],
            })
            current_cause = current_cause.__cause__
        if cause_chain:
            request_context["exception_chain"] = cause_chain

    # Merge request_context into metadata
    merged_metadata = {**(metadata or {}), "request_context": request_context}

    field_values: dict[str, Any] = {
        "error_type": error_type,
        "error_message": error_message[:5000],  # Limit message length
        "error_stack": error_stack[:10000],  # Limit stack length
        "error_code": error_code,
        "severity": severity,
        "source": source,
        "endpoint": endpoint,
        "http_method": http_method,
        "user_agent": user_agent[:1000],
        "user": user,
        "ip_address_hash": ip_address_hash,
        "metadata": merged_metadata,
        "environment": environment or {},
    }
    # Only pass error_id when the caller supplied one. An explicit None
    # would override the field default and violate the NOT NULL constraint
    # instead of letting the model generate a UUID.
    if error_id:
        field_values["error_id"] = error_id

    # Create and save error
    app_error = ApplicationError.objects.create(**field_values)

    logger.info(
        f"Captured error {app_error.short_error_id}: {error_type} from {source}"
    )

    return app_error
@staticmethod
def capture_frontend_error(
    error_data: dict,
    request: HttpRequest | None = None,
) -> ApplicationError:
    """
    Store an error that was reported by the frontend.

    Thin wrapper around ``capture_error`` that fixes the source to
    "frontend" and pulls the remaining arguments out of ``error_data``.

    Args:
        error_data: Dictionary containing error details from frontend
        request: HTTP request for IP/user context

    Returns:
        The created ApplicationError instance
    """
    message = error_data.get("error_message", "Unknown error")
    # Missing keys fall back to the same defaults capture_error would use.
    optional_fields = {
        "severity": error_data.get("severity", "medium"),
        "metadata": error_data.get("metadata", {}),
        "environment": error_data.get("environment", {}),
        "error_id": error_data.get("error_id"),
    }
    return ErrorService.capture_error(
        error=message,
        source="frontend",
        request=request,
        **optional_fields,
    )
@staticmethod
def get_error_statistics(days: int = 7) -> dict:
    """
    Build the statistics shown on the error dashboard.

    Only errors created within the last ``days`` days are considered.

    Args:
        days: Number of days to include in statistics

    Returns:
        Dictionary with the total, per-severity and per-source counts, the
        ten most frequent error types, daily counts, the resolution rate
        (percentage, one decimal place), critical and unresolved counts,
        and the period length.
    """
    window_start = timezone.now() - timedelta(days=days)
    recent = ApplicationError.objects.filter(created_at__gte=window_start)

    def counts_by(field: str) -> dict:
        # Map each distinct value of ``field`` to its number of rows.
        grouped = recent.values(field).annotate(count=Count("id"))
        return dict(grouped.values_list(field, "count"))

    total_errors = recent.count()
    severity_counts = counts_by("severity")
    source_counts = counts_by("source")

    # Ten most frequent error types, most common first.
    by_type = recent.values("error_type").annotate(count=Count("id"))
    error_types = list(by_type.order_by("-count")[:10])

    # One row per calendar day, oldest first, with the date rendered as an
    # ISO string so the result is JSON serializable.
    daily = (
        recent.annotate(date=TruncDate("created_at"))
        .values("date")
        .annotate(count=Count("id"))
        .order_by("date")
    )
    errors_over_time = [
        {**row, "date": row["date"].isoformat() if row["date"] else None}
        for row in daily
    ]

    # Share of errors in the window that have been resolved.
    resolved_count = recent.filter(resolved=True).count()
    if total_errors > 0:
        resolution_rate = resolved_count / total_errors * 100
    else:
        resolution_rate = 0

    # Quick stats for the dashboard header.
    critical_count = recent.filter(severity="critical").count()
    unresolved_count = recent.filter(resolved=False).count()

    return {
        "total_errors": total_errors,
        "errors_by_severity": severity_counts,
        "errors_by_source": source_counts,
        "errors_by_type": error_types,
        "errors_over_time": errors_over_time,
        "resolution_rate": round(resolution_rate, 1),
        "critical_count": critical_count,
        "unresolved_count": unresolved_count,
        "period_days": days,
    }
@staticmethod
def resolve_error(
    error_id: UUID | int,
    user: Any,
    notes: str = "",
) -> ApplicationError:
    """
    Flag a stored error as resolved.

    An ``int`` identifier is matched against the database ``id``; any
    other value is matched against the ``error_id`` field.

    Args:
        error_id: Database ID (int) or error UUID of the error
        user: User recorded as the resolver
        notes: Optional resolution notes

    Returns:
        The updated ApplicationError instance

    Raises:
        ApplicationError.DoesNotExist: If no matching error exists
    """
    lookup = {"id": error_id} if isinstance(error_id, int) else {"error_id": error_id}
    record = ApplicationError.objects.get(**lookup)

    resolution = {
        "resolved": True,
        "resolved_at": timezone.now(),
        "resolved_by": user,
        "resolution_notes": notes,
    }
    for attribute, value in resolution.items():
        setattr(record, attribute, value)
    # Persist only the resolution columns.
    record.save(update_fields=list(resolution))

    logger.info(f"Error {record.short_error_id} resolved by {user}")
    return record
@staticmethod
def cleanup_old_errors(days: int = 30) -> int:
    """
    Purge stale error records.

    A record is deleted when it was created more than ``days`` days ago
    AND it is either resolved or not of "critical" severity. Unresolved
    critical errors are therefore kept regardless of age.

    Args:
        days: Age threshold in days; older records are eligible

    Returns:
        Total number of objects deleted, as reported by ``QuerySet.delete()``
    """
    threshold = timezone.now() - timedelta(days=days)

    is_stale = Q(created_at__lt=threshold)
    # Everything except unresolved critical errors may be removed.
    is_disposable = Q(resolved=True) | ~Q(severity="critical")

    removed, _per_model = ApplicationError.objects.filter(is_stale & is_disposable).delete()

    logger.info(f"Cleaned up {removed} errors older than {days} days")
    return removed
@staticmethod
def _get_client_ip(request: HttpRequest) -> str:
"""Extract client IP from request, handling proxies."""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
return x_forwarded_for.split(",")[0].strip()
return request.META.get("REMOTE_ADDR", "")

View File

@@ -20,6 +20,7 @@ from django.db.models import Q
from django.utils import timezone
from apps.core.analytics import PageView
from apps.core.utils import capture_and_log
from apps.parks.models import Park
from apps.rides.models import Ride
@@ -105,7 +106,7 @@ class TrendingService:
return formatted_results
except Exception as e:
self.logger.error(f"Error getting trending content: {e}", exc_info=True)
capture_and_log(e, f'Get trending content ({content_type})', source='service')
return []
def get_new_content(
@@ -164,7 +165,7 @@ class TrendingService:
return formatted_results
except Exception as e:
self.logger.error(f"Error getting new content: {e}", exc_info=True)
capture_and_log(e, f'Get new content ({content_type})', source='service')
return []
def _calculate_trending_parks(self, limit: int) -> list[dict[str, Any]]:
@@ -311,7 +312,7 @@ class TrendingService:
return final_score
except Exception as e:
self.logger.error(f"Error calculating score for {content_type} {content_obj.id}: {e}")
capture_and_log(e, f'Calculate content score ({content_type} {content_obj.id})', source='service', severity='low')
return 0.0
def _calculate_view_growth_score(self, content_type: ContentType, object_id: int) -> float:
@@ -653,7 +654,7 @@ class TrendingService:
self.logger.info(f"Cleared trending caches for {content_type}")
except Exception as e:
self.logger.error(f"Error clearing cache: {e}")
capture_and_log(e, f'Clear trending cache ({content_type})', source='service', severity='low')
# Singleton service instance

View File

@@ -16,6 +16,7 @@ from django.db.models import Q
from django.utils import timezone
from apps.core.analytics import PageView
from apps.core.utils import capture_and_log
from apps.parks.models import Park
from apps.rides.models import Ride
@@ -87,7 +88,7 @@ def calculate_trending_content(self, content_type: str = "all", limit: int = 50)
}
except Exception as e:
logger.error(f"Error calculating trending content: {e}", exc_info=True)
capture_and_log(e, f'Calculate trending content ({content_type})', source='task')
# Retry the task
raise self.retry(exc=e) from None
@@ -141,7 +142,7 @@ def calculate_new_content(self, content_type: str = "all", days_back: int = 30,
}
except Exception as e:
logger.error(f"Error calculating new content: {e}", exc_info=True)
capture_and_log(e, f'Calculate new content ({content_type})', source='task')
raise self.retry(exc=e) from None
@@ -185,7 +186,7 @@ def warm_trending_cache(self) -> dict[str, Any]:
}
except Exception as e:
logger.error(f"Error warming trending cache: {e}", exc_info=True)
capture_and_log(e, 'Warm trending cache', source='task')
return {
"success": False,
"error": str(e),
@@ -309,7 +310,7 @@ def _calculate_content_score(
return final_score
except Exception as e:
logger.error(f"Error calculating score for {content_type} {content_obj.id}: {e}")
capture_and_log(e, f'Calculate content score ({content_type} {content_obj.id})', source='task', severity='low')
return 0.0

View File

@@ -43,4 +43,6 @@ urlpatterns = [
path("entities/", include(entity_patterns)),
# FSM transition endpoints
path("fsm/", include(fsm_patterns)),
# Error monitoring endpoints (API)
path("errors/", include("apps.core.urls.errors", namespace="errors")),
]

View File

@@ -0,0 +1,27 @@
"""
URL configuration for error monitoring API.
"""
from django.urls import path
from apps.core.api.error_views import (
ErrorCleanupView,
ErrorDetailView,
ErrorListView,
ErrorReportView,
ErrorResolveView,
ErrorStatisticsView,
)
# Application namespace for the routes below.
app_name = "errors"
# Route table for the error monitoring API.
urlpatterns = [
# Public endpoint (rate-limited)
# NOTE(review): no throttling is configured at the URL level; presumably
# ErrorReportView enforces the rate limit — confirm.
path("report/", ErrorReportView.as_view(), name="report"),
# Admin endpoints
# NOTE(review): access control is not visible in this URLconf; presumably
# the views themselves restrict these routes to admins — confirm.
path("", ErrorListView.as_view(), name="list"),
path("statistics/", ErrorStatisticsView.as_view(), name="statistics"),
path("cleanup/", ErrorCleanupView.as_view(), name="cleanup"),
# Detail and resolve routes are keyed by an integer `pk` path parameter.
path("<int:pk>/", ErrorDetailView.as_view(), name="detail"),
path("<int:pk>/resolve/", ErrorResolveView.as_view(), name="resolve"),
]

View File

@@ -12,6 +12,11 @@ from .breadcrumbs import (
build_breadcrumb,
get_model_breadcrumb,
)
from .capture_errors import (
capture_and_log,
capture_errors,
error_context,
)
from .messages import (
confirm_delete,
error_network,
@@ -47,6 +52,10 @@ __all__ = [
"breadcrumbs_to_schema",
"build_breadcrumb",
"get_model_breadcrumb",
# Error Capture
"capture_and_log",
"capture_errors",
"error_context",
# Messages
"confirm_delete",
"error_network",
@@ -73,3 +82,4 @@ __all__ = [
"get_og_image",
"get_twitter_card_type",
]

View File

@@ -0,0 +1,219 @@
"""
Error capture utilities: decorator and context manager.
Provides ergonomic wrappers around ErrorService for easy error capture.
Example usage:
# Decorator for functions/views
@capture_errors(source='api', severity='high')
def risky_view(request):
...
# Context manager for code blocks
with error_context('Processing payment', severity='critical'):
process_payment()
# Context manager with request for full context
with error_context('Creating ride', request=request, entity_type='Ride'):
create_ride(data)
"""
import functools
import logging
from contextlib import contextmanager
from typing import Any, Callable, TypeVar
from django.http import HttpRequest
from apps.core.services import ErrorService
logger = logging.getLogger(__name__)
F = TypeVar('F', bound=Callable[..., Any])
def capture_errors(
    source: str = 'backend',
    severity: str = 'high',
    reraise: bool = True,
    log_errors: bool = True,
) -> Callable[[F], F]:
    """
    Build a decorator that reports exceptions to the error dashboard.

    Any ``Exception`` raised by the wrapped callable is passed to
    ``ErrorService.capture_error`` together with the callable's name and
    module. A failure while capturing is logged and otherwise ignored, so
    it never masks the original exception.

    The request passed along for context is the first positional
    argument that is an ``HttpRequest``, falling back to a ``request``
    keyword argument.

    Args:
        source: Error source - 'frontend', 'backend', or 'api'
        severity: Default severity - 'critical', 'high', 'medium', 'low'
        reraise: Re-raise the exception after capturing; when False the
            exception is swallowed and the wrapped call returns None
        log_errors: Also log the exception to the Python logger

    Returns:
        Decorator to apply to the target function

    Example:
        @capture_errors(source='api', severity='high')
        def create_park(request, data):
            return ParkService.create(data)

        @capture_errors(severity='critical', reraise=False)
        def optional_cleanup():
            cleanup_temp_files()
    """

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Common Django view shapes: request is positional (function
            # views, or right after ``self``) or passed by keyword.
            positional_request = next(
                (candidate for candidate in args if isinstance(candidate, HttpRequest)),
                None,
            )
            request = positional_request or kwargs.get('request')

            try:
                return func(*args, **kwargs)
            except Exception as exc:
                if log_errors:
                    logger.exception(f"Error in {func.__name__}: {exc}")

                # Best effort: reporting must not hide the real failure.
                try:
                    ErrorService.capture_error(
                        error=exc,
                        source=source,
                        request=request,
                        severity=severity,
                        metadata={
                            'function_name': func.__name__,
                            'module': func.__module__,
                        },
                    )
                except Exception as capture_error:
                    logger.error(f"Failed to capture error: {capture_error}")

                if reraise:
                    raise
                return None

        return wrapper  # type: ignore

    return decorator
@contextmanager
def error_context(
    action: str,
    source: str = 'backend',
    severity: str = 'high',
    request: HttpRequest | None = None,
    entity_type: str | None = None,
    entity_id: int | str | None = None,
    reraise: bool = True,
    metadata: dict | None = None,
):
    """
    Context manager for capturing errors in code blocks.

    Any ``Exception`` raised inside the ``with`` block is logged and passed
    to ``ErrorService.capture_error`` with metadata describing the action.
    A failure while capturing is logged and otherwise ignored.

    With ``reraise=False`` the exception is suppressed, but the rest of the
    ``with`` block is still skipped: execution resumes at the first
    statement after the block. To keep a loop going past a failing item,
    put the ``with`` inside the loop.

    Args:
        action: Description of what the code block is doing
        source: Error source - 'frontend', 'backend', or 'api'
        severity: Error severity level
        request: Optional HTTP request for context
        entity_type: Optional entity type being operated on (e.g., 'Ride', 'Park')
        entity_id: Optional entity ID being operated on; 0 is a valid ID
        reraise: Whether to re-raise the exception after capturing
        metadata: Additional metadata to include; its keys override 'action'

    Yields:
        None

    Example:
        with error_context('Creating ride submission', request=request, entity_type='Ride'):
            submission = SubmissionService.create(data)

        for item in items:
            with error_context('Bulk import item', severity='critical', reraise=False):
                process_item(item)  # A failure is captured and the loop moves on
    """
    try:
        yield
    except Exception as e:
        logger.exception(f"Error during '{action}': {e}")

        # Build metadata
        error_metadata = {
            'action': action,
            **(metadata or {}),
        }
        if entity_type:
            error_metadata['entity_type'] = entity_type
        # Compare against None so a legitimate ID of 0 is not dropped.
        if entity_id is not None:
            error_metadata['entity_id'] = entity_id

        # Capture to error dashboard (best effort: never mask the real error)
        try:
            ErrorService.capture_error(
                error=e,
                source=source,
                request=request,
                severity=severity,
                metadata=error_metadata,
            )
        except Exception as capture_error:
            logger.error(f"Failed to capture error: {capture_error}")

        if reraise:
            raise
def capture_and_log(
    error: Exception,
    action: str,
    source: str = 'backend',
    severity: str = 'medium',
    request: HttpRequest | None = None,
    **kwargs: Any,
) -> str:
    """
    Log an already-caught error, capture it, and return its reference ID.

    Use this when you've already caught an exception and want to
    report it without the decorator/context manager. The error is written
    to the Python logger (with traceback when available) and then passed
    to ``ErrorService.capture_error``. This function never raises because
    of a capture failure.

    Args:
        error: The exception to capture
        action: Description of what was happening
        source: Error source
        severity: Error severity level
        request: Optional HTTP request for context
        **kwargs: Additional metadata fields stored alongside ``action``

    Returns:
        The captured error's ``short_error_id``, or "unknown" if capturing
        failed

    Example:
        try:
            result = risky_operation()
        except Exception as e:
            error_id = capture_and_log(e, 'Risky operation failed', severity='high')
            return Response({'error': f'Failed (ref: {error_id})'}, status=500)
    """
    # Callers use this in place of their own logger.error(..., exc_info=True),
    # so log here to keep the traceback in the application logs.
    logger.error(
        f"Error during '{action}': {error}",
        exc_info=error if isinstance(error, BaseException) else False,
    )

    try:
        app_error = ErrorService.capture_error(
            error=error,
            source=source,
            request=request,
            severity=severity,
            metadata={'action': action, **kwargs},
        )
        return app_error.short_error_id
    except Exception as capture_error:
        # Best effort: a reporting failure must not break the caller's
        # own error handling.
        logger.error(f"Failed to capture error: {capture_error}")
        return "unknown"

View File

@@ -4,6 +4,8 @@ import requests
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from .capture_errors import capture_and_log
logger = logging.getLogger(__name__)
@@ -47,7 +49,9 @@ def get_direct_upload_url(user_id=None):
if not result.get("success"):
error_msg = result.get("errors", [{"message": "Unknown error"}])[0].get("message")
logger.error(f"Cloudflare Direct Upload Error: {error_msg}")
raise requests.RequestException(f"Cloudflare Error: {error_msg}")
# Create error for capture
e = requests.RequestException(f"Cloudflare Error: {error_msg}")
capture_and_log(e, 'Cloudflare direct upload', source='service')
raise e
return result.get("result", {})

View File

@@ -18,6 +18,7 @@ from django.views.decorators.gzip import gzip_page
from ..services.data_structures import GeoBounds, LocationType, MapFilters
from ..services.map_service import unified_map_service
from apps.core.utils import capture_and_log
logger = logging.getLogger(__name__)
@@ -51,10 +52,7 @@ class MapAPIView(View):
return response
except Exception as e:
logger.error(
f"API error in {request.path}: {str(e)}",
exc_info=True,
)
capture_and_log(e, f'Map API dispatch {request.path}', source='api')
return self._error_response("An internal server error occurred", status=500)
def options(self, request, *args, **kwargs):
@@ -373,7 +371,7 @@ class MapLocationsView(MapAPIView):
logger.warning(f"Validation error in MapLocationsView: {str(e)}")
return self._error_response(str(e), 400, error_code="VALIDATION_ERROR")
except Exception as e:
logger.error(f"Error in MapLocationsView: {str(e)}", exc_info=True)
capture_and_log(e, 'MapLocationsView get', source='api')
return self._error_response(
"Failed to retrieve map locations",
500,
@@ -433,10 +431,7 @@ class MapLocationDetailView(MapAPIView):
logger.warning(f"Value error in MapLocationDetailView: {str(e)}")
return self._error_response(str(e), 400, error_code="INVALID_PARAMETER")
except Exception as e:
logger.error(
f"Error in MapLocationDetailView: {str(e)}",
exc_info=True,
)
capture_and_log(e, 'MapLocationDetailView get', source='api')
return self._error_response(
"Failed to retrieve location details",
500,
@@ -529,7 +524,7 @@ class MapSearchView(MapAPIView):
logger.warning(f"Value error in MapSearchView: {str(e)}")
return self._error_response(str(e), 400, error_code="INVALID_PARAMETER")
except Exception as e:
logger.error(f"Error in MapSearchView: {str(e)}", exc_info=True)
capture_and_log(e, 'MapSearchView get', source='api')
return self._error_response(
"Search failed due to internal error",
500,