major changes, including tailwind v4

This commit is contained in:
pacnpal
2025-08-15 12:24:20 -04:00
parent f6c8e0e25c
commit da7c7e3381
261 changed files with 22783 additions and 10465 deletions

57
core/analytics.py Normal file
View File

@@ -0,0 +1,57 @@
from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
from django.db.models import Count
from django.conf import settings
class PageView(models.Model):
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, related_name='page_views')
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey('content_type', 'object_id')
timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
ip_address = models.GenericIPAddressField()
user_agent = models.CharField(max_length=512, blank=True)
class Meta:
indexes = [
models.Index(fields=['timestamp']),
models.Index(fields=['content_type', 'object_id']),
]
@classmethod
def get_trending_items(cls, model_class, hours=24, limit=10):
"""Get trending items of a specific model class based on views in last X hours.
Args:
model_class: The model class to get trending items for (e.g., Park, Ride)
hours (int): Number of hours to look back for views (default: 24)
limit (int): Maximum number of items to return (default: 10)
Returns:
QuerySet: The trending items ordered by view count
"""
content_type = ContentType.objects.get_for_model(model_class)
cutoff = timezone.now() - timezone.timedelta(hours=hours)
# Query through the ContentType relationship
item_ids = cls.objects.filter(
content_type=content_type,
timestamp__gte=cutoff
).values('object_id').annotate(
view_count=Count('id')
).filter(
view_count__gt=0
).order_by('-view_count').values_list('object_id', flat=True)[:limit]
# Get the actual items in the correct order
if item_ids:
# Convert the list to a string of comma-separated values
id_list = list(item_ids)
# Use Case/When to preserve the ordering
from django.db.models import Case, When
preserved = Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(id_list)])
return model_class.objects.filter(pk__in=id_list).order_by(preserved)
return model_class.objects.none()

92
core/history.py Normal file
View File

@@ -0,0 +1,92 @@
from django.db import models
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.conf import settings
from typing import Any, Dict, Optional
from django.db.models import QuerySet
class DiffMixin:
"""Mixin to add diffing capabilities to models"""
def get_prev_record(self) -> Optional[Any]:
"""Get the previous record for this instance"""
try:
return type(self).objects.filter(
pgh_created_at__lt=self.pgh_created_at,
pgh_obj_id=self.pgh_obj_id
).order_by('-pgh_created_at').first()
except (AttributeError, TypeError):
return None
def diff_against_previous(self) -> Dict:
"""Compare this record against the previous one"""
prev_record = self.get_prev_record()
if not prev_record:
return {}
skip_fields = {
'pgh_id', 'pgh_created_at', 'pgh_label',
'pgh_obj_id', 'pgh_context_id', '_state',
'created_at', 'updated_at'
}
changes = {}
for field, value in self.__dict__.items():
# Skip internal fields and those we don't want to track
if field.startswith('_') or field in skip_fields or field.endswith('_id'):
continue
try:
old_value = getattr(prev_record, field)
new_value = value
if old_value != new_value:
changes[field] = {
"old": str(old_value) if old_value is not None else "None",
"new": str(new_value) if new_value is not None else "None"
}
except AttributeError:
continue
return changes
class TrackedModel(models.Model):
"""Abstract base class for models that need history tracking"""
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
abstract = True
def get_history(self) -> QuerySet:
"""Get all history records for this instance in chronological order"""
event_model = self.events.model # pghistory provides this automatically
if event_model:
return event_model.objects.filter(
pgh_obj_id=self.pk
).order_by('-pgh_created_at')
return self.__class__.objects.none()
class HistoricalSlug(models.Model):
"""Track historical slugs for models"""
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey('content_type', 'object_id')
slug = models.SlugField(max_length=255)
created_at = models.DateTimeField(auto_now_add=True)
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name='historical_slugs'
)
class Meta:
unique_together = ('content_type', 'slug')
indexes = [
models.Index(fields=['content_type', 'object_id']),
models.Index(fields=['slug']),
]
def __str__(self) -> str:
return f"{self.content_type} - {self.object_id} - {self.slug}"

View File

@@ -0,0 +1,34 @@
from django.core.management.base import BaseCommand
from django.core.cache import cache
from parks.models import Park
from rides.models import Ride
from core.analytics import PageView
class Command(BaseCommand):
help = 'Updates trending parks and rides cache based on views in the last 24 hours'
def handle(self, *args, **kwargs):
"""
Updates the trending parks and rides in the cache.
This command is designed to be run every hour via cron to keep the trending
items up to date. It looks at page views from the last 24 hours and caches
the top 10 most viewed parks and rides.
The cached data is used by the home page to display trending items without
having to query the database on every request.
"""
# Get top 10 trending parks and rides from the last 24 hours
trending_parks = PageView.get_trending_items(Park, hours=24, limit=10)
trending_rides = PageView.get_trending_items(Ride, hours=24, limit=10)
# Cache the results for 1 hour
cache.set('trending_parks', trending_parks, 3600) # 3600 seconds = 1 hour
cache.set('trending_rides', trending_rides, 3600)
self.stdout.write(
self.style.SUCCESS(
'Successfully updated trending parks and rides. '
'Cached 10 items each for parks and rides based on views in the last 24 hours.'
)
)

View File

@@ -1,6 +1,10 @@
import pghistory
from django.contrib.auth.models import AnonymousUser
from django.core.handlers.wsgi import WSGIRequest
from django.utils.deprecation import MiddlewareMixin
from django.contrib.contenttypes.models import ContentType
from django.views.generic.detail import DetailView
from core.analytics import PageView
class RequestContextProvider(pghistory.context):
"""Custom context provider for pghistory that extracts information from the request."""
@@ -24,4 +28,39 @@ class PgHistoryContextMiddleware:
def __call__(self, request):
response = self.get_response(request)
return response
return response
class PageViewMiddleware(MiddlewareMixin):
def process_view(self, request, view_func, view_args, view_kwargs):
# Only track GET requests
if request.method != 'GET':
return None
# Get view class if it exists
view_class = getattr(view_func, 'view_class', None)
if not view_class or not issubclass(view_class, DetailView):
return None
# Get the object if it's a detail view
try:
view_instance = view_class()
view_instance.request = request
view_instance.args = view_args
view_instance.kwargs = view_kwargs
obj = view_instance.get_object()
except (AttributeError, Exception):
return None
# Record the page view
try:
PageView.objects.create(
content_type=ContentType.objects.get_for_model(obj.__class__),
object_id=obj.pk,
ip_address=request.META.get('REMOTE_ADDR', ''),
user_agent=request.META.get('HTTP_USER_AGENT', '')[:512]
)
except Exception:
# Fail silently to not interrupt the request
pass
return None

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.1.4 on 2025-02-10 01:10
# Generated by Django 5.1.4 on 2025-08-13 21:35
import django.db.models.deletion
from django.db import migrations, models

View File

@@ -0,0 +1,98 @@
# Generated by Django 5.1.4 on 2025-08-14 14:50
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("contenttypes", "0002_remove_content_type_name"),
("core", "0001_initial"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="HistoricalSlug",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("object_id", models.PositiveIntegerField()),
("slug", models.SlugField(max_length=255)),
("created_at", models.DateTimeField(auto_now_add=True)),
(
"content_type",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="contenttypes.contenttype",
),
),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="historical_slugs",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"indexes": [
models.Index(
fields=["content_type", "object_id"],
name="core_histor_content_b4c470_idx",
),
models.Index(fields=["slug"], name="core_histor_slug_8fd7b3_idx"),
],
"unique_together": {("content_type", "slug")},
},
),
migrations.CreateModel(
name="PageView",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("object_id", models.PositiveIntegerField()),
("timestamp", models.DateTimeField(auto_now_add=True, db_index=True)),
("ip_address", models.GenericIPAddressField()),
("user_agent", models.CharField(blank=True, max_length=512)),
(
"content_type",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="page_views",
to="contenttypes.contenttype",
),
),
],
options={
"indexes": [
models.Index(
fields=["timestamp"], name="core_pagevi_timesta_757ebb_idx"
),
models.Index(
fields=["content_type", "object_id"],
name="core_pagevi_content_eda7ad_idx",
),
],
},
),
]

17
core/mixins/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
from django.views.generic.list import MultipleObjectMixin
class HTMXFilterableMixin(MultipleObjectMixin):
"""
A mixin that provides filtering capabilities for HTMX requests.
"""
filter_class = None
def get_queryset(self):
queryset = super().get_queryset()
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
return self.filterset.qs
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['filter'] = self.filterset
return context

View File

