# File: thrillwiki_django_no_react/backend/apps/api/v1/views/admin.py
# (383 lines, 14 KiB, Python)

"""
Admin API views for data completeness and system management.
These views provide admin-only endpoints for analyzing data quality,
entity completeness, and system health.
"""
from drf_spectacular.utils import extend_schema
from rest_framework import status
from apps.core.permissions import IsAdminWithSecondFactor
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.core.decorators.cache_decorators import cache_api_response
from apps.parks.models import Park
from apps.rides.models import Company, Ride
# Field-importance tiers per entity type.
# Each mapping goes from a tier name ("critical", "important", "valuable",
# "supplementary") to the list of attribute names that belong to that tier.
PARK_FIELDS = dict(
    critical=["name", "slug", "status"],
    important=["short_description", "park_type", "opening_date"],
    valuable=[
        "banner_image_url",
        "card_image_url",
        "website_url",
        "phone",
        "email",
    ],
    supplementary=["closing_date", "size_acres", "operating_season"],
)

RIDE_FIELDS = dict(
    critical=["name", "slug", "status", "park_id"],
    important=["category", "opening_date", "manufacturer_id"],
    valuable=[
        "max_speed_kmh",
        "height_meters",
        "track_length_meters",
        "inversions_count",
        "banner_image_url",
        "card_image_url",
    ],
    supplementary=[
        "min_height_cm",
        "max_height_cm",
        "duration_seconds",
        "capacity_per_hour",
        "designer_id",
    ],
)

COMPANY_FIELDS = dict(
    critical=["name", "slug", "company_type"],
    important=["description", "headquarters_location"],
    valuable=["logo_url", "website_url", "founded_year"],
    supplementary=["banner_image_url", "card_image_url"],
)
def calculate_completeness_score(obj, fields_config: dict) -> tuple[int, dict]:
    """
    Calculate completeness score for an entity based on field importance.

    Every category in ``fields_config`` has a fixed weight (critical 40,
    important 30, valuable 20, supplementary 10) that is split evenly between
    the fields listed for that category. A field earns its share when the
    attribute of the same name on ``obj`` holds a value; otherwise it is
    reported as missing.

    A value counts as missing when it is ``None``, an empty or
    whitespace-only string, or an empty list/tuple/set/frozenset/dict.
    Everything else counts as present, including falsy scalars such as ``0``
    and ``False``.

    Args:
        obj: Object inspected with ``getattr``; attributes that do not exist
            are treated as missing.
        fields_config: Mapping of category name to a list of attribute names.

    Returns:
        Tuple of (score, missing_fields_dict). ``score`` is an int capped at
        100. ``missing_fields_dict`` maps a category name to the list of its
        missing fields and only contains categories with at least one
        missing field.

    Raises:
        KeyError: If ``fields_config`` uses a category without a defined
            weight.
    """
    weights = {"critical": 40, "important": 30, "valuable": 20, "supplementary": 10}
    max_score = 100
    container_types = (list, tuple, set, frozenset, dict)

    def is_missing(value) -> bool:
        # None is always missing; strings are missing when blank; containers
        # are missing when empty. Numbers/booleans/dates are never missing.
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        return isinstance(value, container_types) and not value

    score = 0
    missing_fields = {}
    for category, fields in fields_config.items():
        # Looked up before the emptiness check so that an unknown category
        # fails loudly even when it lists no fields.
        category_weight = weights[category]
        if not fields:
            continue
        field_weight = category_weight / len(fields)
        missing_in_category = []
        for field in fields:
            if is_missing(getattr(obj, field, None)):
                missing_in_category.append(field)
            else:
                score += field_weight
        if missing_in_category:
            missing_fields[category] = missing_in_category
    return min(round(score), max_score), missing_fields
