Refactor test utilities and enhance ASGI settings

- Cleaned up and standardized assertions in ApiTestMixin for API response validation.
- Updated ASGI settings to use os.environ for setting the DJANGO_SETTINGS_MODULE.
- Removed unused imports and improved formatting in settings.py.
- Refactored URL patterns in urls.py for better readability and organization.
- Enhanced view functions in views.py for consistency and clarity.
- Added .flake8 configuration for linting and style enforcement.
- Introduced type stubs for django-environ to improve type checking with Pylance.
This commit is contained in:
pacnpal
2025-08-20 19:51:59 -04:00
parent 69c07d1381
commit 66ed4347a9
230 changed files with 15094 additions and 11578 deletions

View File

@@ -1 +1 @@
# Core views
# Core views

View File

@@ -19,157 +19,165 @@ class HealthCheckAPIView(APIView):
"""
Enhanced API endpoint for health checks with detailed JSON response
"""
permission_classes = [AllowAny] # Public endpoint
def get(self, request):
"""Return comprehensive health check information"""
start_time = time.time()
# Get basic health check results
main_view = MainView()
main_view.request = request
plugins = main_view.plugins
errors = main_view.errors
# Collect additional performance metrics
cache_monitor = CacheMonitor()
cache_stats = cache_monitor.get_cache_stats()
# Build comprehensive health data
health_data = {
'status': 'healthy' if not errors else 'unhealthy',
'timestamp': timezone.now().isoformat(),
'version': getattr(settings, 'VERSION', '1.0.0'),
'environment': getattr(settings, 'ENVIRONMENT', 'development'),
'response_time_ms': 0, # Will be calculated at the end
'checks': {},
'metrics': {
'cache': cache_stats,
'database': self._get_database_metrics(),
'system': self._get_system_metrics(),
}
"status": "healthy" if not errors else "unhealthy",
"timestamp": timezone.now().isoformat(),
"version": getattr(settings, "VERSION", "1.0.0"),
"environment": getattr(settings, "ENVIRONMENT", "development"),
"response_time_ms": 0, # Will be calculated at the end
"checks": {},
"metrics": {
"cache": cache_stats,
"database": self._get_database_metrics(),
"system": self._get_system_metrics(),
},
}
# Process individual health checks
for plugin in plugins:
plugin_name = plugin.identifier()
plugin_errors = errors.get(plugin.__class__.__name__, [])
health_data['checks'][plugin_name] = {
'status': 'healthy' if not plugin_errors else 'unhealthy',
'critical': getattr(plugin, 'critical_service', False),
'errors': [str(error) for error in plugin_errors],
'response_time_ms': getattr(plugin, '_response_time', None)
health_data["checks"][plugin_name] = {
"status": "healthy" if not plugin_errors else "unhealthy",
"critical": getattr(plugin, "critical_service", False),
"errors": [str(error) for error in plugin_errors],
"response_time_ms": getattr(plugin, "_response_time", None),
}
# Calculate total response time
health_data['response_time_ms'] = round((time.time() - start_time) * 1000, 2)
health_data["response_time_ms"] = round((time.time() - start_time) * 1000, 2)
# Determine HTTP status code
status_code = 200
if errors:
# Check if any critical services are failing
critical_errors = any(
getattr(plugin, 'critical_service', False)
for plugin in plugins
getattr(plugin, "critical_service", False)
for plugin in plugins
if errors.get(plugin.__class__.__name__)
)
status_code = 503 if critical_errors else 200
return Response(health_data, status=status_code)
def _get_database_metrics(self):
"""Get database performance metrics"""
try:
from django.db import connection
# Get basic connection info
metrics = {
'vendor': connection.vendor,
'connection_status': 'connected',
"vendor": connection.vendor,
"connection_status": "connected",
}
# Test query performance
start_time = time.time()
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
query_time = (time.time() - start_time) * 1000
metrics['test_query_time_ms'] = round(query_time, 2)
metrics["test_query_time_ms"] = round(query_time, 2)
# PostgreSQL specific metrics
if connection.vendor == 'postgresql':
if connection.vendor == "postgresql":
try:
with connection.cursor() as cursor:
cursor.execute("""
SELECT
cursor.execute(
"""
SELECT
numbackends as active_connections,
xact_commit as transactions_committed,
xact_rollback as transactions_rolled_back,
blks_read as blocks_read,
blks_hit as blocks_hit
FROM pg_stat_database
FROM pg_stat_database
WHERE datname = current_database()
""")
"""
)
row = cursor.fetchone()
if row:
metrics.update({
'active_connections': row[0],
'transactions_committed': row[1],
'transactions_rolled_back': row[2],
'cache_hit_ratio': round((row[4] / (row[3] + row[4])) * 100, 2) if (row[3] + row[4]) > 0 else 0
})
metrics.update(
{
"active_connections": row[0],
"transactions_committed": row[1],
"transactions_rolled_back": row[2],
"cache_hit_ratio": (
round(
(row[4] / (row[3] + row[4])) * 100,
2,
)
if (row[3] + row[4]) > 0
else 0
),
}
)
except Exception:
pass # Skip advanced metrics if not available
return metrics
except Exception as e:
return {
'connection_status': 'error',
'error': str(e)
}
return {"connection_status": "error", "error": str(e)}
def _get_system_metrics(self):
"""Get system performance metrics"""
metrics = {
'debug_mode': settings.DEBUG,
'allowed_hosts': settings.ALLOWED_HOSTS if settings.DEBUG else ['hidden'],
"debug_mode": settings.DEBUG,
"allowed_hosts": (settings.ALLOWED_HOSTS if settings.DEBUG else ["hidden"]),
}
try:
import psutil
# Memory metrics
memory = psutil.virtual_memory()
metrics['memory'] = {
'total_mb': round(memory.total / 1024 / 1024, 2),
'available_mb': round(memory.available / 1024 / 1024, 2),
'percent_used': memory.percent,
metrics["memory"] = {
"total_mb": round(memory.total / 1024 / 1024, 2),
"available_mb": round(memory.available / 1024 / 1024, 2),
"percent_used": memory.percent,
}
# CPU metrics
metrics['cpu'] = {
'percent_used': psutil.cpu_percent(interval=0.1),
'core_count': psutil.cpu_count(),
metrics["cpu"] = {
"percent_used": psutil.cpu_percent(interval=0.1),
"core_count": psutil.cpu_count(),
}
# Disk metrics
disk = psutil.disk_usage('/')
metrics['disk'] = {
'total_gb': round(disk.total / 1024 / 1024 / 1024, 2),
'free_gb': round(disk.free / 1024 / 1024 / 1024, 2),
'percent_used': round((disk.used / disk.total) * 100, 2),
disk = psutil.disk_usage("/")
metrics["disk"] = {
"total_gb": round(disk.total / 1024 / 1024 / 1024, 2),
"free_gb": round(disk.free / 1024 / 1024 / 1024, 2),
"percent_used": round((disk.used / disk.total) * 100, 2),
}
except ImportError:
metrics['system_monitoring'] = 'psutil not available'
metrics["system_monitoring"] = "psutil not available"
except Exception as e:
metrics['system_error'] = str(e)
metrics["system_error"] = str(e)
return metrics
@@ -177,80 +185,89 @@ class PerformanceMetricsView(APIView):
"""
API view for performance metrics and database analysis
"""
permission_classes = [AllowAny] if settings.DEBUG else []
def get(self, request):
"""Return performance metrics and analysis"""
if not settings.DEBUG:
return Response({'error': 'Only available in debug mode'}, status=403)
return Response({"error": "Only available in debug mode"}, status=403)
metrics = {
'timestamp': timezone.now().isoformat(),
'database_analysis': self._get_database_analysis(),
'cache_performance': self._get_cache_performance(),
'recent_slow_queries': self._get_slow_queries(),
"timestamp": timezone.now().isoformat(),
"database_analysis": self._get_database_analysis(),
"cache_performance": self._get_cache_performance(),
"recent_slow_queries": self._get_slow_queries(),
}
return Response(metrics)
def _get_database_analysis(self):
"""Analyze database performance"""
try:
from django.db import connection
analysis = {
'total_queries': len(connection.queries),
'query_analysis': IndexAnalyzer.analyze_slow_queries(0.05),
"total_queries": len(connection.queries),
"query_analysis": IndexAnalyzer.analyze_slow_queries(0.05),
}
if connection.queries:
query_times = [float(q.get('time', 0)) for q in connection.queries]
analysis.update({
'total_query_time': sum(query_times),
'average_query_time': sum(query_times) / len(query_times),
'slowest_query_time': max(query_times),
'fastest_query_time': min(query_times),
})
query_times = [float(q.get("time", 0)) for q in connection.queries]
analysis.update(
{
"total_query_time": sum(query_times),
"average_query_time": sum(query_times) / len(query_times),
"slowest_query_time": max(query_times),
"fastest_query_time": min(query_times),
}
)
return analysis
except Exception as e:
return {'error': str(e)}
return {"error": str(e)}
def _get_cache_performance(self):
"""Get cache performance metrics"""
try:
cache_monitor = CacheMonitor()
return cache_monitor.get_cache_stats()
except Exception as e:
return {'error': str(e)}
return {"error": str(e)}
def _get_slow_queries(self):
"""Get recent slow queries"""
try:
return IndexAnalyzer.analyze_slow_queries(0.1) # 100ms threshold
except Exception as e:
return {'error': str(e)}
return {"error": str(e)}
class SimpleHealthView(View):
"""
Simple health check endpoint for load balancers
"""
def get(self, request):
"""Return simple OK status"""
try:
# Basic database connectivity test
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
return JsonResponse({'status': 'ok', 'timestamp': timezone.now().isoformat()})
return JsonResponse(
{"status": "ok", "timestamp": timezone.now().isoformat()}
)
except Exception as e:
return JsonResponse(
{'status': 'error', 'error': str(e), 'timestamp': timezone.now().isoformat()},
status=503
{
"status": "error",
"error": str(e),
"timestamp": timezone.now().isoformat(),
},
status=503,
)

View File

@@ -5,15 +5,13 @@ Enhanced with proper error handling, pagination, and performance optimizations.
import json
import logging
from typing import Dict, Any, Optional, Set
from django.http import JsonResponse, HttpRequest, Http404
from django.views.decorators.http import require_http_methods
from typing import Dict, Any, Optional
from django.http import JsonResponse, HttpRequest
from django.views.decorators.cache import cache_page
from django.views.decorators.gzip import gzip_page
from django.utils.decorators import method_decorator
from django.views import View
from django.core.exceptions import ValidationError
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.conf import settings
import time
@@ -25,250 +23,289 @@ logger = logging.getLogger(__name__)
class MapAPIView(View):
"""Base view for map API endpoints with common functionality."""
# Pagination settings
DEFAULT_PAGE_SIZE = 50
MAX_PAGE_SIZE = 200
def dispatch(self, request, *args, **kwargs):
"""Add CORS headers, compression, and handle preflight requests."""
start_time = time.time()
try:
response = super().dispatch(request, *args, **kwargs)
# Add CORS headers for API access
response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response["Access-Control-Allow-Origin"] = "*"
response["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
response["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
# Add performance headers
response['X-Response-Time'] = f"{(time.time() - start_time) * 1000:.2f}ms"
# Add compression hint for large responses
if hasattr(response, 'content') and len(response.content) > 1024:
response['Content-Encoding'] = 'gzip'
return response
except Exception as e:
logger.error(f"API error in {request.path}: {str(e)}", exc_info=True)
return self._error_response(
"An internal server error occurred",
status=500
response["X-Response-Time"] = (
f"{(time.time() -
start_time) *
1000:.2f}ms"
)
# Add compression hint for large responses
if hasattr(response, "content") and len(response.content) > 1024:
response["Content-Encoding"] = "gzip"
return response
except Exception as e:
logger.error(
f"API error in {
request.path}: {
str(e)}",
exc_info=True,
)
return self._error_response("An internal server error occurred", status=500)
def options(self, request, *args, **kwargs):
"""Handle preflight CORS requests."""
return JsonResponse({}, status=200)
def _parse_bounds(self, request: HttpRequest) -> Optional[GeoBounds]:
"""Parse geographic bounds from request parameters."""
try:
north = request.GET.get('north')
south = request.GET.get('south')
east = request.GET.get('east')
west = request.GET.get('west')
north = request.GET.get("north")
south = request.GET.get("south")
east = request.GET.get("east")
west = request.GET.get("west")
if all(param is not None for param in [north, south, east, west]):
bounds = GeoBounds(
north=float(north),
south=float(south),
east=float(east),
west=float(west)
west=float(west),
)
# Validate bounds
if not (-90 <= bounds.south <= bounds.north <= 90):
raise ValidationError("Invalid latitude bounds")
if not (-180 <= bounds.west <= bounds.east <= 180):
raise ValidationError("Invalid longitude bounds")
return bounds
return None
except (ValueError, TypeError) as e:
raise ValidationError(f"Invalid bounds parameters: {e}")
def _parse_pagination(self, request: HttpRequest) -> Dict[str, int]:
"""Parse pagination parameters from request."""
try:
page = max(1, int(request.GET.get('page', 1)))
page = max(1, int(request.GET.get("page", 1)))
page_size = min(
self.MAX_PAGE_SIZE,
max(1, int(request.GET.get('page_size', self.DEFAULT_PAGE_SIZE)))
max(
1,
int(request.GET.get("page_size", self.DEFAULT_PAGE_SIZE)),
),
)
offset = (page - 1) * page_size
return {
'page': page,
'page_size': page_size,
'offset': offset,
'limit': page_size
"page": page,
"page_size": page_size,
"offset": offset,
"limit": page_size,
}
except (ValueError, TypeError):
return {
'page': 1,
'page_size': self.DEFAULT_PAGE_SIZE,
'offset': 0,
'limit': self.DEFAULT_PAGE_SIZE
"page": 1,
"page_size": self.DEFAULT_PAGE_SIZE,
"offset": 0,
"limit": self.DEFAULT_PAGE_SIZE,
}
def _parse_filters(self, request: HttpRequest) -> Optional[MapFilters]:
"""Parse filtering parameters from request."""
try:
filters = MapFilters()
# Location types
location_types_param = request.GET.get('types')
location_types_param = request.GET.get("types")
if location_types_param:
type_strings = location_types_param.split(',')
type_strings = location_types_param.split(",")
valid_types = {lt.value for lt in LocationType}
filters.location_types = {
LocationType(t.strip()) for t in type_strings
LocationType(t.strip())
for t in type_strings
if t.strip() in valid_types
}
# Park status
park_status_param = request.GET.get('park_status')
park_status_param = request.GET.get("park_status")
if park_status_param:
filters.park_status = set(park_status_param.split(','))
filters.park_status = set(park_status_param.split(","))
# Ride types
ride_types_param = request.GET.get('ride_types')
ride_types_param = request.GET.get("ride_types")
if ride_types_param:
filters.ride_types = set(ride_types_param.split(','))
filters.ride_types = set(ride_types_param.split(","))
# Company roles
company_roles_param = request.GET.get('company_roles')
company_roles_param = request.GET.get("company_roles")
if company_roles_param:
filters.company_roles = set(company_roles_param.split(','))
filters.company_roles = set(company_roles_param.split(","))
# Search query with length validation
search_query = request.GET.get('q') or request.GET.get('search')
search_query = request.GET.get("q") or request.GET.get("search")
if search_query and len(search_query.strip()) >= 2:
filters.search_query = search_query.strip()
# Rating filter with validation
min_rating_param = request.GET.get('min_rating')
min_rating_param = request.GET.get("min_rating")
if min_rating_param:
min_rating = float(min_rating_param)
if 0 <= min_rating <= 10:
filters.min_rating = min_rating
# Geographic filters with validation
country = request.GET.get('country', '').strip()
country = request.GET.get("country", "").strip()
if country and len(country) >= 2:
filters.country = country
state = request.GET.get('state', '').strip()
state = request.GET.get("state", "").strip()
if state and len(state) >= 2:
filters.state = state
city = request.GET.get('city', '').strip()
city = request.GET.get("city", "").strip()
if city and len(city) >= 2:
filters.city = city
# Coordinates requirement
has_coordinates_param = request.GET.get('has_coordinates')
has_coordinates_param = request.GET.get("has_coordinates")
if has_coordinates_param is not None:
filters.has_coordinates = has_coordinates_param.lower() in ['true', '1', 'yes']
return filters if any([
filters.location_types, filters.park_status, filters.ride_types,
filters.company_roles, filters.search_query, filters.min_rating,
filters.country, filters.state, filters.city
]) else None
filters.has_coordinates = has_coordinates_param.lower() in [
"true",
"1",
"yes",
]
return (
filters
if any(
[
filters.location_types,
filters.park_status,
filters.ride_types,
filters.company_roles,
filters.search_query,
filters.min_rating,
filters.country,
filters.state,
filters.city,
]
)
else None
)
except (ValueError, TypeError) as e:
raise ValidationError(f"Invalid filter parameters: {e}")
def _parse_zoom_level(self, request: HttpRequest) -> int:
"""Parse zoom level from request with default."""
try:
zoom_param = request.GET.get('zoom', '10')
zoom_param = request.GET.get("zoom", "10")
zoom_level = int(zoom_param)
return max(1, min(20, zoom_level)) # Clamp between 1 and 20
except (ValueError, TypeError):
return 10 # Default zoom level
def _create_paginated_response(self, data: list, total_count: int,
pagination: Dict[str, int], request: HttpRequest) -> Dict[str, Any]:
def _create_paginated_response(
self,
data: list,
total_count: int,
pagination: Dict[str, int],
request: HttpRequest,
) -> Dict[str, Any]:
"""Create paginated response with metadata."""
total_pages = (total_count + pagination['page_size'] - 1) // pagination['page_size']
total_pages = (total_count + pagination["page_size"] - 1) // pagination[
"page_size"
]
# Build pagination URLs
base_url = request.build_absolute_uri(request.path)
query_params = request.GET.copy()
next_url = None
if pagination['page'] < total_pages:
query_params['page'] = pagination['page'] + 1
if pagination["page"] < total_pages:
query_params["page"] = pagination["page"] + 1
next_url = f"{base_url}?{query_params.urlencode()}"
prev_url = None
if pagination['page'] > 1:
query_params['page'] = pagination['page'] - 1
if pagination["page"] > 1:
query_params["page"] = pagination["page"] - 1
prev_url = f"{base_url}?{query_params.urlencode()}"
return {
'status': 'success',
'data': data,
'pagination': {
'page': pagination['page'],
'page_size': pagination['page_size'],
'total_pages': total_pages,
'total_count': total_count,
'has_next': pagination['page'] < total_pages,
'has_previous': pagination['page'] > 1,
'next_url': next_url,
'previous_url': prev_url,
}
"status": "success",
"data": data,
"pagination": {
"page": pagination["page"],
"page_size": pagination["page_size"],
"total_pages": total_pages,
"total_count": total_count,
"has_next": pagination["page"] < total_pages,
"has_previous": pagination["page"] > 1,
"next_url": next_url,
"previous_url": prev_url,
},
}
def _error_response(self, message: str, status: int = 400,
error_code: str = None, details: Dict[str, Any] = None) -> JsonResponse:
def _error_response(
self,
message: str,
status: int = 400,
error_code: str = None,
details: Dict[str, Any] = None,
) -> JsonResponse:
"""Return standardized error response with enhanced information."""
response_data = {
'status': 'error',
'message': message,
'timestamp': time.time(),
'data': None
"status": "error",
"message": message,
"timestamp": time.time(),
"data": None,
}
if error_code:
response_data['error_code'] = error_code
response_data["error_code"] = error_code
if details:
response_data['details'] = details
response_data["details"] = details
# Add request ID for debugging in production
if hasattr(settings, 'DEBUG') and not settings.DEBUG:
response_data['request_id'] = getattr(self.request, 'id', None)
if hasattr(settings, "DEBUG") and not settings.DEBUG:
response_data["request_id"] = getattr(self.request, "id", None)
return JsonResponse(response_data, status=status)
def _success_response(self, data: Any, message: str = None,
metadata: Dict[str, Any] = None) -> JsonResponse:
def _success_response(
self, data: Any, message: str = None, metadata: Dict[str, Any] = None
) -> JsonResponse:
"""Return standardized success response."""
response_data = {
'status': 'success',
'data': data,
'timestamp': time.time(),
"status": "success",
"data": data,
"timestamp": time.time(),
}
if message:
response_data['message'] = message
response_data["message"] = message
if metadata:
response_data['metadata'] = metadata
response_data["metadata"] = metadata
return JsonResponse(response_data)
class MapLocationsView(MapAPIView):
"""
API endpoint for getting map locations with optional clustering.
GET /api/map/locations/
Parameters:
- north, south, east, west: Bounding box coordinates
@@ -281,7 +318,7 @@ class MapLocationsView(MapAPIView):
- min_rating: Minimum rating filter
- country, state, city: Geographic filters
"""
@method_decorator(cache_page(300)) # Cache for 5 minutes
@method_decorator(gzip_page) # Compress large responses
def get(self, request: HttpRequest) -> JsonResponse:
@@ -292,57 +329,59 @@ class MapLocationsView(MapAPIView):
filters = self._parse_filters(request)
zoom_level = self._parse_zoom_level(request)
pagination = self._parse_pagination(request)
# Clustering preference
cluster_param = request.GET.get('cluster', 'true')
enable_clustering = cluster_param.lower() in ['true', '1', 'yes']
cluster_param = request.GET.get("cluster", "true")
enable_clustering = cluster_param.lower() in ["true", "1", "yes"]
# Cache preference
use_cache_param = request.GET.get('cache', 'true')
use_cache = use_cache_param.lower() in ['true', '1', 'yes']
use_cache_param = request.GET.get("cache", "true")
use_cache = use_cache_param.lower() in ["true", "1", "yes"]
# Validate request
if not enable_clustering and not bounds and not filters:
return self._error_response(
"Either bounds, filters, or clustering must be specified for non-clustered requests",
error_code="MISSING_PARAMETERS"
error_code="MISSING_PARAMETERS",
)
# Get map data
response = unified_map_service.get_map_data(
bounds=bounds,
filters=filters,
zoom_level=zoom_level,
cluster=enable_clustering,
use_cache=use_cache
use_cache=use_cache,
)
# Handle pagination for non-clustered results
if not enable_clustering and response.locations:
start_idx = pagination['offset']
end_idx = start_idx + pagination['limit']
start_idx = pagination["offset"]
end_idx = start_idx + pagination["limit"]
paginated_locations = response.locations[start_idx:end_idx]
return JsonResponse(self._create_paginated_response(
[loc.to_dict() for loc in paginated_locations],
len(response.locations),
pagination,
request
))
return JsonResponse(
self._create_paginated_response(
[loc.to_dict() for loc in paginated_locations],
len(response.locations),
pagination,
request,
)
)
# For clustered results, return as-is with metadata
response_dict = response.to_dict()
return self._success_response(
response_dict,
metadata={
'clustered': response.clustered,
'cache_hit': response.cache_hit,
'query_time_ms': response.query_time_ms,
'filters_applied': response.filters_applied
}
"clustered": response.clustered,
"cache_hit": response.cache_hit,
"query_time_ms": response.query_time_ms,
"filters_applied": response.filters_applied,
},
)
except ValidationError as e:
logger.warning(f"Validation error in MapLocationsView: {str(e)}")
return self._error_response(str(e), 400, error_code="VALIDATION_ERROR")
@@ -351,72 +390,81 @@ class MapLocationsView(MapAPIView):
return self._error_response(
"Failed to retrieve map locations",
500,
error_code="INTERNAL_ERROR"
error_code="INTERNAL_ERROR",
)
class MapLocationDetailView(MapAPIView):
"""
API endpoint for getting detailed information about a specific location.
GET /api/map/locations/<type>/<id>/
"""
@method_decorator(cache_page(600)) # Cache for 10 minutes
def get(self, request: HttpRequest, location_type: str, location_id: int) -> JsonResponse:
def get(
self, request: HttpRequest, location_type: str, location_id: int
) -> JsonResponse:
"""Get detailed information for a specific location."""
try:
# Validate location type
valid_types = [lt.value for lt in LocationType]
if location_type not in valid_types:
return self._error_response(
f"Invalid location type: {location_type}. Valid types: {', '.join(valid_types)}",
f"Invalid location type: {location_type}. Valid types: {
', '.join(valid_types)}",
400,
error_code="INVALID_LOCATION_TYPE"
error_code="INVALID_LOCATION_TYPE",
)
# Validate location ID
if location_id <= 0:
return self._error_response(
"Location ID must be a positive integer",
400,
error_code="INVALID_LOCATION_ID"
error_code="INVALID_LOCATION_ID",
)
# Get location details
location = unified_map_service.get_location_details(location_type, location_id)
location = unified_map_service.get_location_details(
location_type, location_id
)
if not location:
return self._error_response(
f"Location not found: {location_type}/{location_id}",
404,
error_code="LOCATION_NOT_FOUND"
error_code="LOCATION_NOT_FOUND",
)
return self._success_response(
location.to_dict(),
metadata={
'location_type': location_type,
'location_id': location_id
}
"location_type": location_type,
"location_id": location_id,
},
)
except ValueError as e:
logger.warning(f"Value error in MapLocationDetailView: {str(e)}")
return self._error_response(str(e), 400, error_code="INVALID_PARAMETER")
except Exception as e:
logger.error(f"Error in MapLocationDetailView: {str(e)}", exc_info=True)
logger.error(
f"Error in MapLocationDetailView: {
str(e)}",
exc_info=True,
)
return self._error_response(
"Failed to retrieve location details",
500,
error_code="INTERNAL_ERROR"
error_code="INTERNAL_ERROR",
)
class MapSearchView(MapAPIView):
"""
API endpoint for searching locations by text query.
GET /api/map/search/
Parameters:
- q: Search query (required)
@@ -424,71 +472,75 @@ class MapSearchView(MapAPIView):
- types: Comma-separated location types
- limit: Maximum results (default 50)
"""
@method_decorator(gzip_page) # Compress responses
def get(self, request: HttpRequest) -> JsonResponse:
"""Search locations by text query with pagination."""
try:
# Get and validate search query
query = request.GET.get('q', '').strip()
query = request.GET.get("q", "").strip()
if not query:
return self._error_response(
"Search query 'q' parameter is required",
400,
error_code="MISSING_QUERY"
error_code="MISSING_QUERY",
)
if len(query) < 2:
return self._error_response(
"Search query must be at least 2 characters long",
400,
error_code="QUERY_TOO_SHORT"
error_code="QUERY_TOO_SHORT",
)
# Parse parameters
bounds = self._parse_bounds(request)
pagination = self._parse_pagination(request)
# Parse location types
location_types = None
types_param = request.GET.get('types')
types_param = request.GET.get("types")
if types_param:
try:
valid_types = {lt.value for lt in LocationType}
location_types = {
LocationType(t.strip()) for t in types_param.split(',')
LocationType(t.strip())
for t in types_param.split(",")
if t.strip() in valid_types
}
except ValueError:
return self._error_response(
"Invalid location types",
400,
error_code="INVALID_TYPES"
error_code="INVALID_TYPES",
)
# Set reasonable search limit (higher for search than general listings)
search_limit = min(500, pagination['page'] * pagination['page_size'])
# Set reasonable search limit (higher for search than general
# listings)
search_limit = min(500, pagination["page"] * pagination["page_size"])
# Perform search
locations = unified_map_service.search_locations(
query=query,
bounds=bounds,
location_types=location_types,
limit=search_limit
limit=search_limit,
)
# Apply pagination
start_idx = pagination['offset']
end_idx = start_idx + pagination['limit']
start_idx = pagination["offset"]
end_idx = start_idx + pagination["limit"]
paginated_locations = locations[start_idx:end_idx]
return JsonResponse(self._create_paginated_response(
[loc.to_dict() for loc in paginated_locations],
len(locations),
pagination,
request
))
return JsonResponse(
self._create_paginated_response(
[loc.to_dict() for loc in paginated_locations],
len(locations),
pagination,
request,
)
)
except ValidationError as e:
logger.warning(f"Validation error in MapSearchView: {str(e)}")
return self._error_response(str(e), 400, error_code="VALIDATION_ERROR")
@@ -500,21 +552,21 @@ class MapSearchView(MapAPIView):
return self._error_response(
"Search failed due to internal error",
500,
error_code="SEARCH_FAILED"
error_code="SEARCH_FAILED",
)
class MapBoundsView(MapAPIView):
"""
API endpoint for getting locations within specific bounds.
GET /api/map/bounds/
Parameters:
- north, south, east, west: Bounding box coordinates (required)
- types: Comma-separated location types
- zoom: Zoom level
"""
@method_decorator(cache_page(300)) # Cache for 5 minutes
def get(self, request: HttpRequest) -> JsonResponse:
"""Get locations within specific geographic bounds."""
@@ -525,18 +577,19 @@ class MapBoundsView(MapAPIView):
return self._error_response(
"Bounds parameters required: north, south, east, west", 400
)
# Parse optional filters
location_types = None
types_param = request.GET.get('types')
types_param = request.GET.get("types")
if types_param:
location_types = {
LocationType(t.strip()) for t in types_param.split(',')
LocationType(t.strip())
for t in types_param.split(",")
if t.strip() in [lt.value for lt in LocationType]
}
zoom_level = self._parse_zoom_level(request)
# Get locations within bounds
response = unified_map_service.get_locations_by_bounds(
north=bounds.north,
@@ -544,86 +597,103 @@ class MapBoundsView(MapAPIView):
east=bounds.east,
west=bounds.west,
location_types=location_types,
zoom_level=zoom_level
zoom_level=zoom_level,
)
return JsonResponse(response.to_dict())
except ValidationError as e:
return self._error_response(str(e), 400)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
return self._error_response(
f"Internal server error: {
str(e)}",
500,
)
class MapStatsView(MapAPIView):
"""
API endpoint for getting map service statistics and health information.
GET /api/map/stats/
"""
def get(self, request: HttpRequest) -> JsonResponse:
"""Get map service statistics and performance metrics."""
try:
stats = unified_map_service.get_service_stats()
return JsonResponse({
'status': 'success',
'data': stats
})
return JsonResponse({"status": "success", "data": stats})
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
return self._error_response(
f"Internal server error: {
str(e)}",
500,
)
class MapCacheView(MapAPIView):
"""
API endpoint for cache management (admin only).
DELETE /api/map/cache/
POST /api/map/cache/invalidate/
"""
def delete(self, request: HttpRequest) -> JsonResponse:
"""Clear all map cache (admin only)."""
# TODO: Add admin permission check
try:
unified_map_service.invalidate_cache()
return JsonResponse({
'status': 'success',
'message': 'Map cache cleared successfully'
})
return JsonResponse(
{
"status": "success",
"message": "Map cache cleared successfully",
}
)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
return self._error_response(
f"Internal server error: {
str(e)}",
500,
)
def post(self, request: HttpRequest) -> JsonResponse:
"""Invalidate specific cache entries."""
# TODO: Add admin permission check
try:
data = json.loads(request.body)
location_type = data.get('location_type')
location_id = data.get('location_id')
bounds_data = data.get('bounds')
location_type = data.get("location_type")
location_id = data.get("location_id")
bounds_data = data.get("bounds")
bounds = None
if bounds_data:
bounds = GeoBounds(**bounds_data)
unified_map_service.invalidate_cache(
location_type=location_type,
location_id=location_id,
bounds=bounds
bounds=bounds,
)
return JsonResponse({
'status': 'success',
'message': 'Cache invalidated successfully'
})
return JsonResponse(
{
"status": "success",
"message": "Cache invalidated successfully",
}
)
except (json.JSONDecodeError, TypeError, ValueError) as e:
return self._error_response(f"Invalid request data: {str(e)}", 400)
except Exception as e:
return self._error_response(f"Internal server error: {str(e)}", 500)
return self._error_response(
f"Internal server error: {
str(e)}",
500,
)