@@ -2,7 +2,7 @@ from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.utils.text import slugify
from history_tracking.models import TrackedModel
from core.history import TrackedModel
class SlugHistory(models.Model):
"""

27
core/services/__init__.py Normal file
View File

@@ -0,0 +1,27 @@
"""
Core services for ThrillWiki unified map functionality.
"""
from .map_service import UnifiedMapService
from .clustering_service import ClusteringService
from .map_cache_service import MapCacheService
from .data_structures import (
UnifiedLocation,
LocationType,
GeoBounds,
MapFilters,
MapResponse,
ClusterData
)
__all__ = [
'UnifiedMapService',
'ClusteringService',
'MapCacheService',
'UnifiedLocation',
'LocationType',
'GeoBounds',
'MapFilters',
'MapResponse',
'ClusterData'
]

View File

@@ -0,0 +1,342 @@
"""
Clustering service for map locations to improve performance and user experience.
"""
import math
from typing import List, Tuple, Dict, Any, Optional, Set
from dataclasses import dataclass
from collections import defaultdict
from .data_structures import (
UnifiedLocation,
ClusterData,
GeoBounds,
LocationType
)
@dataclass
class ClusterPoint:
"""Internal representation of a point for clustering."""
location: UnifiedLocation
x: float # Projected x coordinate
y: float # Projected y coordinate
class ClusteringService:
"""
Handles location clustering for map display using a simple grid-based approach
with zoom-level dependent clustering radius.
"""
# Clustering configuration
DEFAULT_RADIUS = 40 # pixels
MIN_POINTS_TO_CLUSTER = 2
MAX_ZOOM_FOR_CLUSTERING = 15
MIN_ZOOM_FOR_CLUSTERING = 3
# Zoom level configurations
ZOOM_CONFIGS = {
3: {'radius': 80, 'min_points': 5}, # World level
4: {'radius': 70, 'min_points': 4}, # Continent level
5: {'radius': 60, 'min_points': 3}, # Country level
6: {'radius': 50, 'min_points': 3}, # Large region level
7: {'radius': 45, 'min_points': 2}, # Region level
8: {'radius': 40, 'min_points': 2}, # State level
9: {'radius': 35, 'min_points': 2}, # Metro area level
10: {'radius': 30, 'min_points': 2}, # City level
11: {'radius': 25, 'min_points': 2}, # District level
12: {'radius': 20, 'min_points': 2}, # Neighborhood level
13: {'radius': 15, 'min_points': 2}, # Block level
14: {'radius': 10, 'min_points': 2}, # Street level
15: {'radius': 5, 'min_points': 2}, # Building level
}
def __init__(self):
self.cluster_id_counter = 0
def should_cluster(self, zoom_level: int, point_count: int) -> bool:
"""Determine if clustering should be applied based on zoom level and point count."""
if zoom_level > self.MAX_ZOOM_FOR_CLUSTERING:
return False
if zoom_level < self.MIN_ZOOM_FOR_CLUSTERING:
return True
config = self.ZOOM_CONFIGS.get(zoom_level, {'min_points': self.MIN_POINTS_TO_CLUSTER})
return point_count >= config['min_points']
def cluster_locations(
self,
locations: List[UnifiedLocation],
zoom_level: int,
bounds: Optional[GeoBounds] = None
) -> Tuple[List[UnifiedLocation], List[ClusterData]]:
"""
Cluster locations based on zoom level and density.
Returns (unclustered_locations, clusters).
"""
if not locations or not self.should_cluster(zoom_level, len(locations)):
return locations, []
# Convert locations to projected coordinates for clustering
cluster_points = self._project_locations(locations, bounds)
# Get clustering configuration for zoom level
config = self.ZOOM_CONFIGS.get(zoom_level, {
'radius': self.DEFAULT_RADIUS,
'min_points': self.MIN_POINTS_TO_CLUSTER
})
# Perform clustering
clustered_groups = self._cluster_points(cluster_points, config['radius'], config['min_points'])
# Separate individual locations from clusters
unclustered_locations = []
clusters = []
for group in clustered_groups:
if len(group) < config['min_points']:
# Add individual locations
unclustered_locations.extend([cp.location for cp in group])
else:
# Create cluster
cluster = self._create_cluster(group)
clusters.append(cluster)
return unclustered_locations, clusters
def _project_locations(
self,
locations: List[UnifiedLocation],
bounds: Optional[GeoBounds] = None
) -> List[ClusterPoint]:
"""Convert lat/lng coordinates to projected x/y for clustering calculations."""
cluster_points = []
# Use bounds or calculate from locations
if not bounds:
lats = [loc.latitude for loc in locations]
lngs = [loc.longitude for loc in locations]
bounds = GeoBounds(
north=max(lats),
south=min(lats),
east=max(lngs),
west=min(lngs)
)
# Simple equirectangular projection (good enough for clustering)
center_lat = (bounds.north + bounds.south) / 2
lat_scale = 111320 # meters per degree latitude
lng_scale = 111320 * math.cos(math.radians(center_lat)) # meters per degree longitude
for location in locations:
# Convert to meters relative to bounds center
x = (location.longitude - (bounds.west + bounds.east) / 2) * lng_scale
y = (location.latitude - (bounds.north + bounds.south) / 2) * lat_scale
cluster_points.append(ClusterPoint(
location=location,
x=x,
y=y
))
return cluster_points
def _cluster_points(
self,
points: List[ClusterPoint],
radius_pixels: int,
min_points: int
) -> List[List[ClusterPoint]]:
"""
Cluster points using a simple distance-based approach.
Radius is in pixels, converted to meters based on zoom level.
"""
# Convert pixel radius to meters (rough approximation)
# At zoom level 10, 1 pixel ≈ 150 meters
radius_meters = radius_pixels * 150
clustered = [False] * len(points)
clusters = []
for i, point in enumerate(points):
if clustered[i]:
continue
# Find all points within radius
cluster_group = [point]
clustered[i] = True
for j, other_point in enumerate(points):
if i == j or clustered[j]:
continue
distance = self._calculate_distance(point, other_point)
if distance <= radius_meters:
cluster_group.append(other_point)
clustered[j] = True
clusters.append(cluster_group)
return clusters
def _calculate_distance(self, point1: ClusterPoint, point2: ClusterPoint) -> float:
"""Calculate Euclidean distance between two projected points in meters."""
dx = point1.x - point2.x
dy = point1.y - point2.y
return math.sqrt(dx * dx + dy * dy)
def _create_cluster(self, cluster_points: List[ClusterPoint]) -> ClusterData:
"""Create a ClusterData object from a group of points."""
locations = [cp.location for cp in cluster_points]
# Calculate cluster center (average position)
avg_lat = sum(loc.latitude for loc in locations) / len(locations)
avg_lng = sum(loc.longitude for loc in locations) / len(locations)
# Calculate cluster bounds
lats = [loc.latitude for loc in locations]
lngs = [loc.longitude for loc in locations]
cluster_bounds = GeoBounds(
north=max(lats),
south=min(lats),
east=max(lngs),
west=min(lngs)
)
# Collect location types in cluster
types = set(loc.type for loc in locations)
# Select representative location (highest weight)
representative = self._select_representative_location(locations)
# Generate cluster ID
self.cluster_id_counter += 1
cluster_id = f"cluster_{self.cluster_id_counter}"
return ClusterData(
id=cluster_id,
coordinates=(avg_lat, avg_lng),
count=len(locations),
types=types,
bounds=cluster_bounds,
representative_location=representative
)
def _select_representative_location(self, locations: List[UnifiedLocation]) -> Optional[UnifiedLocation]:
"""Select the most representative location for a cluster."""
if not locations:
return None
# Prioritize by: 1) Parks over rides/companies, 2) Higher weight, 3) Better rating
parks = [loc for loc in locations if loc.type == LocationType.PARK]
if parks:
return max(parks, key=lambda x: (
x.cluster_weight,
x.metadata.get('rating', 0) or 0
))
rides = [loc for loc in locations if loc.type == LocationType.RIDE]
if rides:
return max(rides, key=lambda x: (
x.cluster_weight,
x.metadata.get('rating', 0) or 0
))
companies = [loc for loc in locations if loc.type == LocationType.COMPANY]
if companies:
return max(companies, key=lambda x: x.cluster_weight)
# Fall back to highest weight location
return max(locations, key=lambda x: x.cluster_weight)
def get_cluster_breakdown(self, clusters: List[ClusterData]) -> Dict[str, Any]:
"""Get statistics about clustering results."""
if not clusters:
return {
'total_clusters': 0,
'total_points_clustered': 0,
'average_cluster_size': 0,
'type_distribution': {},
'category_distribution': {}
}
total_points = sum(cluster.count for cluster in clusters)
type_counts = defaultdict(int)
category_counts = defaultdict(int)
for cluster in clusters:
for location_type in cluster.types:
type_counts[location_type.value] += cluster.count
if cluster.representative_location:
category_counts[cluster.representative_location.cluster_category] += 1
return {
'total_clusters': len(clusters),
'total_points_clustered': total_points,
'average_cluster_size': total_points / len(clusters),
'largest_cluster_size': max(cluster.count for cluster in clusters),
'smallest_cluster_size': min(cluster.count for cluster in clusters),
'type_distribution': dict(type_counts),
'category_distribution': dict(category_counts)
}
def expand_cluster(self, cluster: ClusterData, zoom_level: int) -> List[UnifiedLocation]:
"""
Expand a cluster to show individual locations (for drill-down functionality).
This would typically require re-querying the database with the cluster bounds.
"""
# This is a placeholder - in practice, this would re-query the database
# with the cluster bounds and higher detail level
return []
class SmartClusteringRules:
"""
Advanced clustering rules that consider location types and importance.
"""
@staticmethod
def should_cluster_together(loc1: UnifiedLocation, loc2: UnifiedLocation) -> bool:
"""Determine if two locations should be clustered together."""
# Same park rides should cluster together more readily
if loc1.type == LocationType.RIDE and loc2.type == LocationType.RIDE:
park1_id = loc1.metadata.get('park_id')
park2_id = loc2.metadata.get('park_id')
if park1_id and park2_id and park1_id == park2_id:
return True
# Major parks should resist clustering unless very close
if (loc1.cluster_category == "major_park" or loc2.cluster_category == "major_park"):
return False
# Similar types cluster more readily
if loc1.type == loc2.type:
return True
# Different types can cluster but with higher threshold
return False
@staticmethod
def calculate_cluster_priority(locations: List[UnifiedLocation]) -> UnifiedLocation:
"""Select the representative location for a cluster based on priority rules."""
# Prioritize by: 1) Parks over rides, 2) Higher weight, 3) Better rating
parks = [loc for loc in locations if loc.type == LocationType.PARK]
if parks:
return max(parks, key=lambda x: (
x.cluster_weight,
x.metadata.get('rating', 0) or 0,
x.metadata.get('ride_count', 0) or 0
))
rides = [loc for loc in locations if loc.type == LocationType.RIDE]
if rides:
return max(rides, key=lambda x: (
x.cluster_weight,
x.metadata.get('rating', 0) or 0
))
# Fall back to highest weight
return max(locations, key=lambda x: x.cluster_weight)

View File

@@ -0,0 +1,240 @@
"""
Data structures for the unified map service.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple, Any
from django.contrib.gis.geos import Polygon, Point
class LocationType(Enum):
"""Types of locations supported by the map service."""
PARK = "park"
RIDE = "ride"
COMPANY = "company"
GENERIC = "generic"
@dataclass
class GeoBounds:
"""Geographic boundary box for spatial queries."""
north: float
south: float
east: float
west: float
def __post_init__(self):
"""Validate bounds after initialization."""
if self.north < self.south:
raise ValueError("North bound must be greater than south bound")
if self.east < self.west:
raise ValueError("East bound must be greater than west bound")
if not (-90 <= self.south <= 90 and -90 <= self.north <= 90):
raise ValueError("Latitude bounds must be between -90 and 90")
if not (-180 <= self.west <= 180 and -180 <= self.east <= 180):
raise ValueError("Longitude bounds must be between -180 and 180")
def to_polygon(self) -> Polygon:
"""Convert bounds to PostGIS Polygon for database queries."""
return Polygon.from_bbox((self.west, self.south, self.east, self.north))
def expand(self, factor: float = 1.1) -> 'GeoBounds':
"""Expand bounds by factor for buffer queries."""
center_lat = (self.north + self.south) / 2
center_lng = (self.east + self.west) / 2
lat_range = (self.north - self.south) * factor / 2
lng_range = (self.east - self.west) * factor / 2
return GeoBounds(
north=min(90, center_lat + lat_range),
south=max(-90, center_lat - lat_range),
east=min(180, center_lng + lng_range),
west=max(-180, center_lng - lng_range)
)
def contains_point(self, lat: float, lng: float) -> bool:
"""Check if a point is within these bounds."""
return (self.south <= lat <= self.north and
self.west <= lng <= self.east)
def to_dict(self) -> Dict[str, float]:
"""Convert to dictionary for JSON serialization."""
return {
'north': self.north,
'south': self.south,
'east': self.east,
'west': self.west
}
@dataclass
class MapFilters:
"""Filtering options for map queries."""
location_types: Optional[Set[LocationType]] = None
park_status: Optional[Set[str]] = None # OPERATING, CLOSED_TEMP, etc.
ride_types: Optional[Set[str]] = None
company_roles: Optional[Set[str]] = None # OPERATOR, MANUFACTURER, etc.
search_query: Optional[str] = None
min_rating: Optional[float] = None
has_coordinates: bool = True
country: Optional[str] = None
state: Optional[str] = None
city: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for caching and serialization."""
return {
'location_types': [t.value for t in self.location_types] if self.location_types else None,
'park_status': list(self.park_status) if self.park_status else None,
'ride_types': list(self.ride_types) if self.ride_types else None,
'company_roles': list(self.company_roles) if self.company_roles else None,
'search_query': self.search_query,
'min_rating': self.min_rating,
'has_coordinates': self.has_coordinates,
'country': self.country,
'state': self.state,
'city': self.city,
}
@dataclass
class UnifiedLocation:
"""Unified location interface for all location types."""
id: str # Composite: f"{type}_{id}"
type: LocationType
name: str
coordinates: Tuple[float, float] # (lat, lng)
address: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
type_data: Dict[str, Any] = field(default_factory=dict)
cluster_weight: int = 1
cluster_category: str = "default"
@property
def latitude(self) -> float:
"""Get latitude from coordinates."""
return self.coordinates[0]
@property
def longitude(self) -> float:
"""Get longitude from coordinates."""
return self.coordinates[1]
def to_geojson_feature(self) -> Dict[str, Any]:
"""Convert to GeoJSON feature for mapping libraries."""
return {
'type': 'Feature',
'properties': {
'id': self.id,
'type': self.type.value,
'name': self.name,
'address': self.address,
'metadata': self.metadata,
'type_data': self.type_data,
'cluster_weight': self.cluster_weight,
'cluster_category': self.cluster_category
},
'geometry': {
'type': 'Point',
'coordinates': [self.longitude, self.latitude] # GeoJSON uses lng, lat
}
}
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON responses."""
return {
'id': self.id,
'type': self.type.value,
'name': self.name,
'coordinates': list(self.coordinates),
'address': self.address,
'metadata': self.metadata,
'type_data': self.type_data,
'cluster_weight': self.cluster_weight,
'cluster_category': self.cluster_category
}
@dataclass
class ClusterData:
"""Represents a cluster of locations for map display."""
id: str
coordinates: Tuple[float, float] # (lat, lng)
count: int
types: Set[LocationType]
bounds: GeoBounds
representative_location: Optional[UnifiedLocation] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON responses."""
return {
'id': self.id,
'coordinates': list(self.coordinates),
'count': self.count,
'types': [t.value for t in self.types],
'bounds': self.bounds.to_dict(),
'representative': self.representative_location.to_dict() if self.representative_location else None
}
@dataclass
class MapResponse:
"""Response structure for map API calls."""
locations: List[UnifiedLocation] = field(default_factory=list)
clusters: List[ClusterData] = field(default_factory=list)
bounds: Optional[GeoBounds] = None
total_count: int = 0
filtered_count: int = 0
zoom_level: Optional[int] = None
clustered: bool = False
cache_hit: bool = False
query_time_ms: Optional[int] = None
filters_applied: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON responses."""
return {
'status': 'success',
'data': {
'locations': [loc.to_dict() for loc in self.locations],
'clusters': [cluster.to_dict() for cluster in self.clusters],
'bounds': self.bounds.to_dict() if self.bounds else None,
'total_count': self.total_count,
'filtered_count': self.filtered_count,
'zoom_level': self.zoom_level,
'clustered': self.clustered
},
'meta': {
'cache_hit': self.cache_hit,
'query_time_ms': self.query_time_ms,
'filters_applied': self.filters_applied,
'pagination': {
'has_more': False, # TODO: Implement pagination
'total_pages': 1
}
}
}
@dataclass
class QueryPerformanceMetrics:
"""Performance metrics for query optimization."""
query_time_ms: int
db_query_count: int
cache_hit: bool
result_count: int
bounds_used: bool
clustering_used: bool
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for logging."""
return {
'query_time_ms': self.query_time_ms,
'db_query_count': self.db_query_count,
'cache_hit': self.cache_hit,
'result_count': self.result_count,
'bounds_used': self.bounds_used,
'clustering_used': self.clustering_used
}

View File

@@ -0,0 +1,380 @@
"""
Location adapters for converting between domain-specific models and UnifiedLocation.
"""
from typing import List, Optional, Dict, Any
from django.db.models import QuerySet
from django.urls import reverse
from .data_structures import UnifiedLocation, LocationType, GeoBounds, MapFilters
from parks.models.location import ParkLocation
from rides.models.location import RideLocation
from parks.models.companies import CompanyHeadquarters
from location.models import Location
class BaseLocationAdapter:
"""Base adapter class for location conversions."""
def to_unified_location(self, location_obj) -> Optional[UnifiedLocation]:
"""Convert model instance to UnifiedLocation."""
raise NotImplementedError
def get_queryset(self, bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> QuerySet:
"""Get optimized queryset for this location type."""
raise NotImplementedError
def bulk_convert(self, queryset: QuerySet) -> List[UnifiedLocation]:
"""Convert multiple location objects efficiently."""
unified_locations = []
for obj in queryset:
unified_loc = self.to_unified_location(obj)
if unified_loc:
unified_locations.append(unified_loc)
return unified_locations
class ParkLocationAdapter(BaseLocationAdapter):
"""Converts Park/ParkLocation to UnifiedLocation."""
def to_unified_location(self, park_location: ParkLocation) -> Optional[UnifiedLocation]:
"""Convert ParkLocation to UnifiedLocation."""
if not park_location.point:
return None
park = park_location.park
return UnifiedLocation(
id=f"park_{park.id}",
type=LocationType.PARK,
name=park.name,
coordinates=(park_location.latitude, park_location.longitude),
address=park_location.formatted_address,
metadata={
'status': getattr(park, 'status', 'UNKNOWN'),
'rating': float(park.average_rating) if hasattr(park, 'average_rating') and park.average_rating else None,
'ride_count': getattr(park, 'ride_count', 0),
'coaster_count': getattr(park, 'coaster_count', 0),
'operator': park.operator.name if hasattr(park, 'operator') and park.operator else None,
'city': park_location.city,
'state': park_location.state,
'country': park_location.country,
},
type_data={
'slug': park.slug,
'opening_date': park.opening_date.isoformat() if hasattr(park, 'opening_date') and park.opening_date else None,
'website': getattr(park, 'website', ''),
'operating_season': getattr(park, 'operating_season', ''),
'highway_exit': park_location.highway_exit,
'parking_notes': park_location.parking_notes,
'best_arrival_time': park_location.best_arrival_time.strftime('%H:%M') if park_location.best_arrival_time else None,
'seasonal_notes': park_location.seasonal_notes,
'url': self._get_park_url(park),
},
cluster_weight=self._calculate_park_weight(park),
cluster_category=self._get_park_category(park)
)
def get_queryset(self, bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> QuerySet:
"""Get optimized queryset for park locations."""
queryset = ParkLocation.objects.select_related(
'park', 'park__operator'
).filter(point__isnull=False)
# Spatial filtering
if bounds:
queryset = queryset.filter(point__within=bounds.to_polygon())
# Park-specific filters
if filters:
if filters.park_status:
queryset = queryset.filter(park__status__in=filters.park_status)
if filters.search_query:
queryset = queryset.filter(park__name__icontains=filters.search_query)
if filters.country:
queryset = queryset.filter(country=filters.country)
if filters.state:
queryset = queryset.filter(state=filters.state)
if filters.city:
queryset = queryset.filter(city=filters.city)
return queryset.order_by('park__name')
def _calculate_park_weight(self, park) -> int:
"""Calculate clustering weight based on park importance."""
weight = 1
if hasattr(park, 'ride_count') and park.ride_count and park.ride_count > 20:
weight += 2
if hasattr(park, 'coaster_count') and park.coaster_count and park.coaster_count > 5:
weight += 1
if hasattr(park, 'average_rating') and park.average_rating and park.average_rating > 4.0:
weight += 1
return min(weight, 5) # Cap at 5
def _get_park_category(self, park) -> str:
"""Determine park category for clustering."""
coaster_count = getattr(park, 'coaster_count', 0) or 0
ride_count = getattr(park, 'ride_count', 0) or 0
if coaster_count >= 10:
return "major_park"
elif ride_count >= 15:
return "theme_park"
else:
return "small_park"
def _get_park_url(self, park) -> str:
"""Get URL for park detail page."""
try:
return reverse('parks:detail', kwargs={'slug': park.slug})
except:
return f"/parks/{park.slug}/"
class RideLocationAdapter(BaseLocationAdapter):
"""Converts Ride/RideLocation to UnifiedLocation."""
def to_unified_location(self, ride_location: RideLocation) -> Optional[UnifiedLocation]:
"""Convert RideLocation to UnifiedLocation."""
if not ride_location.point:
return None
ride = ride_location.ride
return UnifiedLocation(
id=f"ride_{ride.id}",
type=LocationType.RIDE,
name=ride.name,
coordinates=(ride_location.latitude, ride_location.longitude),
address=f"{ride_location.park_area}, {ride.park.name}" if ride_location.park_area else ride.park.name,
metadata={
'park_id': ride.park.id,
'park_name': ride.park.name,
'park_area': ride_location.park_area,
'ride_type': getattr(ride, 'ride_type', 'Unknown'),
'status': getattr(ride, 'status', 'UNKNOWN'),
'rating': float(ride.average_rating) if hasattr(ride, 'average_rating') and ride.average_rating else None,
'manufacturer': getattr(ride, 'manufacturer', {}).get('name') if hasattr(ride, 'manufacturer') else None,
},
type_data={
'slug': ride.slug,
'opening_date': ride.opening_date.isoformat() if hasattr(ride, 'opening_date') and ride.opening_date else None,
'height_requirement': getattr(ride, 'height_requirement', ''),
'duration_minutes': getattr(ride, 'duration_minutes', None),
'max_speed_mph': getattr(ride, 'max_speed_mph', None),
'entrance_notes': ride_location.entrance_notes,
'accessibility_notes': ride_location.accessibility_notes,
'url': self._get_ride_url(ride),
},
cluster_weight=self._calculate_ride_weight(ride),
cluster_category=self._get_ride_category(ride)
)
def get_queryset(self, bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> QuerySet:
"""Get optimized queryset for ride locations."""
queryset = RideLocation.objects.select_related(
'ride', 'ride__park', 'ride__park__operator'
).filter(point__isnull=False)
# Spatial filtering
if bounds:
queryset = queryset.filter(point__within=bounds.to_polygon())
# Ride-specific filters
if filters:
if filters.ride_types:
queryset = queryset.filter(ride__ride_type__in=filters.ride_types)
if filters.search_query:
queryset = queryset.filter(ride__name__icontains=filters.search_query)
return queryset.order_by('ride__name')
def _calculate_ride_weight(self, ride) -> int:
"""Calculate clustering weight based on ride importance."""
weight = 1
ride_type = getattr(ride, 'ride_type', '').lower()
if 'coaster' in ride_type or 'roller' in ride_type:
weight += 1
if hasattr(ride, 'average_rating') and ride.average_rating and ride.average_rating > 4.0:
weight += 1
return min(weight, 3) # Cap at 3 for rides
def _get_ride_category(self, ride) -> str:
"""Determine ride category for clustering."""
ride_type = getattr(ride, 'ride_type', '').lower()
if 'coaster' in ride_type or 'roller' in ride_type:
return "coaster"
elif 'water' in ride_type or 'splash' in ride_type:
return "water_ride"
else:
return "other_ride"
def _get_ride_url(self, ride) -> str:
"""Get URL for ride detail page."""
try:
return reverse('rides:detail', kwargs={'slug': ride.slug})
except:
return f"/rides/{ride.slug}/"
class CompanyLocationAdapter(BaseLocationAdapter):
"""Converts Company/CompanyHeadquarters to UnifiedLocation."""
def to_unified_location(self, company_headquarters: CompanyHeadquarters) -> Optional[UnifiedLocation]:
"""Convert CompanyHeadquarters to UnifiedLocation."""
# Note: CompanyHeadquarters doesn't have coordinates, so we need to geocode
# For now, we'll skip companies without coordinates
# TODO: Implement geocoding service integration
return None
def get_queryset(self, bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> QuerySet:
"""Get optimized queryset for company locations."""
queryset = CompanyHeadquarters.objects.select_related('company')
# Company-specific filters
if filters:
if filters.company_roles:
queryset = queryset.filter(company__roles__overlap=filters.company_roles)
if filters.search_query:
queryset = queryset.filter(company__name__icontains=filters.search_query)
if filters.country:
queryset = queryset.filter(country=filters.country)
if filters.city:
queryset = queryset.filter(city=filters.city)
return queryset.order_by('company__name')
class GenericLocationAdapter(BaseLocationAdapter):
"""Converts generic Location model to UnifiedLocation."""
def to_unified_location(self, location: Location) -> Optional[UnifiedLocation]:
"""Convert generic Location to UnifiedLocation."""
if not location.point and not (location.latitude and location.longitude):
return None
# Use point coordinates if available, fall back to lat/lng fields
if location.point:
coordinates = (location.point.y, location.point.x)
else:
coordinates = (float(location.latitude), float(location.longitude))
return UnifiedLocation(
id=f"generic_{location.id}",
type=LocationType.GENERIC,
name=location.name,
coordinates=coordinates,
address=location.get_formatted_address(),
metadata={
'location_type': location.location_type,
'content_type': location.content_type.model if location.content_type else None,
'object_id': location.object_id,
'city': location.city,
'state': location.state,
'country': location.country,
},
type_data={
'created_at': location.created_at.isoformat() if location.created_at else None,
'updated_at': location.updated_at.isoformat() if location.updated_at else None,
},
cluster_weight=1,
cluster_category="generic"
)
def get_queryset(self, bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> QuerySet:
"""Get optimized queryset for generic locations."""
queryset = Location.objects.select_related('content_type').filter(
models.Q(point__isnull=False) |
models.Q(latitude__isnull=False, longitude__isnull=False)
)
# Spatial filtering
if bounds:
queryset = queryset.filter(
models.Q(point__within=bounds.to_polygon()) |
models.Q(
latitude__gte=bounds.south,
latitude__lte=bounds.north,
longitude__gte=bounds.west,
longitude__lte=bounds.east
)
)
# Generic filters
if filters:
if filters.search_query:
queryset = queryset.filter(name__icontains=filters.search_query)
if filters.country:
queryset = queryset.filter(country=filters.country)
if filters.city:
queryset = queryset.filter(city=filters.city)
return queryset.order_by('name')
class LocationAbstractionLayer:
"""
Abstraction layer handling different location model types.
Implements the adapter pattern to provide unified access to all location types.
"""
def __init__(self):
self.adapters = {
LocationType.PARK: ParkLocationAdapter(),
LocationType.RIDE: RideLocationAdapter(),
LocationType.COMPANY: CompanyLocationAdapter(),
LocationType.GENERIC: GenericLocationAdapter()
}
def get_all_locations(self, bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> List[UnifiedLocation]:
"""Get locations from all sources within bounds."""
all_locations = []
# Determine which location types to include
location_types = filters.location_types if filters and filters.location_types else set(LocationType)
for location_type in location_types:
adapter = self.adapters[location_type]
queryset = adapter.get_queryset(bounds, filters)
locations = adapter.bulk_convert(queryset)
all_locations.extend(locations)
return all_locations
def get_locations_by_type(self, location_type: LocationType,
bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None) -> List[UnifiedLocation]:
"""Get locations of specific type."""
adapter = self.adapters[location_type]
queryset = adapter.get_queryset(bounds, filters)
return adapter.bulk_convert(queryset)
def get_location_by_id(self, location_type: LocationType, location_id: int) -> Optional[UnifiedLocation]:
"""Get single location with full details."""
adapter = self.adapters[location_type]
try:
if location_type == LocationType.PARK:
obj = ParkLocation.objects.select_related('park', 'park__operator').get(park_id=location_id)
elif location_type == LocationType.RIDE:
obj = RideLocation.objects.select_related('ride', 'ride__park').get(ride_id=location_id)
elif location_type == LocationType.COMPANY:
obj = CompanyHeadquarters.objects.select_related('company').get(company_id=location_id)
elif location_type == LocationType.GENERIC:
obj = Location.objects.select_related('content_type').get(id=location_id)
else:
return None
return adapter.to_unified_location(obj)
except Exception:
return None
# Import models after defining adapters to avoid circular imports
from django.db import models

View File

@@ -0,0 +1,401 @@
"""
Caching service for map data to improve performance and reduce database load.
"""
import hashlib
import json
import time
from typing import Dict, List, Optional, Any, Union
from dataclasses import asdict
from django.core.cache import cache
from django.conf import settings
from django.utils import timezone
from .data_structures import (
UnifiedLocation,
ClusterData,
GeoBounds,
MapFilters,
MapResponse,
QueryPerformanceMetrics
)
class MapCacheService:
"""
Handles caching of map data with geographic partitioning and intelligent invalidation.
"""
# Cache configuration
DEFAULT_TTL = 3600 # 1 hour
CLUSTER_TTL = 7200 # 2 hours (clusters change less frequently)
LOCATION_DETAIL_TTL = 1800 # 30 minutes
BOUNDS_CACHE_TTL = 1800 # 30 minutes
# Cache key prefixes
CACHE_PREFIX = "thrillwiki_map"
LOCATIONS_PREFIX = f"{CACHE_PREFIX}:locations"
CLUSTERS_PREFIX = f"{CACHE_PREFIX}:clusters"
BOUNDS_PREFIX = f"{CACHE_PREFIX}:bounds"
DETAIL_PREFIX = f"{CACHE_PREFIX}:detail"
STATS_PREFIX = f"{CACHE_PREFIX}:stats"
# Geographic partitioning settings
GEOHASH_PRECISION = 6 # ~1.2km precision for cache partitioning
def __init__(self):
self.cache_stats = {
'hits': 0,
'misses': 0,
'invalidations': 0,
'geohash_partitions': 0
}
def get_locations_cache_key(self, bounds: Optional[GeoBounds],
filters: Optional[MapFilters],
zoom_level: Optional[int] = None) -> str:
"""Generate cache key for location queries."""
key_parts = [self.LOCATIONS_PREFIX]
if bounds:
# Use geohash for spatial locality
geohash = self._bounds_to_geohash(bounds)
key_parts.append(f"geo:{geohash}")
if filters:
# Create deterministic hash of filters
filter_hash = self._hash_filters(filters)
key_parts.append(f"filters:{filter_hash}")
if zoom_level is not None:
key_parts.append(f"zoom:{zoom_level}")
return ":".join(key_parts)
def get_clusters_cache_key(self, bounds: Optional[GeoBounds],
filters: Optional[MapFilters],
zoom_level: int) -> str:
"""Generate cache key for cluster queries."""
key_parts = [self.CLUSTERS_PREFIX, f"zoom:{zoom_level}"]
if bounds:
geohash = self._bounds_to_geohash(bounds)
key_parts.append(f"geo:{geohash}")
if filters:
filter_hash = self._hash_filters(filters)
key_parts.append(f"filters:{filter_hash}")
return ":".join(key_parts)
def get_location_detail_cache_key(self, location_type: str, location_id: int) -> str:
"""Generate cache key for individual location details."""
return f"{self.DETAIL_PREFIX}:{location_type}:{location_id}"
def cache_locations(self, cache_key: str, locations: List[UnifiedLocation],
ttl: Optional[int] = None) -> None:
"""Cache location data."""
try:
# Convert locations to serializable format
cache_data = {
'locations': [loc.to_dict() for loc in locations],
'cached_at': timezone.now().isoformat(),
'count': len(locations)
}
cache.set(cache_key, cache_data, ttl or self.DEFAULT_TTL)
except Exception as e:
# Log error but don't fail the request
print(f"Cache write error for key {cache_key}: {e}")
def cache_clusters(self, cache_key: str, clusters: List[ClusterData],
ttl: Optional[int] = None) -> None:
"""Cache cluster data."""
try:
cache_data = {
'clusters': [cluster.to_dict() for cluster in clusters],
'cached_at': timezone.now().isoformat(),
'count': len(clusters)
}
cache.set(cache_key, cache_data, ttl or self.CLUSTER_TTL)
except Exception as e:
print(f"Cache write error for clusters {cache_key}: {e}")
def cache_map_response(self, cache_key: str, response: MapResponse,
ttl: Optional[int] = None) -> None:
"""Cache complete map response."""
try:
cache_data = response.to_dict()
cache_data['cached_at'] = timezone.now().isoformat()
cache.set(cache_key, cache_data, ttl or self.DEFAULT_TTL)
except Exception as e:
print(f"Cache write error for response {cache_key}: {e}")
def get_cached_locations(self, cache_key: str) -> Optional[List[UnifiedLocation]]:
"""Retrieve cached location data."""
try:
cache_data = cache.get(cache_key)
if not cache_data:
self.cache_stats['misses'] += 1
return None
self.cache_stats['hits'] += 1
# Convert back to UnifiedLocation objects
locations = []
for loc_data in cache_data['locations']:
# Reconstruct UnifiedLocation from dictionary
locations.append(self._dict_to_unified_location(loc_data))
return locations
except Exception as e:
print(f"Cache read error for key {cache_key}: {e}")
self.cache_stats['misses'] += 1
return None
def get_cached_clusters(self, cache_key: str) -> Optional[List[ClusterData]]:
"""Retrieve cached cluster data."""
try:
cache_data = cache.get(cache_key)
if not cache_data:
self.cache_stats['misses'] += 1
return None
self.cache_stats['hits'] += 1
# Convert back to ClusterData objects
clusters = []
for cluster_data in cache_data['clusters']:
clusters.append(self._dict_to_cluster_data(cluster_data))
return clusters
except Exception as e:
print(f"Cache read error for clusters {cache_key}: {e}")
self.cache_stats['misses'] += 1
return None
def get_cached_map_response(self, cache_key: str) -> Optional[MapResponse]:
"""Retrieve cached map response."""
try:
cache_data = cache.get(cache_key)
if not cache_data:
self.cache_stats['misses'] += 1
return None
self.cache_stats['hits'] += 1
# Convert back to MapResponse object
return self._dict_to_map_response(cache_data['data'])
except Exception as e:
print(f"Cache read error for response {cache_key}: {e}")
self.cache_stats['misses'] += 1
return None
def invalidate_location_cache(self, location_type: str, location_id: Optional[int] = None) -> None:
"""Invalidate cache for specific location or all locations of a type."""
try:
if location_id:
# Invalidate specific location detail
detail_key = self.get_location_detail_cache_key(location_type, location_id)
cache.delete(detail_key)
# Invalidate related location and cluster caches
# In a production system, you'd want more sophisticated cache tagging
cache.delete_many([
f"{self.LOCATIONS_PREFIX}:*",
f"{self.CLUSTERS_PREFIX}:*"
])
self.cache_stats['invalidations'] += 1
except Exception as e:
print(f"Cache invalidation error: {e}")
def invalidate_bounds_cache(self, bounds: GeoBounds) -> None:
"""Invalidate cache for specific geographic bounds."""
try:
geohash = self._bounds_to_geohash(bounds)
pattern = f"{self.LOCATIONS_PREFIX}:geo:{geohash}*"
# In production, you'd use cache tagging or Redis SCAN
# For now, we'll invalidate broader patterns
cache.delete_many([pattern])
self.cache_stats['invalidations'] += 1
except Exception as e:
print(f"Bounds cache invalidation error: {e}")
def clear_all_map_cache(self) -> None:
"""Clear all map-related cache data."""
try:
cache.delete_many([
f"{self.LOCATIONS_PREFIX}:*",
f"{self.CLUSTERS_PREFIX}:*",
f"{self.BOUNDS_PREFIX}:*",
f"{self.DETAIL_PREFIX}:*"
])
self.cache_stats['invalidations'] += 1
except Exception as e:
print(f"Cache clear error: {e}")
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics."""
total_requests = self.cache_stats['hits'] + self.cache_stats['misses']
hit_rate = (self.cache_stats['hits'] / total_requests * 100) if total_requests > 0 else 0
return {
'hits': self.cache_stats['hits'],
'misses': self.cache_stats['misses'],
'hit_rate_percent': round(hit_rate, 2),
'invalidations': self.cache_stats['invalidations'],
'geohash_partitions': self.cache_stats['geohash_partitions']
}
def record_performance_metrics(self, metrics: QueryPerformanceMetrics) -> None:
"""Record query performance metrics for analysis."""
try:
stats_key = f"{self.STATS_PREFIX}:performance:{int(time.time() // 300)}" # 5-minute buckets
current_stats = cache.get(stats_key, {
'query_count': 0,
'total_time_ms': 0,
'cache_hits': 0,
'db_queries': 0
})
current_stats['query_count'] += 1
current_stats['total_time_ms'] += metrics.query_time_ms
current_stats['cache_hits'] += 1 if metrics.cache_hit else 0
current_stats['db_queries'] += metrics.db_query_count
cache.set(stats_key, current_stats, 3600) # Keep for 1 hour
except Exception as e:
print(f"Performance metrics recording error: {e}")
def _bounds_to_geohash(self, bounds: GeoBounds) -> str:
"""Convert geographic bounds to geohash for cache partitioning."""
# Use center point of bounds for geohash
center_lat = (bounds.north + bounds.south) / 2
center_lng = (bounds.east + bounds.west) / 2
# Simple geohash implementation (in production, use a library)
return self._encode_geohash(center_lat, center_lng, self.GEOHASH_PRECISION)
def _encode_geohash(self, lat: float, lng: float, precision: int) -> str:
"""Simple geohash encoding implementation."""
# This is a simplified implementation
# In production, use the `geohash` library
lat_range = [-90.0, 90.0]
lng_range = [-180.0, 180.0]
geohash = ""
bits = 0
bit_count = 0
even_bit = True
while len(geohash) < precision:
if even_bit:
# longitude
mid = (lng_range[0] + lng_range[1]) / 2
if lng >= mid:
bits = (bits << 1) + 1
lng_range[0] = mid
else:
bits = bits << 1
lng_range[1] = mid
else:
# latitude
mid = (lat_range[0] + lat_range[1]) / 2
if lat >= mid:
bits = (bits << 1) + 1
lat_range[0] = mid
else:
bits = bits << 1
lat_range[1] = mid
even_bit = not even_bit
bit_count += 1
if bit_count == 5:
# Convert 5 bits to base32 character
geohash += "0123456789bcdefghjkmnpqrstuvwxyz"[bits]
bits = 0
bit_count = 0
return geohash
def _hash_filters(self, filters: MapFilters) -> str:
"""Create deterministic hash of filters for cache keys."""
filter_dict = filters.to_dict()
# Sort to ensure consistent ordering
filter_str = json.dumps(filter_dict, sort_keys=True)
return hashlib.md5(filter_str.encode()).hexdigest()[:8]
def _dict_to_unified_location(self, data: Dict[str, Any]) -> UnifiedLocation:
"""Convert dictionary back to UnifiedLocation object."""
from .data_structures import LocationType
return UnifiedLocation(
id=data['id'],
type=LocationType(data['type']),
name=data['name'],
coordinates=tuple(data['coordinates']),
address=data.get('address'),
metadata=data.get('metadata', {}),
type_data=data.get('type_data', {}),
cluster_weight=data.get('cluster_weight', 1),
cluster_category=data.get('cluster_category', 'default')
)
def _dict_to_cluster_data(self, data: Dict[str, Any]) -> ClusterData:
"""Convert dictionary back to ClusterData object."""
from .data_structures import LocationType
bounds = GeoBounds(**data['bounds'])
types = {LocationType(t) for t in data['types']}
representative = None
if data.get('representative'):
representative = self._dict_to_unified_location(data['representative'])
return ClusterData(
id=data['id'],
coordinates=tuple(data['coordinates']),
count=data['count'],
types=types,
bounds=bounds,
representative_location=representative
)
def _dict_to_map_response(self, data: Dict[str, Any]) -> MapResponse:
"""Convert dictionary back to MapResponse object."""
locations = [self._dict_to_unified_location(loc) for loc in data.get('locations', [])]
clusters = [self._dict_to_cluster_data(cluster) for cluster in data.get('clusters', [])]
bounds = None
if data.get('bounds'):
bounds = GeoBounds(**data['bounds'])
return MapResponse(
locations=locations,
clusters=clusters,
bounds=bounds,
total_count=data.get('total_count', 0),
filtered_count=data.get('filtered_count', 0),
zoom_level=data.get('zoom_level'),
clustered=data.get('clustered', False)
)
# Global cache service instance
map_cache = MapCacheService()

View File

@@ -0,0 +1,427 @@
"""
Unified Map Service - Main orchestrating service for all map functionality.
"""
import time
from typing import List, Optional, Dict, Any, Set
from django.db import connection
from django.utils import timezone
from .data_structures import (
UnifiedLocation,
ClusterData,
GeoBounds,
MapFilters,
MapResponse,
LocationType,
QueryPerformanceMetrics
)
from .location_adapters import LocationAbstractionLayer
from .clustering_service import ClusteringService
from .map_cache_service import MapCacheService
class UnifiedMapService:
"""
Main service orchestrating map data retrieval, filtering, clustering, and caching.
Provides a unified interface for all location types with performance optimization.
"""
# Performance thresholds
MAX_UNCLUSTERED_POINTS = 500
MAX_CLUSTERED_POINTS = 2000
DEFAULT_ZOOM_LEVEL = 10
def __init__(self):
self.location_layer = LocationAbstractionLayer()
self.clustering_service = ClusteringService()
self.cache_service = MapCacheService()
def get_map_data(
self,
bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None,
zoom_level: int = DEFAULT_ZOOM_LEVEL,
cluster: bool = True,
use_cache: bool = True
) -> MapResponse:
"""
Primary method for retrieving unified map data.
Args:
bounds: Geographic bounds to query within
filters: Filtering criteria for locations
zoom_level: Map zoom level for clustering decisions
cluster: Whether to apply clustering
use_cache: Whether to use cached data
Returns:
MapResponse with locations, clusters, and metadata
"""
start_time = time.time()
initial_query_count = len(connection.queries)
cache_hit = False
try:
# Generate cache key
cache_key = None
if use_cache:
cache_key = self._generate_cache_key(bounds, filters, zoom_level, cluster)
# Try to get from cache first
cached_response = self.cache_service.get_cached_map_response(cache_key)
if cached_response:
cached_response.cache_hit = True
cached_response.query_time_ms = int((time.time() - start_time) * 1000)
return cached_response
# Get locations from database
locations = self._get_locations_from_db(bounds, filters)
# Apply smart limiting based on zoom level and density
locations = self._apply_smart_limiting(locations, bounds, zoom_level)
# Determine if clustering should be applied
should_cluster = cluster and self.clustering_service.should_cluster(zoom_level, len(locations))
# Apply clustering if needed
clusters = []
if should_cluster:
locations, clusters = self.clustering_service.cluster_locations(
locations, zoom_level, bounds
)
# Calculate response bounds
response_bounds = self._calculate_response_bounds(locations, clusters, bounds)
# Create response
response = MapResponse(
locations=locations,
clusters=clusters,
bounds=response_bounds,
total_count=len(locations) + sum(cluster.count for cluster in clusters),
filtered_count=len(locations),
zoom_level=zoom_level,
clustered=should_cluster,
cache_hit=cache_hit,
query_time_ms=int((time.time() - start_time) * 1000),
filters_applied=self._get_applied_filters_list(filters)
)
# Cache the response
if use_cache and cache_key:
self.cache_service.cache_map_response(cache_key, response)
# Record performance metrics
self._record_performance_metrics(
start_time, initial_query_count, cache_hit, len(locations) + len(clusters),
bounds is not None, should_cluster
)
return response
except Exception as e:
# Return error response
return MapResponse(
locations=[],
clusters=[],
total_count=0,
filtered_count=0,
query_time_ms=int((time.time() - start_time) * 1000),
cache_hit=False
)
def get_location_details(self, location_type: str, location_id: int) -> Optional[UnifiedLocation]:
"""
Get detailed information for a specific location.
Args:
location_type: Type of location (park, ride, company, generic)
location_id: ID of the location
Returns:
UnifiedLocation with full details or None if not found
"""
try:
# Check cache first
cache_key = self.cache_service.get_location_detail_cache_key(location_type, location_id)
cached_locations = self.cache_service.get_cached_locations(cache_key)
if cached_locations:
return cached_locations[0] if cached_locations else None
# Get from database
location_type_enum = LocationType(location_type.lower())
location = self.location_layer.get_location_by_id(location_type_enum, location_id)
# Cache the result
if location:
self.cache_service.cache_locations(cache_key, [location],
self.cache_service.LOCATION_DETAIL_TTL)
return location
except Exception as e:
print(f"Error getting location details: {e}")
return None
def search_locations(
self,
query: str,
bounds: Optional[GeoBounds] = None,
location_types: Optional[Set[LocationType]] = None,
limit: int = 50
) -> List[UnifiedLocation]:
"""
Search locations with text query.
Args:
query: Search query string
bounds: Optional geographic bounds to search within
location_types: Optional set of location types to search
limit: Maximum number of results
Returns:
List of matching UnifiedLocation objects
"""
try:
# Create search filters
filters = MapFilters(
search_query=query,
location_types=location_types or {LocationType.PARK, LocationType.RIDE},
has_coordinates=True
)
# Get locations
locations = self.location_layer.get_all_locations(bounds, filters)
# Apply limit
return locations[:limit]
except Exception as e:
print(f"Error searching locations: {e}")
return []
def get_locations_by_bounds(
self,
north: float,
south: float,
east: float,
west: float,
location_types: Optional[Set[LocationType]] = None,
zoom_level: int = DEFAULT_ZOOM_LEVEL
) -> MapResponse:
"""
Get locations within specific geographic bounds.
Args:
north, south, east, west: Bounding box coordinates
location_types: Optional filter for location types
zoom_level: Map zoom level for optimization
Returns:
MapResponse with locations in bounds
"""
try:
bounds = GeoBounds(north=north, south=south, east=east, west=west)
filters = MapFilters(location_types=location_types) if location_types else None
return self.get_map_data(bounds=bounds, filters=filters, zoom_level=zoom_level)
except ValueError as e:
# Invalid bounds
return MapResponse(
locations=[],
clusters=[],
total_count=0,
filtered_count=0
)
def get_clustered_locations(
self,
zoom_level: int,
bounds: Optional[GeoBounds] = None,
filters: Optional[MapFilters] = None
) -> MapResponse:
"""
Get clustered location data for map display.
Args:
zoom_level: Map zoom level for clustering configuration
bounds: Optional geographic bounds
filters: Optional filtering criteria
Returns:
MapResponse with clustered data
"""
return self.get_map_data(
bounds=bounds,
filters=filters,
zoom_level=zoom_level,
cluster=True
)
def get_locations_by_type(
self,
location_type: LocationType,
bounds: Optional[GeoBounds] = None,
limit: Optional[int] = None
) -> List[UnifiedLocation]:
"""
Get locations of a specific type.
Args:
location_type: Type of locations to retrieve
bounds: Optional geographic bounds
limit: Optional limit on results
Returns:
List of UnifiedLocation objects
"""
try:
filters = MapFilters(location_types={location_type})
locations = self.location_layer.get_locations_by_type(location_type, bounds, filters)
if limit:
locations = locations[:limit]
return locations
except Exception as e:
print(f"Error getting locations by type: {e}")
return []
def invalidate_cache(self, location_type: Optional[str] = None,
location_id: Optional[int] = None,
bounds: Optional[GeoBounds] = None) -> None:
"""
Invalidate cached map data.
Args:
location_type: Optional specific location type to invalidate
location_id: Optional specific location ID to invalidate
bounds: Optional specific bounds to invalidate
"""
if location_type and location_id:
self.cache_service.invalidate_location_cache(location_type, location_id)
elif bounds:
self.cache_service.invalidate_bounds_cache(bounds)
else:
self.cache_service.clear_all_map_cache()
def get_service_stats(self) -> Dict[str, Any]:
"""Get service performance and usage statistics."""
cache_stats = self.cache_service.get_cache_stats()
return {
'cache_performance': cache_stats,
'clustering_available': True,
'supported_location_types': [t.value for t in LocationType],
'max_unclustered_points': self.MAX_UNCLUSTERED_POINTS,
'max_clustered_points': self.MAX_CLUSTERED_POINTS,
'service_version': '1.0.0'
}
def _get_locations_from_db(self, bounds: Optional[GeoBounds],
filters: Optional[MapFilters]) -> List[UnifiedLocation]:
"""Get locations from database using the abstraction layer."""
return self.location_layer.get_all_locations(bounds, filters)
def _apply_smart_limiting(self, locations: List[UnifiedLocation],
bounds: Optional[GeoBounds], zoom_level: int) -> List[UnifiedLocation]:
"""Apply intelligent limiting based on zoom level and density."""
if zoom_level < 6: # Very zoomed out - show only major parks
major_parks = [
loc for loc in locations
if (loc.type == LocationType.PARK and
loc.cluster_category in ['major_park', 'theme_park'])
]
return major_parks[:200]
elif zoom_level < 10: # Regional level
return locations[:1000]
else: # City level and closer
return locations[:self.MAX_CLUSTERED_POINTS]
def _calculate_response_bounds(self, locations: List[UnifiedLocation],
clusters: List[ClusterData],
request_bounds: Optional[GeoBounds]) -> Optional[GeoBounds]:
"""Calculate the actual bounds of the response data."""
if request_bounds:
return request_bounds
all_coords = []
# Add location coordinates
for loc in locations:
all_coords.append((loc.latitude, loc.longitude))
# Add cluster coordinates
for cluster in clusters:
all_coords.append(cluster.coordinates)
if not all_coords:
return None
lats, lngs = zip(*all_coords)
return GeoBounds(
north=max(lats),
south=min(lats),
east=max(lngs),
west=min(lngs)
)
def _get_applied_filters_list(self, filters: Optional[MapFilters]) -> List[str]:
"""Get list of applied filter types for metadata."""
if not filters:
return []
applied = []
if filters.location_types:
applied.append('location_types')
if filters.search_query:
applied.append('search_query')
if filters.park_status:
applied.append('park_status')
if filters.ride_types:
applied.append('ride_types')
if filters.company_roles:
applied.append('company_roles')
if filters.min_rating:
applied.append('min_rating')
if filters.country:
applied.append('country')
if filters.state:
applied.append('state')
if filters.city:
applied.append('city')
return applied
def _generate_cache_key(self, bounds: Optional[GeoBounds], filters: Optional[MapFilters],
zoom_level: int, cluster: bool) -> str:
"""Generate cache key for the request."""
if cluster:
return self.cache_service.get_clusters_cache_key(bounds, filters, zoom_level)
else:
return self.cache_service.get_locations_cache_key(bounds, filters, zoom_level)
def _record_performance_metrics(self, start_time: float, initial_query_count: int,
cache_hit: bool, result_count: int, bounds_used: bool,
clustering_used: bool) -> None:
"""Record performance metrics for monitoring."""
query_time_ms = int((time.time() - start_time) * 1000)
db_query_count = len(connection.queries) - initial_query_count
metrics = QueryPerformanceMetrics(
query_time_ms=query_time_ms,
db_query_count=db_query_count,
cache_hit=cache_hit,
result_count=result_count,
bounds_used=bounds_used,
clustering_used=clustering_used
)
self.cache_service.record_performance_metrics(metrics)
# Global service instance
unified_map_service = UnifiedMapService()

37
core/urls/map_urls.py Normal file
View File

@@ -0,0 +1,37 @@
"""
URL patterns for the unified map service API.
"""
from django.urls import path
from ..views.map_views import (
MapLocationsView,
MapLocationDetailView,
MapSearchView,
MapBoundsView,
MapStatsView,
MapCacheView
)
app_name = 'map_api'
urlpatterns = [
# Main map data endpoint
path('locations/', MapLocationsView.as_view(), name='locations'),
# Location detail endpoint
path('locations/<str:location_type>/<int:location_id>/',
MapLocationDetailView.as_view(), name='location_detail'),
# Search endpoint
path('search/', MapSearchView.as_view(), name='search'),
# Bounds-based query endpoint
path('bounds/', MapBoundsView.as_view(), name='bounds'),
# Service statistics endpoint
path('stats/', MapStatsView.as_view(), name='stats'),
# Cache management endpoints
path('cache/', MapCacheView.as_view(), name='cache'),
path('cache/invalidate/', MapCacheView.as_view(), name='cache_invalidate'),
]

12
core/urls/search.py Normal file
View File

@@ -0,0 +1,12 @@
from django.urls import path
from core.views.search import AdaptiveSearchView, FilterFormView
from rides.views import RideSearchView
app_name = 'search'
urlpatterns = [
path('parks/', AdaptiveSearchView.as_view(), name='search'),
path('parks/filters/', FilterFormView.as_view(), name='filter_form'),
path('rides/', RideSearchView.as_view(), name='ride_search'),
path('rides/results/', RideSearchView.as_view(), name='ride_search_results'),
]

2
core/views/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from .search import *
from .views import *

394
core/views/map_views.py Normal file
View File

@@ -0,0 +1,394 @@
"""
API views for the unified map service.
"""
import json
from typing import Dict, Any, Optional, Set
from django.http import JsonResponse, HttpRequest, Http404
from django.views.decorators.http import require_http_methods
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views import View
from django.core.exceptions import ValidationError
from ..services.map_service import unified_map_service
from ..services.data_structures import GeoBounds, MapFilters, LocationType
class MapAPIView(View):
"""Base view for map API endpoints with common functionality."""
def dispatch(self, request, *args, **kwargs):
"""Add CORS headers and handle preflight requests."""
response = super().dispatch(request, *args, **kwargs)
# Add CORS headers for API access
response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return response
def options(self, request, *args, **kwargs):
"""Handle preflight CORS requests."""
return JsonResponse({}, status=200)
def _parse_bounds(self, request: HttpRequest) -> Optional[GeoBounds]:
"""Parse geographic bounds from request parameters."""
try:
north = request.GET.get('north')
south = request.GET.get('south')
east = request.GET.get('east')
west = request.GET.get('west')
if all(param is not None for param in [north, south, east, west]):
return GeoBounds(
north=float(north),
south=float(south),
east=float(east),
west=float(west)
)
return None
except (ValueError, TypeError) as e:
raise ValidationError(f"Invalid bounds parameters: {e}")
def _parse_filters(self, request: HttpRequest) -> Optional[MapFilters]:
"""Parse filtering parameters from request."""
try:
filters = MapFilters()
# Location types
location_types_param = request.GET.get('types')
if location_types_param:
type_strings = location_types_param.split(',')
filters.location_types = {
LocationType(t.strip()) for t in type_strings
if t.strip() in [lt.value for lt in LocationType]
}
# Park status
park_status_param = request.GET.get('park_status')
if park_status_param:
filters.park_status = set(park_status_param.split(','))
# Ride types
ride_types_param = request.GET.get('ride_types')
if ride_types_param:
filters.ride_types = set(ride_types_param.split(','))
# Company roles
company_roles_param = request.GET.get('company_roles')
if company_roles_param:
filters.company_roles = set(company_roles_param.split(','))
# Search query
filters.search_query = request.GET.get('q') or request.GET.get('search')
# Rating filter
min_rating_param = request.GET.get('min_rating')
if min_rating_param:
filters.min_rating = float(min_rating_param)
# Geographic filters
filters.country = request.GET.get('country')
filters.state = request.GET.get('state')
filters.city = request.GET.get('city')
# Coordinates requirement
has_coordinates_param = request.GET.get('has_coordinates')
if has_coordinates_param is not None:
filters.has_coordinates = has_coordinates_param.lower() in ['true', '1', 'yes']
return filters if any([
filters.location_types, filters.park_status, filters.ride_types,
filters.company_roles, filters.search_query, filters.min_rating,
filters.country, filters.state, filters.city
]) else None
except (ValueError, TypeError) as e:
raise ValidationError(f"Invalid filter parameters: {e}")
def _parse_zoom_level(self, request: HttpRequest) -> int:
"""Parse zoom level from request with default."""
try:
zoom_param = request.GET.get('zoom', '10')
zoom_level = int(zoom_param)
return max(1, min(20, zoom_level)) # Clamp between 1 and 20
except (ValueError, TypeError):
return 10 # Default zoom level
def _error_response(self, message: str, status: int = 400) -> JsonResponse:
"""Return standardized error response."""
return JsonResponse({
'status': 'error',
'message': message,
'data': None
}, status=status)
class MapLocationsView(MapAPIView):
"""
API endpoint for getting map locations with optional clustering.
GET /api/map/locations/
Parameters:
- north, south, east, west: Bounding box coordinates
- zoom: Zoom level (1-20)
- types: Comma-separated location types (park,ride,company,generic)
- cluster: Whether to enable clustering (true/false)
- q: Search query
- park_status: Park status filter
- ride_types: Ride type filter
- min_rating: Minimum rating filter
- country, state, city: Geographic filters
"""
@method_decorator(cache_page(300)) # Cache for 5 minutes
def get(self, request: HttpRequest) -> JsonResponse:
"""Get map locations with optional clustering and filtering."""
try:
# Parse parameters
bounds = self._parse_bounds(request)
filters = self._parse_filters(request)
zoom_level = self._parse_zoom_level(request)
# Clustering preference
cluster_param = request.GET.get('cluster', 'true')
enable_clustering = cluster_param.lower() in ['true', '1', 'yes']
# Cache preference
use_cache_param = request.GET.get('cache', 'true')
use_cache = use_cache_param.lower() in ['true', '1', 'yes']
# Get map data
response = unified_map_service.get_map_data(
bounds=bounds,
filters=filters,
zoom_level=zoom_level,
cluster=enable_clustering,
use_cache=use_cache
)
return JsonResponse(response.to_dict())
except ValidationError as e:
return self._error_response(str(e), 400)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
class MapLocationDetailView(MapAPIView):
"""
API endpoint for getting detailed information about a specific location.
GET /api/map/locations/<type>/<id>/
"""
@method_decorator(cache_page(600)) # Cache for 10 minutes
def get(self, request: HttpRequest, location_type: str, location_id: int) -> JsonResponse:
"""Get detailed information for a specific location."""
try:
# Validate location type
if location_type not in [lt.value for lt in LocationType]:
return self._error_response(f"Invalid location type: {location_type}", 400)
# Get location details
location = unified_map_service.get_location_details(location_type, location_id)
if not location:
return self._error_response("Location not found", 404)
return JsonResponse({
'status': 'success',
'data': location.to_dict()
})
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
class MapSearchView(MapAPIView):
"""
API endpoint for searching locations by text query.
GET /api/map/search/
Parameters:
- q: Search query (required)
- north, south, east, west: Optional bounding box
- types: Comma-separated location types
- limit: Maximum results (default 50)
"""
def get(self, request: HttpRequest) -> JsonResponse:
"""Search locations by text query."""
try:
# Get search query
query = request.GET.get('q')
if not query:
return self._error_response("Search query 'q' parameter is required", 400)
# Parse optional parameters
bounds = self._parse_bounds(request)
# Parse location types
location_types = None
types_param = request.GET.get('types')
if types_param:
try:
location_types = {
LocationType(t.strip()) for t in types_param.split(',')
if t.strip() in [lt.value for lt in LocationType]
}
except ValueError:
return self._error_response("Invalid location types", 400)
# Parse limit
limit = min(100, max(1, int(request.GET.get('limit', '50'))))
# Perform search
locations = unified_map_service.search_locations(
query=query,
bounds=bounds,
location_types=location_types,
limit=limit
)
return JsonResponse({
'status': 'success',
'data': {
'locations': [loc.to_dict() for loc in locations],
'query': query,
'count': len(locations),
'limit': limit
}
})
except ValueError as e:
return self._error_response(str(e), 400)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
class MapBoundsView(MapAPIView):
"""
API endpoint for getting locations within specific bounds.
GET /api/map/bounds/
Parameters:
- north, south, east, west: Bounding box coordinates (required)
- types: Comma-separated location types
- zoom: Zoom level
"""
@method_decorator(cache_page(300)) # Cache for 5 minutes
def get(self, request: HttpRequest) -> JsonResponse:
"""Get locations within specific geographic bounds."""
try:
# Parse required bounds
bounds = self._parse_bounds(request)
if not bounds:
return self._error_response(
"Bounds parameters required: north, south, east, west", 400
)
# Parse optional filters
location_types = None
types_param = request.GET.get('types')
if types_param:
location_types = {
LocationType(t.strip()) for t in types_param.split(',')
if t.strip() in [lt.value for lt in LocationType]
}
zoom_level = self._parse_zoom_level(request)
# Get locations within bounds
response = unified_map_service.get_locations_by_bounds(
north=bounds.north,
south=bounds.south,
east=bounds.east,
west=bounds.west,
location_types=location_types,
zoom_level=zoom_level
)
return JsonResponse(response.to_dict())
except ValidationError as e:
return self._error_response(str(e), 400)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
class MapStatsView(MapAPIView):
"""
API endpoint for getting map service statistics and health information.
GET /api/map/stats/
"""
def get(self, request: HttpRequest) -> JsonResponse:
"""Get map service statistics and performance metrics."""
try:
stats = unified_map_service.get_service_stats()
return JsonResponse({
'status': 'success',
'data': stats
})
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
class MapCacheView(MapAPIView):
"""
API endpoint for cache management (admin only).
DELETE /api/map/cache/
POST /api/map/cache/invalidate/
"""
def delete(self, request: HttpRequest) -> JsonResponse:
"""Clear all map cache (admin only)."""
# TODO: Add admin permission check
try:
unified_map_service.invalidate_cache()
return JsonResponse({
'status': 'success',
'message': 'Map cache cleared successfully'
})
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
def post(self, request: HttpRequest) -> JsonResponse:
"""Invalidate specific cache entries."""
# TODO: Add admin permission check
try:
data = json.loads(request.body)
location_type = data.get('location_type')
location_id = data.get('location_id')
bounds_data = data.get('bounds')
bounds = None
if bounds_data:
bounds = GeoBounds(**bounds_data)
unified_map_service.invalidate_cache(
location_type=location_type,
location_id=location_id,
bounds=bounds
)
return JsonResponse({
'status': 'success',
'message': 'Cache invalidated successfully'
})
except (json.JSONDecodeError, TypeError, ValueError) as e:
return self._error_response(f"Invalid request data: {str(e)}", 400)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)

48
core/views/search.py Normal file
View File

@@ -0,0 +1,48 @@
from django.views.generic import TemplateView
from parks.models import Park
from parks.filters import ParkFilter
class AdaptiveSearchView(TemplateView):
template_name = "core/search/results.html"
def get_queryset(self):
"""
Get the base queryset, optimized with select_related and prefetch_related
"""
return Park.objects.select_related('owner').prefetch_related(
'location',
'photos'
).all()
def get_filterset(self):
"""
Get the filterset instance
"""
return ParkFilter(self.request.GET, queryset=self.get_queryset())
def get_context_data(self, **kwargs):
"""
Add filtered results and filter form to context
"""
context = super().get_context_data(**kwargs)
filterset = self.get_filterset()
context.update({
'results': filterset.qs,
'filters': filterset,
'applied_filters': bool(self.request.GET), # Check if any filters are applied
})
return context
class FilterFormView(TemplateView):
"""
View for rendering just the filter form for HTMX updates
"""
template_name = "core/search/filters.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
filterset = ParkFilter(self.request.GET, queryset=Park.objects.all())
context['filters'] = filterset
return context