Files

1629 lines
63 KiB
Python

"""
Centralized map API views.
Migrated from apps.core.views.map_views
Caching Strategy:
- MapLocationsAPIView: 5 minutes (300s) - map data changes infrequently but needs freshness
- MapLocationDetailAPIView: 30 minutes (1800s) - detail views are stable
- MapSearchAPIView: 5 minutes (300s) - search results should be consistent
- MapBoundsAPIView: 5 minutes (300s) - bounds queries are location-specific
- MapStatsAPIView: 10 minutes (600s) - stats are aggregated and change slowly
"""
import logging
from django.contrib.gis.geos import Polygon
from django.core.cache import cache
from django.db.models import Q
from django.http import HttpRequest
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import (
OpenApiExample,
OpenApiParameter,
extend_schema,
extend_schema_view,
)
from rest_framework import status
from rest_framework.permissions import AllowAny, IsAdminUser
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.core.decorators.cache_decorators import cache_api_response
from apps.core.services.enhanced_cache_service import EnhancedCacheService
from apps.parks.models import Park
from apps.rides.models import Ride
from ..serializers.maps import (
MapLocationDetailSerializer,
MapLocationsResponseSerializer,
MapSearchResponseSerializer,
)
from apps.core.utils import capture_and_log
logger = logging.getLogger(__name__)
@extend_schema_view(
get=extend_schema(
summary="Get map locations",
description="Get map locations with optional clustering and filtering.",
parameters=[
OpenApiParameter(
"north",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=False,
description="Northern latitude bound (-90 to 90). Used with south, east, west to define geographic bounds.",
examples=[OpenApiExample("Example", value=41.5)],
),
OpenApiParameter(
"south",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=False,
description="Southern latitude bound (-90 to 90). Must be less than north bound.",
examples=[OpenApiExample("Example", value=41.4)],
),
OpenApiParameter(
"east",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=False,
description="Eastern longitude bound (-180 to 180). Must be greater than west bound.",
examples=[OpenApiExample("Example", value=-82.6)],
),
OpenApiParameter(
"west",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=False,
description="Western longitude bound (-180 to 180). Used with other bounds for geographic filtering.",
examples=[OpenApiExample("Example", value=-82.8)],
),
OpenApiParameter(
"zoom",
type=OpenApiTypes.INT,
location=OpenApiParameter.QUERY,
required=False,
description="Map zoom level (1-20). Higher values show more detail. Used for clustering decisions.",
examples=[OpenApiExample("Example", value=10)],
),
OpenApiParameter(
"types",
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
required=False,
description="Comma-separated location types to include. Valid values: 'park', 'ride'. Default: 'park,ride'",
examples=[
OpenApiExample("All types", value="park,ride"),
OpenApiExample("Parks only", value="park"),
OpenApiExample("Rides only", value="ride"),
],
),
OpenApiParameter(
"cluster",
type=OpenApiTypes.BOOL,
location=OpenApiParameter.QUERY,
required=False,
description="Enable location clustering for high-density areas. Default: false",
examples=[
OpenApiExample("Enable clustering", value=True),
OpenApiExample("Disable clustering", value=False),
],
),
OpenApiParameter(
"q",
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
required=False,
description="Text search query. Searches park/ride names, cities, and states.",
examples=[
OpenApiExample("Park name", value="Cedar Point"),
OpenApiExample("Ride type", value="roller coaster"),
OpenApiExample("Location", value="Ohio"),
],
),
],
responses={
200: MapLocationsResponseSerializer,
400: OpenApiTypes.OBJECT,
500: OpenApiTypes.OBJECT,
},
tags=["Maps"],
),
)
class MapLocationsAPIView(APIView):
"""API endpoint for getting map locations with optional clustering."""
permission_classes = [AllowAny]
def _parse_request_parameters(self, request: HttpRequest) -> dict:
"""Parse and validate request parameters."""
return {
"north": request.GET.get("north"),
"south": request.GET.get("south"),
"east": request.GET.get("east"),
"west": request.GET.get("west"),
"zoom": request.GET.get("zoom", 10),
"types": request.GET.get("types", "park,ride").split(","),
"cluster": request.GET.get("cluster", "false").lower() == "true",
"query": request.GET.get("q", "").strip(),
}
def _build_cache_key(self, params: dict) -> str:
"""Build cache key from parameters."""
return (
f"map_locations_{params['north']}_{params['south']}_"
f"{params['east']}_{params['west']}_{params['zoom']}_"
f"{','.join(params['types'])}_{params['cluster']}_{params['query']}"
)
def _create_bounds_polygon(self, north: str, south: str, east: str, west: str) -> Polygon | None:
"""Create bounds polygon from coordinate strings."""
if not all([north, south, east, west]):
return None
try:
return Polygon.from_bbox((float(west), float(south), float(east), float(north)))
except (ValueError, TypeError):
return None
def _serialize_park_location(self, park) -> dict:
"""Serialize park location data."""
location = park.location if hasattr(park, "location") and park.location else None
return {
"city": location.city if location else "",
"state": location.state if location else "",
"country": location.country if location else "",
"formatted_address": location.formatted_address if location else "",
}
def _serialize_park_data(self, park) -> dict:
"""Serialize park data for map response."""
location = park.location if hasattr(park, "location") and park.location else None
return {
"id": park.id,
"type": "park",
"name": park.name,
"slug": park.slug,
"latitude": location.latitude if location else None,
"longitude": location.longitude if location else None,
"status": park.status,
"location": self._serialize_park_location(park),
"stats": {
"coaster_count": park.coaster_count or 0,
"ride_count": park.ride_count or 0,
"average_rating": (float(park.average_rating) if park.average_rating else None),
},
}
def _get_parks_data(self, params: dict) -> list:
"""Get and serialize parks data."""
if "park" not in params["types"]:
return []
parks_query = Park.objects.select_related("location", "operator").filter(location__point__isnull=False)
# Apply bounds filtering
bounds_polygon = self._create_bounds_polygon(params["north"], params["south"], params["east"], params["west"])
if bounds_polygon:
parks_query = parks_query.filter(location__point__within=bounds_polygon)
# Apply text search
if params["query"]:
parks_query = parks_query.filter(
Q(name__icontains=params["query"])
| Q(location__city__icontains=params["query"])
| Q(location__state__icontains=params["query"])
)
return [self._serialize_park_data(park) for park in parks_query[:100]]
def _serialize_ride_location(self, ride) -> dict:
"""Serialize ride location data."""
location = ride.park.location if hasattr(ride.park, "location") and ride.park.location else None
return {
"city": location.city if location else "",
"state": location.state if location else "",
"country": location.country if location else "",
"formatted_address": location.formatted_address if location else "",
}
def _serialize_ride_data(self, ride) -> dict:
"""Serialize ride data for map response."""
location = ride.park.location if hasattr(ride.park, "location") and ride.park.location else None
return {
"id": ride.id,
"type": "ride",
"name": ride.name,
"slug": ride.slug,
"latitude": location.latitude if location else None,
"longitude": location.longitude if location else None,
"status": ride.status,
"location": self._serialize_ride_location(ride),
"stats": {
"category": ride.get_category_display() if ride.category else None,
"average_rating": (float(ride.average_rating) if ride.average_rating else None),
"park_name": ride.park.name,
},
}
def _get_rides_data(self, params: dict) -> list:
"""Get and serialize rides data."""
if "ride" not in params["types"]:
return []
rides_query = Ride.objects.select_related("park__location", "manufacturer").filter(
park__location__point__isnull=False
)
# Apply bounds filtering
bounds_polygon = self._create_bounds_polygon(params["north"], params["south"], params["east"], params["west"])
if bounds_polygon:
rides_query = rides_query.filter(park__location__point__within=bounds_polygon)
# Apply text search
if params["query"]:
rides_query = rides_query.filter(
Q(name__icontains=params["query"])
| Q(park__name__icontains=params["query"])
| Q(park__location__city__icontains=params["query"])
)
return [self._serialize_ride_data(ride) for ride in rides_query[:100]]
def _calculate_bounds(self, locations: list) -> dict:
"""Calculate bounds from location results."""
if not locations:
return {}
lats = [loc["latitude"] for loc in locations if loc["latitude"]]
lngs = [loc["longitude"] for loc in locations if loc["longitude"]]
if not lats or not lngs:
return {}
return {
"north": max(lats),
"south": min(lats),
"east": max(lngs),
"west": min(lngs),
}
def _build_response(self, locations: list, params: dict) -> dict:
"""Build the final response data."""
return {
"status": "success",
"locations": locations,
"clusters": [], # See FUTURE_WORK.md - THRILLWIKI-106 for implementation plan
"bounds": self._calculate_bounds(locations),
"total_count": len(locations),
"clustered": params["cluster"],
}
def get(self, request: HttpRequest) -> Response:
"""
Get map locations with optional clustering and filtering.
Caching: Uses EnhancedCacheService with 5-minute timeout (300s).
Cache key is based on all query parameters for proper invalidation.
"""
try:
params = self._parse_request_parameters(request)
cache_key = self._build_cache_key(params)
# Use EnhancedCacheService for improved caching with monitoring
cache_service = EnhancedCacheService()
cached_result = cache_service.get_cached_api_response("map_locations", params)
if cached_result:
logger.debug(f"Cache hit for map_locations with key: {cache_key}")
return Response(cached_result)
# Get location data
parks_data = self._get_parks_data(params)
rides_data = self._get_rides_data(params)
locations = parks_data + rides_data
# Build response
result = self._build_response(locations, params)
# Cache result for 5 minutes using EnhancedCacheService
cache_service.cache_api_response("map_locations", params, result, timeout=300)
logger.debug(f"Cached map_locations result for key: {cache_key}")
return Response(result)
except Exception as e:
capture_and_log(e, 'Get map locations', source='api')
return Response(
{"status": "error", "detail": "Failed to retrieve map locations"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
get=extend_schema(
summary="Get location details",
description="Get detailed information about a specific location.",
parameters=[
OpenApiParameter(
"location_type",
type=OpenApiTypes.STR,
location=OpenApiParameter.PATH,
required=True,
description="Type of location",
),
OpenApiParameter(
"location_id",
type=OpenApiTypes.INT,
location=OpenApiParameter.PATH,
required=True,
description="ID of the location",
),
],
responses={
200: MapLocationDetailSerializer,
400: OpenApiTypes.OBJECT,
404: OpenApiTypes.OBJECT,
500: OpenApiTypes.OBJECT,
},
tags=["Maps"],
),
)
class MapLocationDetailAPIView(APIView):
"""
API endpoint for getting detailed information about a specific location.
Caching: 30-minute timeout (1800s) - detail views are stable and change infrequently.
"""
permission_classes = [AllowAny]
@cache_api_response(timeout=1800, key_prefix="map_detail")
def get(self, request: HttpRequest, location_type: str, location_id: int) -> Response:
"""Get detailed information for a specific location."""
try:
if location_type == "park":
try:
obj = Park.objects.select_related("location", "operator").get(id=location_id)
except Park.DoesNotExist:
return Response(
{"status": "error", "detail": "Park not found"},
status=status.HTTP_404_NOT_FOUND,
)
elif location_type == "ride":
try:
obj = Ride.objects.select_related("park__location", "manufacturer").get(id=location_id)
except Ride.DoesNotExist:
return Response(
{"status": "error", "detail": "Ride not found"},
status=status.HTTP_404_NOT_FOUND,
)
else:
return Response(
{"status": "error", "detail": "Invalid location type"},
status=status.HTTP_400_BAD_REQUEST,
)
# Serialize the object
if location_type == "park":
data = {
"id": obj.id,
"type": "park",
"name": obj.name,
"slug": obj.slug,
"description": obj.description,
"latitude": (obj.location.latitude if hasattr(obj, "location") and obj.location else None),
"longitude": (obj.location.longitude if hasattr(obj, "location") and obj.location else None),
"status": obj.status,
"location": {
"street_address": (
obj.location.street_address if hasattr(obj, "location") and obj.location else ""
),
"city": (obj.location.city if hasattr(obj, "location") and obj.location else ""),
"state": (obj.location.state if hasattr(obj, "location") and obj.location else ""),
"country": (obj.location.country if hasattr(obj, "location") and obj.location else ""),
"postal_code": (obj.location.postal_code if hasattr(obj, "location") and obj.location else ""),
"formatted_address": (
obj.location.formatted_address if hasattr(obj, "location") and obj.location else ""
),
},
"stats": {
"coaster_count": obj.coaster_count or 0,
"ride_count": obj.ride_count or 0,
"average_rating": (float(obj.average_rating) if obj.average_rating else None),
"size_acres": float(obj.size_acres) if obj.size_acres else None,
"opening_date": (obj.opening_date.isoformat() if obj.opening_date else None),
},
"nearby_locations": [], # See FUTURE_WORK.md - THRILLWIKI-107
}
else: # ride
data = {
"id": obj.id,
"type": "ride",
"name": obj.name,
"slug": obj.slug,
"description": obj.description,
"latitude": (
obj.park.location.latitude if hasattr(obj.park, "location") and obj.park.location else None
),
"longitude": (
obj.park.location.longitude if hasattr(obj.park, "location") and obj.park.location else None
),
"status": obj.status,
"location": {
"street_address": (
obj.park.location.street_address
if hasattr(obj.park, "location") and obj.park.location
else ""
),
"city": (obj.park.location.city if hasattr(obj.park, "location") and obj.park.location else ""),
"state": (
obj.park.location.state if hasattr(obj.park, "location") and obj.park.location else ""
),
"country": (
obj.park.location.country if hasattr(obj.park, "location") and obj.park.location else ""
),
"postal_code": (
obj.park.location.postal_code if hasattr(obj.park, "location") and obj.park.location else ""
),
"formatted_address": (
obj.park.location.formatted_address
if hasattr(obj.park, "location") and obj.park.location
else ""
),
},
"stats": {
"category": (obj.get_category_display() if obj.category else None),
"average_rating": (float(obj.average_rating) if obj.average_rating else None),
"park_name": obj.park.name,
"opening_date": (obj.opening_date.isoformat() if obj.opening_date else None),
"manufacturer": (obj.manufacturer.name if obj.manufacturer else None),
},
"nearby_locations": [], # See FUTURE_WORK.md - THRILLWIKI-107
}
return Response(
{
"status": "success",
"data": data,
}
)
except Exception as e:
capture_and_log(e, 'Get map location detail', source='api')
return Response(
{"status": "error", "detail": "Failed to retrieve location details"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
get=extend_schema(
summary="Search map locations",
description="Search locations by text query with optional bounds filtering.",
parameters=[
OpenApiParameter(
"q",
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
required=True,
description="Search query",
),
OpenApiParameter(
"types",
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
required=False,
description="Comma-separated location types (park,ride)",
),
OpenApiParameter(
"page",
type=OpenApiTypes.INT,
location=OpenApiParameter.QUERY,
required=False,
description="Page number",
),
OpenApiParameter(
"page_size",
type=OpenApiTypes.INT,
location=OpenApiParameter.QUERY,
required=False,
description="Results per page",
),
],
responses={
200: MapSearchResponseSerializer,
400: OpenApiTypes.OBJECT,
500: OpenApiTypes.OBJECT,
},
tags=["Maps"],
),
)
class MapSearchAPIView(APIView):
"""
API endpoint for searching locations by text query.
Caching: 5-minute timeout (300s) - search results should remain consistent
but need to reflect new content additions.
"""
permission_classes = [AllowAny]
@cache_api_response(timeout=300, key_prefix="map_search")
def get(self, request: HttpRequest) -> Response:
"""Search locations by text query with pagination."""
try:
query = request.GET.get("q", "").strip()
if not query:
return Response(
{
"status": "error",
"detail": "Search query 'q' parameter is required",
},
status=status.HTTP_400_BAD_REQUEST,
)
types = request.GET.get("types", "park,ride").split(",")
page = int(request.GET.get("page", 1))
page_size = min(int(request.GET.get("page_size", 20)), 100)
results = []
total_count = 0
# Search parks
if "park" in types:
parks_query = (
Park.objects.select_related("location")
.filter(
Q(name__icontains=query)
| Q(location__city__icontains=query)
| Q(location__state__icontains=query)
)
.filter(location__point__isnull=False)
)
for park in parks_query[:50]: # Limit results
results.append(
{
"id": park.id,
"type": "park",
"name": park.name,
"slug": park.slug,
"latitude": (
park.location.latitude if hasattr(park, "location") and park.location else None
),
"longitude": (
park.location.longitude if hasattr(park, "location") and park.location else None
),
"location": {
"city": (park.location.city if hasattr(park, "location") and park.location else ""),
"state": (park.location.state if hasattr(park, "location") and park.location else ""),
"country": (
park.location.country if hasattr(park, "location") and park.location else ""
),
},
"relevance_score": 1.0, # See FUTURE_WORK.md - THRILLWIKI-108
}
)
# Search rides
if "ride" in types:
rides_query = (
Ride.objects.select_related("park__location")
.filter(
Q(name__icontains=query)
| Q(park__name__icontains=query)
| Q(park__location__city__icontains=query)
)
.filter(park__location__point__isnull=False)
)
for ride in rides_query[:50]: # Limit results
results.append(
{
"id": ride.id,
"type": "ride",
"name": ride.name,
"slug": ride.slug,
"latitude": (
ride.park.location.latitude
if hasattr(ride.park, "location") and ride.park.location
else None
),
"longitude": (
ride.park.location.longitude
if hasattr(ride.park, "location") and ride.park.location
else None
),
"location": {
"city": (
ride.park.location.city
if hasattr(ride.park, "location") and ride.park.location
else ""
),
"state": (
ride.park.location.state
if hasattr(ride.park, "location") and ride.park.location
else ""
),
"country": (
ride.park.location.country
if hasattr(ride.park, "location") and ride.park.location
else ""
),
},
"relevance_score": 1.0, # See FUTURE_WORK.md - THRILLWIKI-108
}
)
total_count = len(results)
# Apply pagination
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
paginated_results = results[start_idx:end_idx]
return Response(
{
"status": "success",
"results": paginated_results,
"query": query,
"total_count": total_count,
"page": page,
"page_size": page_size,
}
)
except Exception as e:
capture_and_log(e, 'Map search', source='api')
return Response(
{"status": "error", "detail": "Search failed due to internal error"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
get=extend_schema(
summary="Get locations within bounds",
description="Get locations within specific geographic bounds.",
parameters=[
OpenApiParameter(
"north",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=True,
description="Northern latitude bound",
),
OpenApiParameter(
"south",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=True,
description="Southern latitude bound",
),
OpenApiParameter(
"east",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=True,
description="Eastern longitude bound",
),
OpenApiParameter(
"west",
type=OpenApiTypes.NUMBER,
location=OpenApiParameter.QUERY,
required=True,
description="Western longitude bound",
),
OpenApiParameter(
"types",
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
required=False,
description="Comma-separated location types (park,ride)",
),
],
responses={200: OpenApiTypes.OBJECT, 400: OpenApiTypes.OBJECT},
tags=["Maps"],
),
)
class MapBoundsAPIView(APIView):
"""
API endpoint for getting locations within specific bounds.
Caching: 5-minute timeout (300s) - bounds queries are location-specific
and may be repeated during map navigation.
"""
permission_classes = [AllowAny]
@cache_api_response(timeout=300, key_prefix="map_bounds")
def get(self, request: HttpRequest) -> Response:
"""Get locations within specific geographic bounds."""
try:
# Parse required bounds parameters
north_str = request.GET.get("north")
south_str = request.GET.get("south")
east_str = request.GET.get("east")
west_str = request.GET.get("west")
if not all([north_str, south_str, east_str, west_str]):
return Response(
{"status": "error", "detail": "All bounds parameters (north, south, east, west) are required"},
status=status.HTTP_400_BAD_REQUEST,
)
try:
north = float(north_str) if north_str else 0.0
south = float(south_str) if south_str else 0.0
east = float(east_str) if east_str else 0.0
west = float(west_str) if west_str else 0.0
except (TypeError, ValueError):
return Response(
{"status": "error", "detail": "Invalid bounds parameters"},
status=status.HTTP_400_BAD_REQUEST,
)
# Validate bounds
if north <= south:
return Response(
{
"status": "error",
"detail": "North bound must be greater than south bound",
},
status=status.HTTP_400_BAD_REQUEST,
)
if west >= east:
return Response(
{
"status": "error",
"detail": "West bound must be less than east bound",
},
status=status.HTTP_400_BAD_REQUEST,
)
types = request.GET.get("types", "park,ride").split(",")
locations = []
# Create bounds polygon
bounds_polygon = Polygon.from_bbox((west, south, east, north))
# Get parks within bounds
if "park" in types:
parks_query = Park.objects.select_related("location").filter(location__point__within=bounds_polygon)
for park in parks_query[:100]: # Limit results
locations.append(
{
"id": park.id,
"type": "park",
"name": park.name,
"slug": park.slug,
"latitude": (
park.location.latitude if hasattr(park, "location") and park.location else None
),
"longitude": (
park.location.longitude if hasattr(park, "location") and park.location else None
),
"status": park.status,
}
)
# Get rides within bounds
if "ride" in types:
rides_query = Ride.objects.select_related("park__location").filter(
park__location__point__within=bounds_polygon
)
for ride in rides_query[:100]: # Limit results
locations.append(
{
"id": ride.id,
"type": "ride",
"name": ride.name,
"slug": ride.slug,
"latitude": (
ride.park.location.latitude
if hasattr(ride.park, "location") and ride.park.location
else None
),
"longitude": (
ride.park.location.longitude
if hasattr(ride.park, "location") and ride.park.location
else None
),
"status": ride.status,
}
)
return Response(
{
"status": "success",
"locations": locations,
"bounds": {
"north": north,
"south": south,
"east": east,
"west": west,
},
"total_count": len(locations),
}
)
except Exception as e:
capture_and_log(e, 'Get map bounds', source='api')
return Response(
{"status": "error", "detail": "Failed to retrieve locations within bounds"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
get=extend_schema(
summary="Get map service statistics",
description="Get map service statistics and performance metrics.",
responses={200: OpenApiTypes.OBJECT},
tags=["Maps"],
),
)
class MapStatsAPIView(APIView):
"""
API endpoint for getting map service statistics and health information.
Caching: 10-minute timeout (600s) - stats are aggregated and change slowly.
"""
permission_classes = [AllowAny]
@cache_api_response(timeout=600, key_prefix="map_stats")
def get(self, request: HttpRequest) -> Response:
"""Get map service statistics and performance metrics."""
try:
# Count locations with coordinates
parks_with_location = Park.objects.filter(location__point__isnull=False).count()
rides_with_location = Ride.objects.filter(park__location__point__isnull=False).count()
total_locations = parks_with_location + rides_with_location
# Get cache statistics
from apps.core.services.enhanced_cache_service import CacheMonitor
cache_monitor = CacheMonitor()
cache_stats = cache_monitor.get_cache_statistics("map_locations")
return Response(
{
"status": "success",
"total_locations": total_locations,
"parks_with_location": parks_with_location,
"rides_with_location": rides_with_location,
"cache_hits": cache_stats.get("hits", 0),
"cache_misses": cache_stats.get("misses", 0),
"cache_hit_rate": cache_stats.get("hit_rate", 0.0),
"cache_size": cache_stats.get("size", 0),
}
)
except Exception as e:
capture_and_log(e, 'Get map stats', source='api')
return Response(
{"status": "error", "detail": "Failed to retrieve map statistics"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
delete=extend_schema(
summary="Clear map cache",
description="Clear all map cache (admin only).",
responses={200: OpenApiTypes.OBJECT},
tags=["Maps"],
),
post=extend_schema(
summary="Invalidate specific cache entries",
description="Invalidate specific cache entries.",
responses={200: OpenApiTypes.OBJECT},
tags=["Maps"],
),
)
class MapCacheAPIView(APIView):
"""API endpoint for cache management (admin only)."""
permission_classes = [IsAdminUser] # Admin only
def delete(self, request: HttpRequest) -> Response:
"""Clear all map cache (admin only)."""
try:
# Clear all map-related cache keys
# Note: cache.keys() may not be available in all cache backends
try:
cache_keys = cache.keys("map_*")
if cache_keys:
cache.delete_many(cache_keys)
cleared_count = len(cache_keys)
else:
cleared_count = 0
except AttributeError:
# Fallback: clear cache without pattern matching
cache.clear()
cleared_count = 1 # Indicate cache was cleared
return Response(
{
"status": "success",
"detail": f"Map cache cleared successfully. Cleared {cleared_count} entries.",
"cleared_count": cleared_count,
}
)
except Exception as e:
capture_and_log(e, 'Clear map cache', source='api')
return Response(
{"status": "error", "detail": "Failed to clear map cache"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
def post(self, request: HttpRequest) -> Response:
"""Invalidate specific cache entries."""
try:
# Get cache keys to invalidate from request data
request_data = getattr(request, "data", {})
cache_keys = request_data.get("cache_keys", []) if request_data else []
if cache_keys:
cache.delete_many(cache_keys)
invalidated_count = len(cache_keys)
else:
invalidated_count = 0
return Response(
{
"status": "success",
"detail": f"Cache invalidated successfully. Invalidated {invalidated_count} entries.",
"invalidated_count": invalidated_count,
}
)
except Exception as e:
capture_and_log(e, 'Invalidate map cache', source='api')
return Response(
{"status": "error", "detail": "Failed to invalidate cache"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# Legacy compatibility aliases
MapLocationsView = MapLocationsAPIView
MapLocationDetailView = MapLocationDetailAPIView
MapSearchView = MapSearchAPIView
MapBoundsView = MapBoundsAPIView
MapStatsView = MapStatsAPIView
MapCacheView = MapCacheAPIView
# =============================================================================
# Location Detection / Enrichment Endpoints
# =============================================================================
@extend_schema_view(
post=extend_schema(
summary="Detect user location from IP",
description="Detect the user's approximate location based on their IP address.",
request={
"application/json": {
"type": "object",
"properties": {
"ip_address": {
"type": "string",
"description": "IP address to geolocate. If not provided, uses request IP.",
}
},
}
},
responses={
200: {
"type": "object",
"properties": {
"latitude": {"type": "number"},
"longitude": {"type": "number"},
"city": {"type": "string"},
"region": {"type": "string"},
"country": {"type": "string"},
"timezone": {"type": "string"},
},
}
},
tags=["Maps"],
),
)
class DetectLocationView(APIView):
"""
POST /maps/detect-location/
Detect user's location based on IP address using a geolocation service.
"""
permission_classes = [AllowAny]
def post(self, request):
try:
# Get IP address from request or payload
ip_address = request.data.get("ip_address")
if not ip_address:
# Get client IP from request
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip_address = x_forwarded_for.split(",")[0].strip()
else:
ip_address = request.META.get("REMOTE_ADDR", "")
# For localhost/development, return a default location
if ip_address in ("127.0.0.1", "::1", "localhost") or ip_address.startswith("192.168."):
return Response(
{
"latitude": 40.7128,
"longitude": -74.006,
"city": "New York",
"region": "New York",
"country": "US",
"country_name": "United States",
"timezone": "America/New_York",
"detected": False,
"reason": "localhost_fallback",
}
)
# Use IP geolocation service (ipapi.co, ipinfo.io, etc.)
import httpx
try:
response = httpx.get(
f"https://ipapi.co/{ip_address}/json/",
timeout=5.0,
headers={"User-Agent": "ThrillWiki/1.0"},
)
if response.status_code == 200:
data = response.json()
return Response(
{
"latitude": data.get("latitude"),
"longitude": data.get("longitude"),
"city": data.get("city", ""),
"region": data.get("region", ""),
"country": data.get("country_code", ""),
"country_name": data.get("country_name", ""),
"timezone": data.get("timezone", ""),
"detected": True,
}
)
except httpx.HTTPError as e:
logger.warning(f"IP geolocation failed: {e}")
# Fallback response
return Response(
{
"latitude": None,
"longitude": None,
"city": "",
"region": "",
"country": "",
"country_name": "",
"timezone": "",
"detected": False,
"reason": "geolocation_failed",
}
)
except Exception as e:
capture_and_log(e, "Detect location from IP", source="api")
return Response(
{"detail": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
post=extend_schema(
summary="Enrich location with geocoding",
description="Enrich location data with reverse geocoding (coordinates to address).",
request={
"application/json": {
"type": "object",
"properties": {
"latitude": {"type": "number", "required": True},
"longitude": {"type": "number", "required": True},
},
}
},
responses={
200: {
"type": "object",
"properties": {
"formatted_address": {"type": "string"},
"street_address": {"type": "string"},
"city": {"type": "string"},
"state": {"type": "string"},
"postal_code": {"type": "string"},
"country": {"type": "string"},
},
}
},
tags=["Maps"],
),
)
class EnrichLocationView(APIView):
"""
POST /maps/enrich-location/
Enrich location with reverse geocoding (coordinates to address).
"""
permission_classes = [AllowAny]
def post(self, request):
try:
latitude = request.data.get("latitude")
longitude = request.data.get("longitude")
if latitude is None or longitude is None:
return Response(
{"detail": "latitude and longitude are required"},
status=status.HTTP_400_BAD_REQUEST,
)
try:
lat = float(latitude)
lng = float(longitude)
except (TypeError, ValueError):
return Response(
{"detail": "Invalid latitude or longitude"},
status=status.HTTP_400_BAD_REQUEST,
)
# Use reverse geocoding service
import httpx
try:
# Using Nominatim (OpenStreetMap) - free, no API key required
response = httpx.get(
"https://nominatim.openstreetmap.org/reverse",
params={
"lat": lat,
"lon": lng,
"format": "json",
"addressdetails": 1,
},
timeout=5.0,
headers={"User-Agent": "ThrillWiki/1.0"},
)
if response.status_code == 200:
data = response.json()
address = data.get("address", {})
return Response(
{
"formatted_address": data.get("display_name", ""),
"street_address": address.get("road", ""),
"house_number": address.get("house_number", ""),
"city": (
address.get("city")
or address.get("town")
or address.get("village")
or ""
),
"state": address.get("state", ""),
"postal_code": address.get("postcode", ""),
"country": address.get("country", ""),
"country_code": address.get("country_code", "").upper(),
"enriched": True,
}
)
except httpx.HTTPError as e:
logger.warning(f"Reverse geocoding failed: {e}")
# Fallback response
return Response(
{
"formatted_address": "",
"street_address": "",
"city": "",
"state": "",
"postal_code": "",
"country": "",
"country_code": "",
"enriched": False,
"reason": "geocoding_failed",
}
)
except Exception as e:
capture_and_log(e, "Enrich location", source="api")
return Response(
{"detail": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema_view(
post=extend_schema(
summary="Search for a location by text",
description="Forward geocoding - convert a text query (address, city name, etc.) to coordinates.",
request={
"application/json": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Location search query (address, city, place name, etc.)",
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return (default: 5)",
},
"country": {
"type": "string",
"description": "ISO 3166-1 alpha-2 country code to restrict search",
},
},
"required": ["query"],
}
},
responses={
200: {
"type": "object",
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"latitude": {"type": "number"},
"longitude": {"type": "number"},
"formatted_address": {"type": "string"},
"city": {"type": "string"},
"state": {"type": "string"},
"country": {"type": "string"},
"importance": {"type": "number"},
},
},
},
"query": {"type": "string"},
"count": {"type": "integer"},
},
},
400: {"description": "Missing or invalid query parameter"},
},
tags=["Maps"],
),
)
class SearchLocationView(APIView):
"""
POST /maps/search-location/
Forward geocoding - search for locations by text query.
Full parity with Supabase Edge Function: search-location
Features:
- Query caching with SHA-256 hash (7-day expiration)
- Rate limiting (30 requests per minute per IP)
- Usage logging for monitoring
- Cache headers (X-Cache: HIT/MISS)
"""
permission_classes = [AllowAny]
# Rate limit settings matching original
RATE_LIMIT_REQUESTS = 30
RATE_LIMIT_PERIOD = 60 # 1 minute
CACHE_EXPIRATION = 7 * 24 * 60 * 60 # 7 days in seconds
def _hash_query(self, query: str) -> str:
"""Hash query for cache lookup (matching original SHA-256)."""
import hashlib
normalized = query.strip().lower()
return hashlib.sha256(normalized.encode()).hexdigest()
def _get_client_ip(self, request) -> str:
"""Get client IP from request headers."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0].strip()
return request.META.get('HTTP_X_REAL_IP') or request.META.get('REMOTE_ADDR') or 'unknown'
def _check_rate_limit(self, client_ip: str) -> tuple[bool, int]:
"""
Check if client is rate limited.
Returns (is_allowed, current_count).
"""
from django.core.cache import cache
rate_limit_key = f"search_location:rate:{client_ip}"
current_count = cache.get(rate_limit_key, 0)
if current_count >= self.RATE_LIMIT_REQUESTS:
return False, current_count
# Increment counter with TTL
cache.set(rate_limit_key, current_count + 1, self.RATE_LIMIT_PERIOD)
return True, current_count + 1
def _get_cached_result(self, query_hash: str):
"""Get cached result if available."""
from django.core.cache import cache
cache_key = f"search_location:query:{query_hash}"
cached_data = cache.get(cache_key)
if cached_data:
# Update access count in a separate key
access_key = f"search_location:access:{query_hash}"
access_count = cache.get(access_key, 0)
cache.set(access_key, access_count + 1, self.CACHE_EXPIRATION)
return cached_data
def _set_cached_result(self, query: str, query_hash: str, results: list):
"""Cache the results."""
from django.core.cache import cache
cache_key = f"search_location:query:{query_hash}"
cache_data = {
"query": query,
"results": results,
"result_count": len(results),
}
cache.set(cache_key, cache_data, self.CACHE_EXPIRATION)
# Initialize access count
access_key = f"search_location:access:{query_hash}"
cache.set(access_key, 1, self.CACHE_EXPIRATION)
def _log_usage(self, query: str, cache_hit: bool, api_called: bool,
response_time_ms: int = None, result_count: int = None,
client_ip: str = None, user_id: str = None,
error: str = None, status_code: int = None):
"""Log API usage for monitoring."""
# Log to structured logger for now (can be enhanced to write to DB)
logger.info(
"OpenStreetMap API usage",
extra={
"query": query[:100],
"cache_hit": cache_hit,
"api_called": api_called,
"response_time_ms": response_time_ms,
"result_count": result_count,
"client_ip": client_ip,
"user_id": user_id,
"error": error,
"status_code": status_code,
}
)
def post(self, request):
import time
import re
start_time = time.time()
client_ip = self._get_client_ip(request)
user_id = None
try:
# Safely get user ID
if request.user and request.user.is_authenticated:
user_id = str(getattr(request.user, 'user_id', request.user.id))
except Exception:
pass
try:
# ================================================================
# STEP 0: Sanitize and validate input
# ================================================================
raw_query = request.data.get("query", "")
if not isinstance(raw_query, str):
raw_query = str(raw_query) if raw_query else ""
# Sanitize query: strip, limit length, remove control characters
query = raw_query.strip()[:500]
query = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', query)
# Validate limit
try:
limit = min(int(request.data.get("limit", 5)), 10)
limit = max(limit, 1) # At least 1
except (ValueError, TypeError):
limit = 5
# Sanitize country code (2-letter ISO code)
raw_country = request.data.get("country", "")
country_code = ""
if raw_country and isinstance(raw_country, str):
country_code = re.sub(r'[^a-zA-Z]', '', raw_country)[:2].lower()
# ================================================================
# STEP 1: Validate query (original: min 3 characters)
# ================================================================
if not query:
response_time = int((time.time() - start_time) * 1000)
self._log_usage(
query="",
cache_hit=False,
api_called=False,
response_time_ms=response_time,
client_ip=client_ip,
user_id=user_id,
error="Query is required",
status_code=400
)
return Response(
{"error": "Query is required"},
status=status.HTTP_400_BAD_REQUEST,
)
if len(query) < 3: # Match original: min 3 characters
response_time = int((time.time() - start_time) * 1000)
self._log_usage(
query=query,
cache_hit=False,
api_called=False,
response_time_ms=response_time,
client_ip=client_ip,
user_id=user_id,
error="Query must be at least 3 characters",
status_code=400
)
return Response(
{"error": "Query must be at least 3 characters"},
status=status.HTTP_400_BAD_REQUEST,
)
# ================================================================
# STEP 2: Check rate limit (30 req/min per IP)
# ================================================================
is_allowed, current_count = self._check_rate_limit(client_ip)
if not is_allowed:
response_time = int((time.time() - start_time) * 1000)
self._log_usage(
query=query,
cache_hit=False,
api_called=False,
response_time_ms=response_time,
client_ip=client_ip,
user_id=user_id,
error="Rate limit exceeded",
status_code=429
)
return Response(
{"error": "Rate limit exceeded. Please try again later."},
status=status.HTTP_429_TOO_MANY_REQUESTS,
headers={
"Retry-After": str(self.RATE_LIMIT_PERIOD),
"X-RateLimit-Limit": str(self.RATE_LIMIT_REQUESTS),
"X-RateLimit-Remaining": "0",
}
)
# ================================================================
# STEP 3: Check cache
# ================================================================
query_hash = self._hash_query(query)
cached = self._get_cached_result(query_hash)
if cached:
response_time = int((time.time() - start_time) * 1000)
results = cached.get("results", [])
self._log_usage(
query=query,
cache_hit=True,
api_called=False,
response_time_ms=response_time,
result_count=len(results),
client_ip=client_ip,
user_id=user_id,
status_code=200
)
# Return raw array like original (frontend handles both formats)
response = Response(
results,
status=status.HTTP_200_OK,
)
response["X-Cache"] = "HIT"
response["Cache-Control"] = "public, max-age=3600"
return response
# ================================================================
# STEP 4: Cache miss - call Nominatim API
# ================================================================
import httpx
try:
params = {
"q": query,
"format": "json",
"addressdetails": 1,
"limit": limit,
}
if country_code:
params["countrycodes"] = country_code.lower()
api_response = httpx.get(
"https://nominatim.openstreetmap.org/search",
params=params,
timeout=10.0,
headers={"User-Agent": "ThrillWiki/1.0 (https://thrillwiki.com)"},
)
if api_response.status_code != 200:
logger.warning(
f"Nominatim API error: {api_response.status_code}",
extra={"status": api_response.status_code}
)
return Response(
{"error": "Location search failed", "status": api_response.status_code},
status=api_response.status_code,
)
data = api_response.json()
response_time = int((time.time() - start_time) * 1000)
# ================================================================
# STEP 5: Cache the results (background-like, but sync in Django)
# ================================================================
try:
self._set_cached_result(query, query_hash, data)
except Exception as cache_error:
logger.warning(f"Failed to cache result: {cache_error}")
# Log usage
self._log_usage(
query=query,
cache_hit=False,
api_called=True,
response_time_ms=response_time,
result_count=len(data) if isinstance(data, list) else 0,
client_ip=client_ip,
user_id=user_id,
status_code=200
)
# Return raw array like original Nominatim response
response = Response(
data,
status=status.HTTP_200_OK,
)
response["X-Cache"] = "MISS"
response["Cache-Control"] = "public, max-age=3600"
return response
except httpx.HTTPError as e:
logger.warning(f"Forward geocoding failed: {e}")
response_time = int((time.time() - start_time) * 1000)
self._log_usage(
query=query,
cache_hit=False,
api_called=True,
response_time_ms=response_time,
client_ip=client_ip,
user_id=user_id,
error=str(e),
status_code=500
)
return Response(
{"error": "Failed to fetch location data"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
except ValueError as e:
return Response(
{"error": f"Invalid parameter: {str(e)}"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
capture_and_log(e, "Search location", source="api")
return Response(
{"error": str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)