View File

@@ -5,15 +5,10 @@ Provides web interfaces for map functionality with HTMX integration.
import json
from typing import Dict, Any, Optional, Set
from django.shortcuts import render, get_object_or_404
from django.shortcuts import render
from django.http import JsonResponse, HttpRequest, HttpResponse
from django.views.generic import TemplateView, View
from django.views.decorators.http import require_http_methods
from django.utils.decorators import method_decorator
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.paginator import Paginator
from django.core.exceptions import ValidationError
from django.db.models import Q
from ..services.map_service import unified_map_service
from ..services.data_structures import GeoBounds, MapFilters, LocationType
@@ -21,29 +16,30 @@ from ..services.data_structures import GeoBounds, MapFilters, LocationType
class MapViewMixin:
"""Mixin providing common functionality for map views."""
def get_map_context(self, request: HttpRequest) -> Dict[str, Any]:
"""Get common context data for map views."""
return {
'map_api_urls': {
'locations': '/api/map/locations/',
'search': '/api/map/search/',
'bounds': '/api/map/bounds/',
'location_detail': '/api/map/locations/',
"map_api_urls": {
"locations": "/api/map/locations/",
"search": "/api/map/search/",
"bounds": "/api/map/bounds/",
"location_detail": "/api/map/locations/",
},
'location_types': [lt.value for lt in LocationType],
'default_zoom': 10,
'enable_clustering': True,
'enable_search': True,
"location_types": [lt.value for lt in LocationType],
"default_zoom": 10,
"enable_clustering": True,
"enable_search": True,
}
def parse_location_types(self, request: HttpRequest) -> Optional[Set[LocationType]]:
"""Parse location types from request parameters."""
types_param = request.GET.get('types')
types_param = request.GET.get("types")
if types_param:
try:
return {
LocationType(t.strip()) for t in types_param.split(',')
LocationType(t.strip())
for t in types_param.split(",")
if t.strip() in [lt.value for lt in LocationType]
}
except ValueError:
@@ -54,122 +50,141 @@ class MapViewMixin:
class UniversalMapView(MapViewMixin, TemplateView):
"""
Main universal map view showing all location types.
URL: /maps/
"""
template_name = 'maps/universal_map.html'
template_name = "maps/universal_map.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context.update(self.get_map_context(self.request))
# Additional context for universal map
context.update({
'page_title': 'Interactive Map - All Locations',
'map_type': 'universal',
'show_all_types': True,
'initial_location_types': [lt.value for lt in LocationType],
'filters_enabled': True,
})
context.update(
{
"page_title": "Interactive Map - All Locations",
"map_type": "universal",
"show_all_types": True,
"initial_location_types": [lt.value for lt in LocationType],
"filters_enabled": True,
}
)
# Handle initial bounds from query parameters
if all(param in self.request.GET for param in ['north', 'south', 'east', 'west']):
if all(
param in self.request.GET for param in ["north", "south", "east", "west"]
):
try:
context['initial_bounds'] = {
'north': float(self.request.GET['north']),
'south': float(self.request.GET['south']),
'east': float(self.request.GET['east']),
'west': float(self.request.GET['west']),
context["initial_bounds"] = {
"north": float(self.request.GET["north"]),
"south": float(self.request.GET["south"]),
"east": float(self.request.GET["east"]),
"west": float(self.request.GET["west"]),
}
except (ValueError, TypeError):
pass
return context
class ParkMapView(MapViewMixin, TemplateView):
"""
Map view focused specifically on parks.
URL: /maps/parks/
"""
template_name = 'maps/park_map.html'
template_name = "maps/park_map.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context.update(self.get_map_context(self.request))
# Park-specific context
context.update({
'page_title': 'Theme Parks Map',
'map_type': 'parks',
'show_all_types': False,
'initial_location_types': [LocationType.PARK.value],
'filters_enabled': True,
'park_specific_filters': True,
})
context.update(
{
"page_title": "Theme Parks Map",
"map_type": "parks",
"show_all_types": False,
"initial_location_types": [LocationType.PARK.value],
"filters_enabled": True,
"park_specific_filters": True,
}
)
return context
class NearbyLocationsView(MapViewMixin, TemplateView):
"""
View for showing locations near a specific point.
URL: /maps/nearby/
"""
template_name = 'maps/nearby_locations.html'
template_name = "maps/nearby_locations.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context.update(self.get_map_context(self.request))
# Parse coordinates from query parameters
lat = self.request.GET.get('lat')
lng = self.request.GET.get('lng')
radius = self.request.GET.get('radius', '50') # Default 50km radius
lat = self.request.GET.get("lat")
lng = self.request.GET.get("lng")
radius = self.request.GET.get("radius", "50") # Default 50km radius
if lat and lng:
try:
center_lat = float(lat)
center_lng = float(lng)
search_radius = min(200, max(1, float(radius))) # Clamp between 1-200km
context.update({
'page_title': f'Locations Near {center_lat:.4f}, {center_lng:.4f}',
'map_type': 'nearby',
'center_coordinates': {'lat': center_lat, 'lng': center_lng},
'search_radius': search_radius,
'show_radius_circle': True,
})
# Clamp between 1-200km
search_radius = min(200, max(1, float(radius)))
context.update(
{
"page_title": f"Locations Near {
center_lat:.4f}, {
center_lng:.4f}",
"map_type": "nearby",
"center_coordinates": {
"lat": center_lat,
"lng": center_lng,
},
"search_radius": search_radius,
"show_radius_circle": True,
}
)
except (ValueError, TypeError):
context['error'] = 'Invalid coordinates provided'
context["error"] = "Invalid coordinates provided"
else:
context.update({
'page_title': 'Nearby Locations',
'map_type': 'nearby',
'prompt_for_location': True,
})
context.update(
{
"page_title": "Nearby Locations",
"map_type": "nearby",
"prompt_for_location": True,
}
)
return context
class LocationFilterView(MapViewMixin, View):
"""
HTMX endpoint for updating map when filters change.
URL: /maps/htmx/filter/
"""
def get(self, request: HttpRequest) -> HttpResponse:
"""Return filtered location data for HTMX updates."""
try:
# Parse filter parameters
location_types = self.parse_location_types(request)
search_query = request.GET.get('q', '').strip()
country = request.GET.get('country', '').strip()
state = request.GET.get('state', '').strip()
search_query = request.GET.get("q", "").strip()
country = request.GET.get("country", "").strip()
state = request.GET.get("state", "").strip()
# Create filters
filters = None
if any([location_types, search_query, country, state]):
@@ -178,108 +193,107 @@ class LocationFilterView(MapViewMixin, View):
search_query=search_query or None,
country=country or None,
state=state or None,
has_coordinates=True
has_coordinates=True,
)
# Get filtered locations
map_response = unified_map_service.get_map_data(
filters=filters,
zoom_level=int(request.GET.get('zoom', '10')),
cluster=request.GET.get('cluster', 'true').lower() == 'true'
zoom_level=int(request.GET.get("zoom", "10")),
cluster=request.GET.get("cluster", "true").lower() == "true",
)
# Return JSON response for HTMX
return JsonResponse({
'status': 'success',
'data': map_response.to_dict(),
'filters_applied': map_response.filters_applied
})
return JsonResponse(
{
"status": "success",
"data": map_response.to_dict(),
"filters_applied": map_response.filters_applied,
}
)
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
}, status=400)
return JsonResponse({"status": "error", "message": str(e)}, status=400)
class LocationSearchView(MapViewMixin, View):
"""
HTMX endpoint for real-time location search.
URL: /maps/htmx/search/
"""
def get(self, request: HttpRequest) -> HttpResponse:
"""Return search results for HTMX updates."""
query = request.GET.get('q', '').strip()
query = request.GET.get("q", "").strip()
if not query or len(query) < 3:
return render(request, 'maps/partials/search_results.html', {
'results': [],
'query': query,
'message': 'Enter at least 3 characters to search'
})
return render(
request,
"maps/partials/search_results.html",
{
"results": [],
"query": query,
"message": "Enter at least 3 characters to search",
},
)
try:
# Parse optional location types
location_types = self.parse_location_types(request)
limit = min(20, max(5, int(request.GET.get('limit', '10'))))
limit = min(20, max(5, int(request.GET.get("limit", "10"))))
# Perform search
results = unified_map_service.search_locations(
query=query,
location_types=location_types,
limit=limit
query=query, location_types=location_types, limit=limit
)
return render(request, 'maps/partials/search_results.html', {
'results': results,
'query': query,
'count': len(results)
})
return render(
request,
"maps/partials/search_results.html",
{"results": results, "query": query, "count": len(results)},
)
except Exception as e:
return render(request, 'maps/partials/search_results.html', {
'results': [],
'query': query,
'error': str(e)
})
return render(
request,
"maps/partials/search_results.html",
{"results": [], "query": query, "error": str(e)},
)
class MapBoundsUpdateView(MapViewMixin, View):
"""
HTMX endpoint for updating locations when map bounds change.
URL: /maps/htmx/bounds/
"""
def post(self, request: HttpRequest) -> HttpResponse:
"""Update map data when bounds change."""
try:
data = json.loads(request.body)
# Parse bounds
bounds = GeoBounds(
north=float(data['north']),
south=float(data['south']),
east=float(data['east']),
west=float(data['west'])
north=float(data["north"]),
south=float(data["south"]),
east=float(data["east"]),
west=float(data["west"]),
)
# Parse additional parameters
zoom_level = int(data.get('zoom', 10))
zoom_level = int(data.get("zoom", 10))
location_types = None
if 'types' in data:
if "types" in data:
location_types = {
LocationType(t) for t in data['types']
LocationType(t)
for t in data["types"]
if t in [lt.value for lt in LocationType]
}
# Create filters if needed
filters = None
if location_types:
filters = MapFilters(location_types=location_types)
# Location types are used directly in the service call
# Get updated map data
map_response = unified_map_service.get_locations_by_bounds(
north=bounds.north,
@@ -287,79 +301,86 @@ class MapBoundsUpdateView(MapViewMixin, View):
east=bounds.east,
west=bounds.west,
location_types=location_types,
zoom_level=zoom_level
zoom_level=zoom_level,
)
return JsonResponse({
'status': 'success',
'data': map_response.to_dict()
})
return JsonResponse({"status": "success", "data": map_response.to_dict()})
except (json.JSONDecodeError, ValueError, KeyError) as e:
return JsonResponse({
'status': 'error',
'message': f'Invalid request data: {str(e)}'
}, status=400)
return JsonResponse(
{
"status": "error",
"message": f"Invalid request data: {str(e)}",
},
status=400,
)
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
}, status=500)
return JsonResponse({"status": "error", "message": str(e)}, status=500)
class LocationDetailModalView(MapViewMixin, View):
"""
HTMX endpoint for showing location details in modal.
URL: /maps/htmx/location/<type>/<id>/
"""
def get(self, request: HttpRequest, location_type: str, location_id: int) -> HttpResponse:
def get(
self, request: HttpRequest, location_type: str, location_id: int
) -> HttpResponse:
"""Return location detail modal content."""
try:
# Validate location type
if location_type not in [lt.value for lt in LocationType]:
return render(request, 'maps/partials/location_modal.html', {
'error': f'Invalid location type: {location_type}'
})
return render(
request,
"maps/partials/location_modal.html",
{"error": f"Invalid location type: {location_type}"},
)
# Get location details
location = unified_map_service.get_location_details(location_type, location_id)
location = unified_map_service.get_location_details(
location_type, location_id
)
if not location:
return render(request, 'maps/partials/location_modal.html', {
'error': 'Location not found'
})
return render(request, 'maps/partials/location_modal.html', {
'location': location,
'location_type': location_type
})
return render(
request,
"maps/partials/location_modal.html",
{"error": "Location not found"},
)
return render(
request,
"maps/partials/location_modal.html",
{"location": location, "location_type": location_type},
)
except Exception as e:
return render(request, 'maps/partials/location_modal.html', {
'error': str(e)
})
return render(
request, "maps/partials/location_modal.html", {"error": str(e)}
)
class LocationListView(MapViewMixin, TemplateView):
"""
View for listing locations with pagination (non-map view).
URL: /maps/list/
"""
template_name = 'maps/location_list.html'
template_name = "maps/location_list.html"
paginate_by = 20
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Parse filters
location_types = self.parse_location_types(self.request)
search_query = self.request.GET.get('q', '').strip()
country = self.request.GET.get('country', '').strip()
state = self.request.GET.get('state', '').strip()
search_query = self.request.GET.get("q", "").strip()
country = self.request.GET.get("country", "").strip()
state = self.request.GET.get("state", "").strip()
# Create filters
filters = None
if any([location_types, search_query, country, state]):
@@ -368,33 +389,33 @@ class LocationListView(MapViewMixin, TemplateView):
search_query=search_query or None,
country=country or None,
state=state or None,
has_coordinates=True
has_coordinates=True,
)
# Get locations without clustering
map_response = unified_map_service.get_map_data(
filters=filters,
cluster=False,
use_cache=True
filters=filters, cluster=False, use_cache=True
)
# Paginate results
paginator = Paginator(map_response.locations, self.paginate_by)
page_number = self.request.GET.get('page')
page_number = self.request.GET.get("page")
page_obj = paginator.get_page(page_number)
context.update({
'page_title': 'All Locations',
'locations': page_obj,
'total_count': map_response.total_count,
'applied_filters': filters,
'location_types': [lt.value for lt in LocationType],
'current_filters': {
'types': self.request.GET.getlist('types'),
'q': search_query,
'country': country,
'state': state,
context.update(
{
"page_title": "All Locations",
"locations": page_obj,
"total_count": map_response.total_count,
"applied_filters": filters,
"location_types": [lt.value for lt in LocationType],
"current_filters": {
"types": self.request.GET.getlist("types"),
"q": search_query,
"country": country,
"state": state,
},
}
})
return context
)
return context

View File

@@ -1,23 +1,27 @@
from django.views.generic import TemplateView
from django.http import JsonResponse
from django.contrib.gis.geos import Point
from django.contrib.gis.measure import Distance
from parks.models import Park
from parks.filters import ParkFilter
from core.services.location_search import location_search_service, LocationSearchFilters
from core.services.location_search import (
location_search_service,
LocationSearchFilters,
)
from core.forms.search import LocationSearchForm
class AdaptiveSearchView(TemplateView):
template_name = "core/search/results.html"
def get_queryset(self):
"""
Get the base queryset, optimized with select_related and prefetch_related
"""
return Park.objects.select_related('operator', 'property_owner').prefetch_related(
'location',
'photos'
).all()
return (
Park.objects.select_related("operator", "property_owner")
.prefetch_related("location", "photos")
.all()
)
def get_filterset(self):
"""
@@ -31,32 +35,38 @@ class AdaptiveSearchView(TemplateView):
"""
context = super().get_context_data(**kwargs)
filterset = self.get_filterset()
# Check if location-based search is being used
location_search = self.request.GET.get('location_search', '').strip()
near_location = self.request.GET.get('near_location', '').strip()
location_search = self.request.GET.get("location_search", "").strip()
near_location = self.request.GET.get("near_location", "").strip()
# Add location search context
context.update({
'results': filterset.qs,
'filters': filterset,
'applied_filters': bool(self.request.GET), # Check if any filters are applied
'is_location_search': bool(location_search or near_location),
'location_search_query': location_search or near_location,
})
context.update(
{
"results": filterset.qs,
"filters": filterset,
"applied_filters": bool(
self.request.GET
), # Check if any filters are applied
"is_location_search": bool(location_search or near_location),
"location_search_query": location_search or near_location,
}
)
return context
class FilterFormView(TemplateView):
"""
View for rendering just the filter form for HTMX updates
"""
template_name = "core/search/filters.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
filterset = ParkFilter(self.request.GET, queryset=Park.objects.all())
context['filters'] = filterset
context["filters"] = filterset
return context
@@ -64,84 +74,88 @@ class LocationSearchView(TemplateView):
"""
Enhanced search view with comprehensive location search capabilities.
"""
template_name = "core/search/location_results.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Build search filters from request parameters
filters = self._build_search_filters()
# Perform search
results = location_search_service.search(filters)
# Group results by type for better presentation
grouped_results = {
'parks': [r for r in results if r.content_type == 'park'],
'rides': [r for r in results if r.content_type == 'ride'],
'companies': [r for r in results if r.content_type == 'company'],
"parks": [r for r in results if r.content_type == "park"],
"rides": [r for r in results if r.content_type == "ride"],
"companies": [r for r in results if r.content_type == "company"],
}
context.update({
'results': results,
'grouped_results': grouped_results,
'total_results': len(results),
'search_filters': filters,
'has_location_filter': bool(filters.location_point),
'search_form': LocationSearchForm(self.request.GET),
})
context.update(
{
"results": results,
"grouped_results": grouped_results,
"total_results": len(results),
"search_filters": filters,
"has_location_filter": bool(filters.location_point),
"search_form": LocationSearchForm(self.request.GET),
}
)
return context
def _build_search_filters(self) -> LocationSearchFilters:
"""Build LocationSearchFilters from request parameters."""
form = LocationSearchForm(self.request.GET)
form.is_valid() # Populate cleaned_data
# Parse location coordinates if provided
location_point = None
lat = form.cleaned_data.get('lat')
lng = form.cleaned_data.get('lng')
lat = form.cleaned_data.get("lat")
lng = form.cleaned_data.get("lng")
if lat and lng:
try:
location_point = Point(float(lng), float(lat), srid=4326)
except (ValueError, TypeError):
location_point = None
# Parse location types
location_types = set()
if form.cleaned_data.get('search_parks'):
location_types.add('park')
if form.cleaned_data.get('search_rides'):
location_types.add('ride')
if form.cleaned_data.get('search_companies'):
location_types.add('company')
if form.cleaned_data.get("search_parks"):
location_types.add("park")
if form.cleaned_data.get("search_rides"):
location_types.add("ride")
if form.cleaned_data.get("search_companies"):
location_types.add("company")
# If no specific types selected, search all
if not location_types:
location_types = {'park', 'ride', 'company'}
location_types = {"park", "ride", "company"}
# Parse radius
radius_km = None
radius_str = form.cleaned_data.get('radius_km', '').strip()
radius_str = form.cleaned_data.get("radius_km", "").strip()
if radius_str:
try:
radius_km = float(radius_str)
radius_km = max(1, min(500, radius_km)) # Clamp between 1-500km
# Clamp between 1-500km
radius_km = max(1, min(500, radius_km))
except (ValueError, TypeError):
radius_km = None
return LocationSearchFilters(
search_query=form.cleaned_data.get('q', '').strip() or None,
search_query=form.cleaned_data.get("q", "").strip() or None,
location_point=location_point,
radius_km=radius_km,
location_types=location_types if location_types else None,
country=form.cleaned_data.get('country', '').strip() or None,
state=form.cleaned_data.get('state', '').strip() or None,
city=form.cleaned_data.get('city', '').strip() or None,
park_status=self.request.GET.getlist('park_status') or None,
country=form.cleaned_data.get("country", "").strip() or None,
state=form.cleaned_data.get("state", "").strip() or None,
city=form.cleaned_data.get("city", "").strip() or None,
park_status=self.request.GET.getlist("park_status") or None,
include_distance=True,
max_results=int(self.request.GET.get('limit', 100))
max_results=int(self.request.GET.get("limit", 100)),
)
@@ -149,16 +163,16 @@ class LocationSuggestionsView(TemplateView):
"""
AJAX endpoint for location search suggestions.
"""
def get(self, request, *args, **kwargs):
query = request.GET.get('q', '').strip()
limit = int(request.GET.get('limit', 10))
query = request.GET.get("q", "").strip()
limit = int(request.GET.get("limit", 10))
if len(query) < 2:
return JsonResponse({'suggestions': []})
return JsonResponse({"suggestions": []})
try:
suggestions = location_search_service.suggest_locations(query, limit)
return JsonResponse({'suggestions': suggestions})
return JsonResponse({"suggestions": suggestions})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
return JsonResponse({"error": str(e)}, status=500)

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Type, cast
from typing import Any, Dict, Optional, Type
from django.shortcuts import redirect
from django.urls import reverse
from django.views.generic import DetailView
@@ -6,13 +6,15 @@ from django.views import View
from django.http import HttpRequest, HttpResponse
from django.db.models import Model
class SlugRedirectMixin(View):
"""
Mixin that handles redirects for old slugs.
Requires the model to inherit from SluggedModel and view to inherit from DetailView.
"""
model: Optional[Type[Model]] = None
slug_url_kwarg: str = 'slug'
slug_url_kwarg: str = "slug"
object: Optional[Model] = None
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
@@ -25,19 +27,18 @@ class SlugRedirectMixin(View):
self.object = self.get_object() # type: ignore
# Check if we used an old slug
current_slug = kwargs.get(self.slug_url_kwarg)
if current_slug and current_slug != getattr(self.object, 'slug', None):
if current_slug and current_slug != getattr(self.object, "slug", None):
# Get the URL pattern name from the view
url_pattern = self.get_redirect_url_pattern()
# Build kwargs for reverse()
reverse_kwargs = self.get_redirect_url_kwargs()
# Redirect to the current slug URL
return redirect(
reverse(url_pattern, kwargs=reverse_kwargs),
permanent=True
reverse(url_pattern, kwargs=reverse_kwargs), permanent=True
)
return super().dispatch(request, *args, **kwargs)
except (AttributeError, Exception) as e: # type: ignore
if self.model and hasattr(self.model, 'DoesNotExist'):
if self.model and hasattr(self.model, "DoesNotExist"):
if isinstance(e, self.model.DoesNotExist): # type: ignore
return super().dispatch(request, *args, **kwargs)
return super().dispatch(request, *args, **kwargs)
@@ -58,4 +59,4 @@ class SlugRedirectMixin(View):
"""
if not self.object:
return {}
return {self.slug_url_kwarg: getattr(self.object, 'slug', '')}
return {self.slug_url_kwarg: getattr(self.object, "slug", "")}