class DataCompletenessAPIView(APIView):
    """
    Admin endpoint for analyzing data completeness across all entity types.

    Returns completeness scores and missing field analysis for parks, rides
    and companies. Ride models are not analysed yet: their list is always
    empty and their summary counters are always 0.
    """

    permission_classes = [IsAdminWithSecondFactor]

    @staticmethod
    def _optional_int(raw):
        """Return ``raw`` as an int, or None when the parameter is absent or empty.

        Raises:
            ValueError: If ``raw`` is a non-empty string that is not an integer.
        """
        if raw is None or raw == "":
            return None
        return int(raw)

    @staticmethod
    def _score_entities(entities, entity_label, fields_config, min_score, max_score, missing_category):
        """Score each entity and keep the ones that pass the filters.

        Args:
            entities: Iterable of model instances to score.
            entity_label: Value stored under ``entity_type`` in each row.
            fields_config: Field-importance mapping passed to
                ``calculate_completeness_score``.
            min_score: Lowest score to keep, or None for no lower bound.
            max_score: Highest score to keep, or None for no upper bound.
            missing_category: When truthy, keep only entities that miss at
                least one field in this category.

        Returns:
            Tuple of (rows, total_score, complete_count) where ``rows`` is the
            list of serialised entities, ``total_score`` the sum of their
            scores and ``complete_count`` how many of them scored 100.
        """
        rows = []
        total_score = 0
        complete_count = 0
        for entity in entities:
            score, missing = calculate_completeness_score(entity, fields_config)
            if min_score is not None and score < min_score:
                continue
            if max_score is not None and score > max_score:
                continue
            if missing_category and missing_category not in missing:
                continue
            total_score += score
            if score == 100:
                complete_count += 1
            # getattr (rather than hasattr) also covers an attribute that
            # exists but is None, which would otherwise raise on .isoformat().
            updated_at = getattr(entity, "updated_at", None)
            rows.append({
                "id": str(entity.id),
                "name": entity.name,
                "slug": entity.slug,
                "entity_type": entity_label,
                "updated_at": updated_at.isoformat() if updated_at is not None else None,
                "completeness_score": score,
                "missing_fields": missing,
            })
        return rows, total_score, complete_count

    @extend_schema(
        tags=["Admin"],
        summary="Get data completeness analysis",
        description="Analyze data completeness across all entity types with missing field breakdown",
    )
    @cache_api_response(timeout=300, key_prefix="data_completeness")
    def get(self, request):
        """
        Get data completeness analysis.

        Query parameters:
        - entity_type: Filter by entity type (park, ride, company, ride_model)
        - min_score: Minimum completeness score (0-100)
        - max_score: Maximum completeness score (0-100)
        - missing_category: Filter by missing field category (critical, important, valuable, supplementary)
        - limit: Max results per entity type (default 50, capped at 200)

        The score/category filters are applied to the first ``limit`` rows of
        each entity type, so a filtered response can contain fewer than
        ``limit`` entries even when more matching rows exist.

        Returns 400 when min_score, max_score or limit is not an integer or
        limit is negative, and 500 on any other failure.
        """
        try:
            params = request.GET
            entity_type = params.get("entity_type")
            missing_category = params.get("missing_category")

            # Parse numeric parameters once, up front, so bad client input is
            # reported as a 400 instead of surfacing as a 500 mid-analysis.
            try:
                min_score = self._optional_int(params.get("min_score"))
                max_score = self._optional_int(params.get("max_score"))
                limit = min(int(params.get("limit", 50)), 200)
            except (TypeError, ValueError):
                return Response(
                    {"detail": "min_score, max_score and limit must be integers"},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            if limit < 0:
                # A negative value would otherwise be used as a queryset slice bound.
                return Response(
                    {"detail": "limit must not be negative"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            results = {
                "summary": {},
                "parks": [],
                "rides": [],
                "companies": [],
                "ride_models": [],
            }
            summary = results["summary"]

            # (entity_type value, results key, queryset, field config).
            # Querysets are lazy, so building one for a skipped entity type
            # does not hit the database.
            sections = (
                ("park", "parks", Park.objects.all(), PARK_FIELDS),
                ("ride", "rides", Ride.objects.select_related("park").all(), RIDE_FIELDS),
                ("company", "companies", Company.objects.all(), COMPANY_FIELDS),
            )
            for singular, plural, queryset, fields_config in sections:
                if entity_type and entity_type != singular:
                    continue
                rows, total_score, complete_count = self._score_entities(
                    queryset[:limit],
                    singular,
                    fields_config,
                    min_score,
                    max_score,
                    missing_category,
                )
                results[plural] = rows
                summary[f"total_{plural}"] = len(rows)
                summary[f"avg_{singular}_score"] = round(total_score / len(rows)) if rows else 0
                summary[f"{plural}_complete"] = complete_count

            # Ride models - placeholder (if model exists)
            summary["total_models"] = 0
            summary["avg_model_score"] = 0
            summary["models_complete"] = 0
            return Response(results, status=status.HTTP_200_OK)
        except Exception as e:
            # NOTE(review): the raw exception text is returned to the client;
            # acceptable for an admin-only endpoint, but consider logging it
            # server-side and returning a generic message instead.
            return Response(
                {"detail": f"Error analyzing data completeness: {str(e)}"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class TechnicalSpecificationsAPIView(APIView):
    """
    Public endpoint exposing ride technical specifications.

    Specifications are read straight from attributes of ``Ride`` instances;
    the advanced ride search uses this endpoint.
    """

    permission_classes = []  # Public endpoint

    @extend_schema(
        tags=["Rides"],
        summary="Get ride technical specifications",
        description="Query technical specifications across rides for advanced filtering",
    )
    def get(self, request):
        """
        Return a flat list of ``{ride_id, spec_name, spec_value}`` entries.

        Query parameters:
        - spec_name: Only return the specification with this field name
        - ride_id: Only return specifications of this ride

        At most the first 100 rides of the queryset are inspected, and
        specifications whose value is None are omitted. Any failure is
        reported as a 500 response carrying the exception text.
        """
        try:
            query = request.GET
            spec_name = query.get("spec_name")
            ride_id = query.get("ride_id")

            # For now the specs come from ride fields; a full implementation
            # would query a dedicated specifications table instead.
            queryset = Ride.objects.all()
            if ride_id:
                queryset = queryset.filter(id=ride_id)

            # Attribute name -> human-readable label (labels are not emitted).
            spec_labels = {
                "max_speed_kmh": "Max Speed (km/h)",
                "height_meters": "Height (m)",
                "track_length_meters": "Track Length (m)",
                "inversions_count": "Inversions",
                "duration_seconds": "Duration (s)",
                "g_force": "G-Force",
            }
            wanted = [
                attr for attr in spec_labels
                if not spec_name or spec_name == attr
            ]

            # The slice caps the number of rides to prevent huge responses.
            specs = [
                {
                    "ride_id": str(ride.id),
                    "spec_name": attr,
                    "spec_value": str(found),
                }
                for ride in queryset[:100]
                for attr in wanted
                if (found := getattr(ride, attr, None)) is not None
            ]
            return Response(specs, status=status.HTTP_200_OK)
        except Exception as e:
            return Response(
                {"detail": f"Error fetching specifications: {str(e)}"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class CoasterStatisticsAPIView(APIView):
    """
    Endpoint for querying coaster statistics for advanced filtering.

    Statistics are read from attributes of ``Ride`` rows whose category is
    "coaster".
    """

    permission_classes = []  # Public endpoint

    @staticmethod
    def _optional_float(raw):
        """Return ``raw`` as a float, or None when the parameter is absent or empty.

        Raises:
            ValueError: If ``raw`` is a non-empty string that is not a number.
        """
        if raw is None or raw == "":
            return None
        return float(raw)

    @extend_schema(
        tags=["Rides"],
        summary="Get coaster statistics",
        description="Query coaster statistics for advanced ride filtering",
    )
    def get(self, request):
        """
        Get coaster statistics.

        Query parameters:
        - stat_name: Filter by statistic name
        - stat_value__gte: Minimum value
        - stat_value__lte: Maximum value

        Returns a flat list of ``{ride_id, stat_name, stat_value}`` entries
        built from at most the first 100 coaster rides; statistics whose
        value is None are omitted. Responds with 400 when a value bound is
        not numeric and with 500 on any other failure.
        """
        try:
            stat_name = request.GET.get("stat_name")

            # Parse the bounds once, before touching the database, so that a
            # malformed bound is a client error (400) rather than a 500 that
            # only appears when some ride happens to have a value.
            try:
                min_value = self._optional_float(request.GET.get("stat_value__gte"))
                max_value = self._optional_float(request.GET.get("stat_value__lte"))
            except (TypeError, ValueError):
                return Response(
                    {"detail": "stat_value__gte and stat_value__lte must be numbers"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Query rides with coaster category and relevant stats
            rides = Ride.objects.filter(category="coaster")
            stat_fields = [
                "max_speed_kmh",
                "height_meters",
                "track_length_meters",
                "inversions_count",
                "g_force",
                "drop_height_meters",
            ]
            # The stat_name filter does not depend on the ride, so resolve it
            # once instead of re-checking it for every ride.
            selected_fields = [
                field for field in stat_fields
                if not stat_name or stat_name == field
            ]

            stats = []
            for ride in rides[:100]:
                for field in selected_fields:
                    raw_value = getattr(ride, field, None)
                    if raw_value is None:
                        continue
                    value = float(raw_value)
                    # Apply value filters
                    if min_value is not None and value < min_value:
                        continue
                    if max_value is not None and value > max_value:
                        continue
                    stats.append({
                        "ride_id": str(ride.id),
                        "stat_name": field,
                        "stat_value": value,
                    })
            return Response(stats, status=status.HTTP_200_OK)
        except Exception as e:
            # NOTE(review): this public endpoint echoes the raw exception text
            # to the client; consider logging it and returning a generic
            # message instead.
            return Response(
                {"detail": f"Error fetching statistics: {str(e)}"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )