Revert "update"

This reverts commit 75cc618c2b.
This commit is contained in:
pacnpal
2025-09-21 20:11:00 -04:00
parent 75cc618c2b
commit 540f40e689
610 changed files with 4812 additions and 1715 deletions

View File

@@ -0,0 +1,13 @@
from .roadtrip import RoadTripService
from .park_management import ParkService
from .location_service import ParkLocationService
from .filter_service import ParkFilterService
from .media_service import ParkMediaService
__all__ = [
"RoadTripService",
"ParkService",
"ParkLocationService",
"ParkFilterService",
"ParkMediaService",
]

View File

@@ -0,0 +1,304 @@
"""
Park Filter Service
Provides filtering functionality, aggregations, and caching for park filters.
This service handles complex filter logic and provides useful filter statistics.
"""
from typing import Dict, List, Any, Optional
from django.db.models import QuerySet, Count, Q
from django.core.cache import cache
from django.conf import settings
from ..models import Park, Company
from ..querysets import get_base_park_queryset
class ParkFilterService:
"""
Service class for handling park filtering operations, aggregations,
and providing filter suggestions based on available data.
"""
CACHE_TIMEOUT = getattr(settings, "PARK_FILTER_CACHE_TIMEOUT", 300) # 5 minutes
def __init__(self):
self.cache_prefix = "park_filter"
def get_filter_counts(
self, base_queryset: Optional[QuerySet] = None
) -> Dict[str, Any]:
"""
Get counts for various filter options to show users what's available.
Args:
base_queryset: Optional base queryset to use for calculations
Returns:
Dictionary containing counts for different filter categories
"""
cache_key = f"{self.cache_prefix}:filter_counts"
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
if base_queryset is None:
base_queryset = get_base_park_queryset()
# Calculate filter counts
filter_counts = {
"total_parks": base_queryset.count(),
"operating_parks": base_queryset.filter(status="OPERATING").count(),
"parks_with_coasters": base_queryset.filter(coaster_count__gt=0).count(),
"big_parks": base_queryset.filter(ride_count__gte=10).count(),
"highly_rated": base_queryset.filter(average_rating__gte=4.0).count(),
"park_types": self._get_park_type_counts(base_queryset),
"top_operators": self._get_top_operators(base_queryset),
"countries": self._get_country_counts(base_queryset),
}
# Cache the result
cache.set(cache_key, filter_counts, self.CACHE_TIMEOUT)
return filter_counts
def _get_park_type_counts(self, queryset: QuerySet) -> Dict[str, int]:
"""Get counts for different park types based on operator names."""
return {
"disney": queryset.filter(operator__name__icontains="Disney").count(),
"universal": queryset.filter(operator__name__icontains="Universal").count(),
"six_flags": queryset.filter(operator__name__icontains="Six Flags").count(),
"cedar_fair": queryset.filter(
Q(operator__name__icontains="Cedar Fair")
| Q(operator__name__icontains="Cedar Point")
| Q(operator__name__icontains="Kings Island")
).count(),
}
def _get_top_operators(
self, queryset: QuerySet, limit: int = 10
) -> List[Dict[str, Any]]:
"""Get the top operators by number of parks."""
return list(
queryset.values("operator__name", "operator__id")
.annotate(park_count=Count("id"))
.filter(park_count__gt=0)
.order_by("-park_count")[:limit]
)
def _get_country_counts(
self, queryset: QuerySet, limit: int = 10
) -> List[Dict[str, Any]]:
"""Get countries with the most parks."""
return list(
queryset.filter(location__country__isnull=False)
.values("location__country")
.annotate(park_count=Count("id"))
.filter(park_count__gt=0)
.order_by("-park_count")[:limit]
)
def get_filter_suggestions(self, query: str) -> Dict[str, List[str]]:
"""
Get filter suggestions based on a search query.
Args:
query: Search query string
Returns:
Dictionary with suggestion categories
"""
cache_key = f"{self.cache_prefix}:suggestions:{query.lower()}"
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
suggestions = {
"parks": [],
"operators": [],
"locations": [],
}
if len(query) >= 2: # Only search for queries of 2+ characters
# Park name suggestions
park_names = Park.objects.filter(name__icontains=query).values_list(
"name", flat=True
)[:5]
suggestions["parks"] = list(park_names)
# Operator suggestions
operator_names = Company.objects.filter(
roles__contains=["OPERATOR"], name__icontains=query
).values_list("name", flat=True)[:5]
suggestions["operators"] = list(operator_names)
# Location suggestions (cities and countries)
locations = Park.objects.filter(
Q(location__city__icontains=query)
| Q(location__country__icontains=query)
).values_list("location__city", "location__country")[:5]
location_suggestions = []
for city, country in locations:
if city and city.lower().startswith(query.lower()):
location_suggestions.append(city)
elif country and country.lower().startswith(query.lower()):
location_suggestions.append(country)
suggestions["locations"] = list(set(location_suggestions))[:5]
# Cache suggestions for a shorter time
cache.set(cache_key, suggestions, 60) # 1 minute cache
return suggestions
def get_popular_filters(self) -> Dict[str, Any]:
"""
Get commonly used filter combinations and popular filter values.
Returns:
Dictionary containing popular filter configurations
"""
cache_key = f"{self.cache_prefix}:popular_filters"
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
base_qs = get_base_park_queryset()
popular_filters = {
"quick_filters": [
{
"label": "Disney Parks",
"filters": {"park_type": "disney"},
"count": base_qs.filter(operator__name__icontains="Disney").count(),
},
{
"label": "Parks with Coasters",
"filters": {"has_coasters": True},
"count": base_qs.filter(coaster_count__gt=0).count(),
},
{
"label": "Highly Rated",
"filters": {"min_rating": "4"},
"count": base_qs.filter(average_rating__gte=4.0).count(),
},
{
"label": "Major Parks",
"filters": {"big_parks_only": True},
"count": base_qs.filter(ride_count__gte=10).count(),
},
],
"recommended_sorts": [
{"value": "-average_rating", "label": "Highest Rated"},
{"value": "-coaster_count", "label": "Most Coasters"},
{"value": "name", "label": "A-Z"},
],
}
# Cache for longer since these don't change often
cache.set(cache_key, popular_filters, self.CACHE_TIMEOUT * 2)
return popular_filters
def clear_filter_cache(self) -> None:
"""Clear all cached filter data."""
# Simple cache clearing - delete known keys
cache_keys = [
f"{self.cache_prefix}:filter_counts",
f"{self.cache_prefix}:popular_filters",
]
for key in cache_keys:
cache.delete(key)
def get_filtered_queryset(self, filters: Dict[str, Any]) -> QuerySet: # noqa: C901
"""
Apply filters to get a filtered queryset with optimizations.
Args:
filters: Dictionary of filter parameters
Returns:
Filtered and optimized QuerySet
"""
queryset = (
get_base_park_queryset()
.select_related("operator", "property_owner", "location")
.prefetch_related("photos", "rides__manufacturer")
)
# Apply status filter
if filters.get("status"):
queryset = queryset.filter(status=filters["status"])
# Apply park type filter
if filters.get("park_type"):
queryset = self._apply_park_type_filter(queryset, filters["park_type"])
# Apply coaster filter
if filters.get("has_coasters"):
queryset = queryset.filter(coaster_count__gt=0)
# Apply rating filter
if filters.get("min_rating"):
try:
min_rating = float(filters["min_rating"])
queryset = queryset.filter(average_rating__gte=min_rating)
except (ValueError, TypeError):
pass
# Apply big parks filter
if filters.get("big_parks_only"):
queryset = queryset.filter(ride_count__gte=10)
# Apply search
if filters.get("search"):
search_query = filters["search"]
queryset = queryset.filter(
Q(name__icontains=search_query)
| Q(description__icontains=search_query)
| Q(location__city__icontains=search_query)
| Q(location__country__icontains=search_query)
)
# Apply location filters
if filters.get("country_filter"):
queryset = queryset.filter(
location__country__icontains=filters["country_filter"]
)
if filters.get("state_filter"):
queryset = queryset.filter(
location__state__icontains=filters["state_filter"]
)
# Apply ordering
if filters.get("ordering"):
queryset = queryset.order_by(filters["ordering"])
return queryset.distinct()
def _apply_park_type_filter(self, queryset: QuerySet, park_type: str) -> QuerySet:
"""Apply park type filter logic."""
type_filters = {
"disney": Q(operator__name__icontains="Disney"),
"universal": Q(operator__name__icontains="Universal"),
"six_flags": Q(operator__name__icontains="Six Flags"),
"cedar_fair": (
Q(operator__name__icontains="Cedar Fair")
| Q(operator__name__icontains="Cedar Point")
| Q(operator__name__icontains="Kings Island")
| Q(operator__name__icontains="Canada's Wonderland")
),
"independent": ~(
Q(operator__name__icontains="Disney")
| Q(operator__name__icontains="Universal")
| Q(operator__name__icontains="Six Flags")
| Q(operator__name__icontains="Cedar Fair")
| Q(operator__name__icontains="Cedar Point")
),
}
if park_type in type_filters:
return queryset.filter(type_filters[park_type])
return queryset

View File

@@ -0,0 +1,428 @@
"""
Smart Park Loader for Hybrid Filtering Strategy
This module provides intelligent data loading capabilities for the hybrid filtering approach,
optimizing database queries and implementing progressive loading strategies.
"""
from typing import Dict, Optional, Any
from django.db import models
from django.core.cache import cache
from django.conf import settings
from apps.parks.models import Park
class SmartParkLoader:
"""
Intelligent park data loader that optimizes queries based on filtering requirements.
Implements progressive loading and smart caching strategies.
"""
# Cache configuration
CACHE_TIMEOUT = getattr(settings, 'HYBRID_FILTER_CACHE_TIMEOUT', 300) # 5 minutes
CACHE_KEY_PREFIX = 'hybrid_parks'
# Progressive loading thresholds
INITIAL_LOAD_SIZE = 50
PROGRESSIVE_LOAD_SIZE = 25
MAX_CLIENT_SIDE_RECORDS = 200
def __init__(self):
self.base_queryset = self._get_optimized_queryset()
def _get_optimized_queryset(self) -> models.QuerySet:
"""Get optimized base queryset with all necessary prefetches."""
return Park.objects.select_related(
'operator',
'property_owner',
'banner_image',
'card_image',
).prefetch_related(
'location', # ParkLocation relationship
).filter(
# Only include operating and temporarily closed parks by default
status__in=['OPERATING', 'CLOSED_TEMP']
).order_by('name')
def get_initial_load(self, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Get initial park data load with smart filtering decisions.
Args:
filters: Optional filters to apply
Returns:
Dictionary containing parks data and metadata
"""
cache_key = self._generate_cache_key('initial', filters)
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
# Apply filters if provided
queryset = self.base_queryset
if filters:
queryset = self._apply_filters(queryset, filters)
# Get total count for pagination decisions
total_count = queryset.count()
# Determine loading strategy
if total_count <= self.MAX_CLIENT_SIDE_RECORDS:
# Load all data for client-side filtering
parks = list(queryset.all())
strategy = 'client_side'
has_more = False
else:
# Load initial batch for server-side pagination
parks = list(queryset[:self.INITIAL_LOAD_SIZE])
strategy = 'server_side'
has_more = total_count > self.INITIAL_LOAD_SIZE
result = {
'parks': parks,
'total_count': total_count,
'strategy': strategy,
'has_more': has_more,
'next_offset': len(parks) if has_more else None,
'filter_metadata': self._get_filter_metadata(queryset),
}
# Cache the result
cache.set(cache_key, result, self.CACHE_TIMEOUT)
return result
def get_progressive_load(
self,
offset: int,
filters: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Get next batch of parks for progressive loading.
Args:
offset: Starting offset for the batch
filters: Optional filters to apply
Returns:
Dictionary containing parks data and metadata
"""
cache_key = self._generate_cache_key(f'progressive_{offset}', filters)
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
# Apply filters if provided
queryset = self.base_queryset
if filters:
queryset = self._apply_filters(queryset, filters)
# Get the batch
end_offset = offset + self.PROGRESSIVE_LOAD_SIZE
parks = list(queryset[offset:end_offset])
# Check if there are more records
total_count = queryset.count()
has_more = end_offset < total_count
result = {
'parks': parks,
'total_count': total_count,
'has_more': has_more,
'next_offset': end_offset if has_more else None,
}
# Cache the result
cache.set(cache_key, result, self.CACHE_TIMEOUT)
return result
def get_filter_metadata(self, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Get metadata about available filter options.
Args:
filters: Current filters to scope the metadata
Returns:
Dictionary containing filter metadata
"""
cache_key = self._generate_cache_key('metadata', filters)
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
# Apply filters if provided
queryset = self.base_queryset
if filters:
queryset = self._apply_filters(queryset, filters)
result = self._get_filter_metadata(queryset)
# Cache the result
cache.set(cache_key, result, self.CACHE_TIMEOUT)
return result
def _apply_filters(self, queryset: models.QuerySet, filters: Dict[str, Any]) -> models.QuerySet:
"""Apply filters to the queryset."""
# Status filter
if 'status' in filters and filters['status']:
if isinstance(filters['status'], list):
queryset = queryset.filter(status__in=filters['status'])
else:
queryset = queryset.filter(status=filters['status'])
# Park type filter
if 'park_type' in filters and filters['park_type']:
if isinstance(filters['park_type'], list):
queryset = queryset.filter(park_type__in=filters['park_type'])
else:
queryset = queryset.filter(park_type=filters['park_type'])
# Country filter
if 'country' in filters and filters['country']:
queryset = queryset.filter(location__country__in=filters['country'])
# State filter
if 'state' in filters and filters['state']:
queryset = queryset.filter(location__state__in=filters['state'])
# Opening year range
if 'opening_year_min' in filters and filters['opening_year_min']:
queryset = queryset.filter(opening_year__gte=filters['opening_year_min'])
if 'opening_year_max' in filters and filters['opening_year_max']:
queryset = queryset.filter(opening_year__lte=filters['opening_year_max'])
# Size range
if 'size_min' in filters and filters['size_min']:
queryset = queryset.filter(size_acres__gte=filters['size_min'])
if 'size_max' in filters and filters['size_max']:
queryset = queryset.filter(size_acres__lte=filters['size_max'])
# Rating range
if 'rating_min' in filters and filters['rating_min']:
queryset = queryset.filter(average_rating__gte=filters['rating_min'])
if 'rating_max' in filters and filters['rating_max']:
queryset = queryset.filter(average_rating__lte=filters['rating_max'])
# Ride count range
if 'ride_count_min' in filters and filters['ride_count_min']:
queryset = queryset.filter(ride_count__gte=filters['ride_count_min'])
if 'ride_count_max' in filters and filters['ride_count_max']:
queryset = queryset.filter(ride_count__lte=filters['ride_count_max'])
# Coaster count range
if 'coaster_count_min' in filters and filters['coaster_count_min']:
queryset = queryset.filter(coaster_count__gte=filters['coaster_count_min'])
if 'coaster_count_max' in filters and filters['coaster_count_max']:
queryset = queryset.filter(coaster_count__lte=filters['coaster_count_max'])
# Operator filter
if 'operator' in filters and filters['operator']:
if isinstance(filters['operator'], list):
queryset = queryset.filter(operator__slug__in=filters['operator'])
else:
queryset = queryset.filter(operator__slug=filters['operator'])
# Search query
if 'search' in filters and filters['search']:
search_term = filters['search'].lower()
queryset = queryset.filter(search_text__icontains=search_term)
return queryset
def _get_filter_metadata(self, queryset: models.QuerySet) -> Dict[str, Any]:
"""Generate filter metadata from the current queryset."""
# Get distinct values for categorical filters with counts
countries_data = list(
queryset.values('location__country')
.exclude(location__country__isnull=True)
.annotate(count=models.Count('id'))
.order_by('location__country')
)
states_data = list(
queryset.values('location__state')
.exclude(location__state__isnull=True)
.annotate(count=models.Count('id'))
.order_by('location__state')
)
park_types_data = list(
queryset.values('park_type')
.exclude(park_type__isnull=True)
.annotate(count=models.Count('id'))
.order_by('park_type')
)
statuses_data = list(
queryset.values('status')
.annotate(count=models.Count('id'))
.order_by('status')
)
operators_data = list(
queryset.select_related('operator')
.values('operator__id', 'operator__name', 'operator__slug')
.exclude(operator__isnull=True)
.annotate(count=models.Count('id'))
.order_by('operator__name')
)
# Convert to frontend-expected format with value/label/count
countries = [
{
'value': item['location__country'],
'label': item['location__country'],
'count': item['count']
}
for item in countries_data
]
states = [
{
'value': item['location__state'],
'label': item['location__state'],
'count': item['count']
}
for item in states_data
]
park_types = [
{
'value': item['park_type'],
'label': item['park_type'],
'count': item['count']
}
for item in park_types_data
]
statuses = [
{
'value': item['status'],
'label': self._get_status_label(item['status']),
'count': item['count']
}
for item in statuses_data
]
operators = [
{
'value': item['operator__slug'],
'label': item['operator__name'],
'count': item['count']
}
for item in operators_data
]
# Get ranges for numerical filters
aggregates = queryset.aggregate(
opening_year_min=models.Min('opening_year'),
opening_year_max=models.Max('opening_year'),
size_min=models.Min('size_acres'),
size_max=models.Max('size_acres'),
rating_min=models.Min('average_rating'),
rating_max=models.Max('average_rating'),
ride_count_min=models.Min('ride_count'),
ride_count_max=models.Max('ride_count'),
coaster_count_min=models.Min('coaster_count'),
coaster_count_max=models.Max('coaster_count'),
)
return {
'categorical': {
'countries': countries,
'states': states,
'park_types': park_types,
'statuses': statuses,
'operators': operators,
},
'ranges': {
'opening_year': {
'min': aggregates['opening_year_min'],
'max': aggregates['opening_year_max'],
'step': 1,
'unit': 'year'
},
'size_acres': {
'min': float(aggregates['size_min']) if aggregates['size_min'] else None,
'max': float(aggregates['size_max']) if aggregates['size_max'] else None,
'step': 1.0,
'unit': 'acres'
},
'average_rating': {
'min': float(aggregates['rating_min']) if aggregates['rating_min'] else None,
'max': float(aggregates['rating_max']) if aggregates['rating_max'] else None,
'step': 0.1,
'unit': 'stars'
},
'ride_count': {
'min': aggregates['ride_count_min'],
'max': aggregates['ride_count_max'],
'step': 1,
'unit': 'rides'
},
'coaster_count': {
'min': aggregates['coaster_count_min'],
'max': aggregates['coaster_count_max'],
'step': 1,
'unit': 'coasters'
},
},
'total_count': queryset.count(),
}
def _get_status_label(self, status: str) -> str:
"""Convert status code to human-readable label."""
status_labels = {
'OPERATING': 'Operating',
'CLOSED_TEMP': 'Temporarily Closed',
'CLOSED_PERM': 'Permanently Closed',
'UNDER_CONSTRUCTION': 'Under Construction',
}
if status in status_labels:
return status_labels[status]
else:
raise ValueError(f"Unknown park status: {status}")
def _generate_cache_key(self, operation: str, filters: Optional[Dict[str, Any]] = None) -> str:
"""Generate cache key for the given operation and filters."""
key_parts = [self.CACHE_KEY_PREFIX, operation]
if filters:
# Create a consistent string representation of filters
filter_str = '_'.join(f"{k}:{v}" for k, v in sorted(filters.items()) if v)
key_parts.append(filter_str)
return '_'.join(key_parts)
def invalidate_cache(self, filters: Optional[Dict[str, Any]] = None) -> None:
"""Invalidate cached data for the given filters."""
# This is a simplified implementation
# In production, you might want to use cache versioning or tags
cache_keys = [
self._generate_cache_key('initial', filters),
self._generate_cache_key('metadata', filters),
]
# Also invalidate progressive load caches
for offset in range(0, 1000, self.PROGRESSIVE_LOAD_SIZE):
cache_keys.append(self._generate_cache_key(f'progressive_{offset}', filters))
cache.delete_many(cache_keys)
# Singleton instance
smart_park_loader = SmartParkLoader()

View File

@@ -0,0 +1,491 @@
"""
Parks-specific location services with OpenStreetMap integration.
Handles geocoding, reverse geocoding, and location search for parks.
"""
import requests
from typing import List, Dict, Any, Optional
from django.core.cache import cache
from django.db import transaction
import logging
from ..models import ParkLocation
logger = logging.getLogger(__name__)
class ParkLocationService:
"""
Location service specifically for parks using OpenStreetMap Nominatim API.
"""
NOMINATIM_BASE_URL = "https://nominatim.openstreetmap.org"
USER_AGENT = "ThrillWiki/1.0 (https://thrillwiki.com)"
@classmethod
def search_locations(cls, query: str, limit: int = 10) -> Dict[str, Any]:
"""
Search for locations using OpenStreetMap Nominatim API.
Optimized for finding theme parks and amusement parks.
Args:
query: Search query string
limit: Maximum number of results (default: 10, max: 25)
Returns:
Dictionary with search results
"""
if not query.strip():
return {"count": 0, "results": [], "query": query}
# Limit the number of results
limit = min(limit, 25)
# Check cache first
cache_key = f"park_location_search:{query.lower()}:{limit}"
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
try:
params = {
"q": query,
"format": "json",
"limit": limit,
"addressdetails": 1,
"extratags": 1,
"namedetails": 1,
"accept-language": "en",
# Prioritize places that might be parks or entertainment venues
"featuretype": "settlement,leisure,tourism",
}
headers = {
"User-Agent": cls.USER_AGENT,
}
response = requests.get(
f"{cls.NOMINATIM_BASE_URL}/search",
params=params,
headers=headers,
timeout=10,
)
response.raise_for_status()
osm_results = response.json()
# Transform OSM results to our format
results = []
for item in osm_results:
result = cls._transform_osm_result(item)
if result:
results.append(result)
result_data = {"count": len(results), "results": results, "query": query}
# Cache for 1 hour
cache.set(cache_key, result_data, 3600)
return result_data
except requests.RequestException as e:
logger.error(f"Error searching park locations: {str(e)}")
return {
"count": 0,
"results": [],
"query": query,
"error": "Location search service temporarily unavailable",
}
@classmethod
def reverse_geocode(cls, latitude: float, longitude: float) -> Dict[str, Any]:
"""
Reverse geocode coordinates to get location information using OSM.
Args:
latitude: Latitude coordinate
longitude: Longitude coordinate
Returns:
Dictionary with location information
"""
# Validate coordinates
if not (-90 <= latitude <= 90) or not (-180 <= longitude <= 180):
return {"error": "Invalid coordinates"}
# Check cache first
cache_key = f"park_reverse_geocode:{latitude:.6f}:{longitude:.6f}"
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
try:
params = {
"lat": latitude,
"lon": longitude,
"format": "json",
"addressdetails": 1,
"extratags": 1,
"namedetails": 1,
"accept-language": "en",
}
headers = {
"User-Agent": cls.USER_AGENT,
}
response = requests.get(
f"{cls.NOMINATIM_BASE_URL}/reverse",
params=params,
headers=headers,
timeout=10,
)
response.raise_for_status()
osm_result = response.json()
if "error" in osm_result:
return {"error": "Location not found"}
result = cls._transform_osm_reverse_result(osm_result)
# Cache for 24 hours
cache.set(cache_key, result, 86400)
return result
except requests.RequestException as e:
logger.error(f"Error reverse geocoding park location: {str(e)}")
return {"error": "Reverse geocoding service temporarily unavailable"}
@classmethod
def geocode_address(cls, address: str) -> Dict[str, Any]:
"""
Geocode an address to get coordinates using OSM.
Args:
address: Address string to geocode
Returns:
Dictionary with coordinates and location information
"""
if not address.strip():
return {"error": "Address is required"}
# Use search_locations for geocoding
results = cls.search_locations(address, limit=1)
if results["count"] > 0:
return results["results"][0]
else:
return {"error": "Address not found"}
@classmethod
def create_park_location(
cls,
*,
park,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
street_address: str = "",
city: str = "",
state: str = "",
country: str = "USA",
postal_code: str = "",
highway_exit: str = "",
parking_notes: str = "",
seasonal_notes: str = "",
osm_id: Optional[int] = None,
osm_type: str = "",
) -> ParkLocation:
"""
Create a location for a park with OSM integration.
Args:
park: Park instance
latitude: Latitude coordinate
longitude: Longitude coordinate
street_address: Street address
city: City name
state: State/region name
country: Country name (default: USA)
postal_code: Postal/ZIP code
highway_exit: Highway exit information
parking_notes: Parking information
seasonal_notes: Seasonal access notes
osm_id: OpenStreetMap ID
osm_type: OpenStreetMap type (node, way, relation)
Returns:
Created ParkLocation instance
"""
with transaction.atomic():
park_location = ParkLocation(
park=park,
street_address=street_address,
city=city,
state=state,
country=country,
postal_code=postal_code,
highway_exit=highway_exit,
parking_notes=parking_notes,
seasonal_notes=seasonal_notes,
osm_id=osm_id,
osm_type=osm_type,
)
# Set coordinates if provided
if latitude is not None and longitude is not None:
park_location.set_coordinates(latitude, longitude)
park_location.full_clean()
park_location.save()
return park_location
@classmethod
def update_park_location(
cls, park_location: ParkLocation, **updates
) -> ParkLocation:
"""
Update park location with validation.
Args:
park_location: ParkLocation instance to update
**updates: Fields to update
Returns:
Updated ParkLocation instance
"""
with transaction.atomic():
# Handle coordinates separately
latitude = updates.pop("latitude", None)
longitude = updates.pop("longitude", None)
# Update regular fields
for field, value in updates.items():
if hasattr(park_location, field):
setattr(park_location, field, value)
# Update coordinates if provided
if latitude is not None and longitude is not None:
park_location.set_coordinates(latitude, longitude)
park_location.full_clean()
park_location.save()
return park_location
@classmethod
def find_nearby_parks(
cls, latitude: float, longitude: float, radius_km: float = 50
) -> List[ParkLocation]:
"""
Find parks near given coordinates using PostGIS.
Args:
latitude: Center latitude
longitude: Center longitude
radius_km: Search radius in kilometers
Returns:
List of nearby ParkLocation instances
"""
from django.contrib.gis.geos import Point
from django.contrib.gis.measure import Distance
center_point = Point(longitude, latitude, srid=4326)
return list(
ParkLocation.objects.filter(
point__distance_lte=(center_point, Distance(km=radius_km))
)
.select_related("park", "park__operator")
.order_by("point__distance")
)
@classmethod
def enrich_location_from_osm(cls, park_location: ParkLocation) -> ParkLocation:
"""
Enrich park location data using OSM reverse geocoding.
Args:
park_location: ParkLocation instance to enrich
Returns:
Updated ParkLocation instance
"""
if not park_location.point:
return park_location
# Get detailed location info from OSM
osm_data = cls.reverse_geocode(park_location.latitude, park_location.longitude)
if "error" not in osm_data:
updates = {}
# Update missing address components
if not park_location.street_address and osm_data.get("street_address"):
updates["street_address"] = osm_data["street_address"]
if not park_location.city and osm_data.get("city"):
updates["city"] = osm_data["city"]
if not park_location.state and osm_data.get("state"):
updates["state"] = osm_data["state"]
if not park_location.country and osm_data.get("country"):
updates["country"] = osm_data["country"]
if not park_location.postal_code and osm_data.get("postal_code"):
updates["postal_code"] = osm_data["postal_code"]
# Update OSM metadata
if osm_data.get("osm_id"):
updates["osm_id"] = osm_data["osm_id"]
if osm_data.get("osm_type"):
updates["osm_type"] = osm_data["osm_type"]
if updates:
return cls.update_park_location(park_location, **updates)
return park_location
@classmethod
def _transform_osm_result(
cls, osm_item: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Transform OSM search result to our standard format."""
try:
address = osm_item.get("address", {})
# Extract address components
street_number = address.get("house_number", "")
street_name = address.get("road", "")
street_address = f"{street_number} {street_name}".strip()
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or ""
)
state = (
address.get("state")
or address.get("province")
or address.get("region")
or ""
)
country = address.get("country", "")
postal_code = address.get("postcode", "")
# Build formatted address
address_parts = []
if street_address:
address_parts.append(street_address)
if city:
address_parts.append(city)
if state:
address_parts.append(state)
if postal_code:
address_parts.append(postal_code)
if country:
address_parts.append(country)
formatted_address = ", ".join(address_parts)
# Check if this might be a theme park or entertainment venue
place_type = osm_item.get("type", "").lower()
extratags = osm_item.get("extratags", {})
is_park_related = any(
[
"park" in place_type,
"theme" in place_type,
"amusement" in place_type,
"attraction" in place_type,
extratags.get("tourism") == "theme_park",
extratags.get("leisure") == "amusement_arcade",
extratags.get("amenity") == "amusement_arcade",
]
)
return {
"name": osm_item.get("display_name", ""),
"latitude": float(osm_item["lat"]),
"longitude": float(osm_item["lon"]),
"formatted_address": formatted_address,
"street_address": street_address,
"city": city,
"state": state,
"country": country,
"postal_code": postal_code,
"osm_id": osm_item.get("osm_id"),
"osm_type": osm_item.get("osm_type"),
"place_type": place_type,
"importance": osm_item.get("importance", 0),
"is_park_related": is_park_related,
}
except (KeyError, ValueError, TypeError) as e:
logger.warning(f"Error transforming OSM result: {str(e)}")
return None
@classmethod
def _transform_osm_reverse_result(
cls, osm_result: Dict[str, Any]
) -> Dict[str, Any]:
"""Transform OSM reverse geocoding result to our standard format."""
address = osm_result.get("address", {})
# Extract address components
street_number = address.get("house_number", "")
street_name = address.get("road", "")
street_address = f"{street_number} {street_name}".strip()
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or ""
)
state = (
address.get("state")
or address.get("province")
or address.get("region")
or ""
)
country = address.get("country", "")
postal_code = address.get("postcode", "")
# Build formatted address
address_parts = []
if street_address:
address_parts.append(street_address)
if city:
address_parts.append(city)
if state:
address_parts.append(state)
if postal_code:
address_parts.append(postal_code)
if country:
address_parts.append(country)
formatted_address = ", ".join(address_parts)
return {
"name": osm_result.get("display_name", ""),
"latitude": float(osm_result["lat"]),
"longitude": float(osm_result["lon"]),
"formatted_address": formatted_address,
"street_address": street_address,
"city": city,
"state": state,
"country": country,
"postal_code": postal_code,
"osm_id": osm_result.get("osm_id"),
"osm_type": osm_result.get("osm_type"),
"place_type": osm_result.get("type", ""),
}

View File

@@ -0,0 +1,241 @@
"""
Park-specific media service for ThrillWiki.
This module provides media management functionality specific to parks.
"""
import logging
from typing import List, Optional, Dict, Any
from django.core.files.uploadedfile import UploadedFile
from django.db import transaction
from django.contrib.auth import get_user_model
from apps.core.services.media_service import MediaService
from ..models import Park, ParkPhoto
User = get_user_model()
logger = logging.getLogger(__name__)
class ParkMediaService:
"""Service for managing park-specific media operations."""
@staticmethod
def upload_photo(
park: Park,
image_file: UploadedFile,
user: User,
caption: str = "",
alt_text: str = "",
is_primary: bool = False,
auto_approve: bool = False,
) -> ParkPhoto:
"""
Upload a photo for a park.
Args:
park: Park instance
image_file: Uploaded image file
user: User uploading the photo
caption: Photo caption
alt_text: Alt text for accessibility
is_primary: Whether this should be the primary photo
auto_approve: Whether to auto-approve the photo
Returns:
Created ParkPhoto instance
Raises:
ValueError: If image validation fails
"""
# Validate image file
is_valid, error_message = MediaService.validate_image_file(image_file)
if not is_valid:
raise ValueError(error_message)
# Process image
processed_image = MediaService.process_image(image_file)
with transaction.atomic():
# Create photo instance
photo = ParkPhoto(
park=park,
image=processed_image,
caption=caption or MediaService.generate_default_caption(user.username),
alt_text=alt_text,
is_primary=is_primary,
is_approved=auto_approve,
uploaded_by=user,
)
# Extract EXIF date
photo.date_taken = MediaService.extract_exif_date(processed_image)
photo.save()
logger.info(f"Photo uploaded for park {park.slug} by user {user.username}")
return photo
@staticmethod
def get_park_photos(
park: Park, approved_only: bool = True, primary_first: bool = True
) -> List[ParkPhoto]:
"""
Get photos for a park.
Args:
park: Park instance
approved_only: Whether to only return approved photos
primary_first: Whether to order primary photos first
Returns:
List of ParkPhoto instances
"""
queryset = park.photos.all()
if approved_only:
queryset = queryset.filter(is_approved=True)
if primary_first:
queryset = queryset.order_by("-is_primary", "-created_at")
else:
queryset = queryset.order_by("-created_at")
return list(queryset)
@staticmethod
def get_primary_photo(park: Park) -> Optional[ParkPhoto]:
"""
Get the primary photo for a park.
Args:
park: Park instance
Returns:
Primary ParkPhoto instance or None
"""
try:
return park.photos.filter(is_primary=True, is_approved=True).first()
except ParkPhoto.DoesNotExist:
return None
@staticmethod
def set_primary_photo(park: Park, photo: ParkPhoto) -> bool:
"""
Set a photo as the primary photo for a park.
Args:
park: Park instance
photo: ParkPhoto to set as primary
Returns:
True if successful, False otherwise
"""
if photo.park != park:
return False
with transaction.atomic():
# Unset current primary
park.photos.filter(is_primary=True).update(is_primary=False)
# Set new primary
photo.is_primary = True
photo.save()
logger.info(f"Set photo {photo.pk} as primary for park {park.slug}")
return True
@staticmethod
def approve_photo(photo: ParkPhoto, approved_by: User) -> bool:
"""
Approve a park photo.
Args:
photo: ParkPhoto to approve
approved_by: User approving the photo
Returns:
True if successful, False otherwise
"""
try:
photo.is_approved = True
photo.save()
logger.info(f"Photo {photo.pk} approved by user {approved_by.username}")
return True
except Exception as e:
logger.error(f"Failed to approve photo {photo.pk}: {str(e)}")
return False
@staticmethod
def delete_photo(photo: ParkPhoto, deleted_by: User) -> bool:
"""
Delete a park photo.
Args:
photo: ParkPhoto to delete
deleted_by: User deleting the photo
Returns:
True if successful, False otherwise
"""
try:
park_slug = photo.park.slug
photo_id = photo.pk
# Delete the file and database record
if photo.image:
photo.image.delete(save=False)
photo.delete()
logger.info(
f"Photo {photo_id} deleted from park {park_slug} by user {deleted_by.username}"
)
return True
except Exception as e:
logger.error(f"Failed to delete photo {photo.pk}: {str(e)}")
return False
@staticmethod
def get_photo_stats(park: Park) -> Dict[str, Any]:
"""
Get photo statistics for a park.
Args:
park: Park instance
Returns:
Dictionary with photo statistics
"""
photos = park.photos.all()
return {
"total_photos": photos.count(),
"approved_photos": photos.filter(is_approved=True).count(),
"pending_photos": photos.filter(is_approved=False).count(),
"has_primary": photos.filter(is_primary=True).exists(),
"recent_uploads": photos.order_by("-created_at")[:5].count(),
}
@staticmethod
def bulk_approve_photos(photos: List[ParkPhoto], approved_by: User) -> int:
"""
Bulk approve multiple photos.
Args:
photos: List of ParkPhoto instances to approve
approved_by: User approving the photos
Returns:
Number of photos successfully approved
"""
approved_count = 0
with transaction.atomic():
for photo in photos:
if ParkMediaService.approve_photo(photo, approved_by):
approved_count += 1
logger.info(
f"Bulk approved {approved_count} photos by user {approved_by.username}"
)
return approved_count

View File

@@ -0,0 +1,228 @@
"""
Services for park-related business logic.
Following Django styleguide pattern for business logic encapsulation.
"""
from typing import Optional, Dict, Any, TYPE_CHECKING
from django.db import transaction
from django.db.models import Q
if TYPE_CHECKING:
from django.contrib.auth.models import AbstractUser
from ..models import Park, ParkArea
from .location_service import ParkLocationService
class ParkService:
"""Service for managing park operations."""
@staticmethod
def create_park(
*,
name: str,
description: str = "",
status: str = "OPERATING",
operator_id: Optional[int] = None,
property_owner_id: Optional[int] = None,
opening_date: Optional[str] = None,
closing_date: Optional[str] = None,
operating_season: str = "",
size_acres: Optional[float] = None,
website: str = "",
location_data: Optional[Dict[str, Any]] = None,
created_by: Optional["AbstractUser"] = None,
) -> Park:
"""
Create a new park with validation and location handling.
Args:
name: Park name
description: Park description
status: Operating status
operator_id: ID of operating company
property_owner_id: ID of property owner company
opening_date: Opening date
closing_date: Closing date
operating_season: Operating season description
size_acres: Park size in acres
website: Park website URL
location_data: Dictionary containing location information
created_by: User creating the park
Returns:
Created Park instance
Raises:
ValidationError: If park data is invalid
"""
with transaction.atomic():
# Create park instance
park = Park(
name=name,
description=description,
status=status,
opening_date=opening_date,
closing_date=closing_date,
operating_season=operating_season,
size_acres=size_acres,
website=website,
)
# Set foreign key relationships if provided
if operator_id:
from apps.parks.models import Company
park.operator = Company.objects.get(id=operator_id)
if property_owner_id:
from apps.parks.models import Company
park.property_owner = Company.objects.get(id=property_owner_id)
# CRITICAL STYLEGUIDE FIX: Call full_clean before save
park.full_clean()
park.save()
# Handle location if provided
if location_data:
ParkLocationService.create_park_location(park=park, **location_data)
return park
@staticmethod
def update_park(
*,
park_id: int,
updates: Dict[str, Any],
updated_by: Optional["AbstractUser"] = None,
) -> Park:
"""
Update an existing park with validation.
Args:
park_id: ID of park to update
updates: Dictionary of field updates
updated_by: User performing the update
Returns:
Updated Park instance
Raises:
Park.DoesNotExist: If park doesn't exist
ValidationError: If update data is invalid
"""
with transaction.atomic():
park = Park.objects.select_for_update().get(id=park_id)
# Apply updates
for field, value in updates.items():
if hasattr(park, field):
setattr(park, field, value)
# CRITICAL STYLEGUIDE FIX: Call full_clean before save
park.full_clean()
park.save()
return park
@staticmethod
def delete_park(
*, park_id: int, deleted_by: Optional["AbstractUser"] = None
) -> bool:
"""
Soft delete a park by setting status to DEMOLISHED.
Args:
park_id: ID of park to delete
deleted_by: User performing the deletion
Returns:
True if successfully deleted
Raises:
Park.DoesNotExist: If park doesn't exist
"""
with transaction.atomic():
park = Park.objects.select_for_update().get(id=park_id)
park.status = "DEMOLISHED"
# CRITICAL STYLEGUIDE FIX: Call full_clean before save
park.full_clean()
park.save()
return True
@staticmethod
def create_park_area(
*,
park_id: int,
name: str,
description: str = "",
created_by: Optional["AbstractUser"] = None,
) -> ParkArea:
"""
Create a new area within a park.
Args:
park_id: ID of the parent park
name: Area name
description: Area description
created_by: User creating the area
Returns:
Created ParkArea instance
Raises:
Park.DoesNotExist: If park doesn't exist
ValidationError: If area data is invalid
"""
park = Park.objects.get(id=park_id)
area = ParkArea(park=park, name=name, description=description)
# CRITICAL STYLEGUIDE FIX: Call full_clean before save
area.full_clean()
area.save()
return area
@staticmethod
def update_park_statistics(*, park_id: int) -> Park:
"""
Recalculate and update park statistics (ride counts, ratings).
Args:
park_id: ID of park to update statistics for
Returns:
Updated Park instance with fresh statistics
"""
from apps.rides.models import Ride
from apps.parks.models import ParkReview
from django.db.models import Count, Avg
with transaction.atomic():
park = Park.objects.select_for_update().get(id=park_id)
# Calculate ride counts
ride_stats = Ride.objects.filter(park=park).aggregate(
total_rides=Count("id"),
coaster_count=Count("id", filter=Q(category__in=["RC", "WC"])),
)
# Calculate average rating
avg_rating = ParkReview.objects.filter(
park=park, is_published=True
).aggregate(avg_rating=Avg("rating"))["avg_rating"]
# Update park fields
park.ride_count = ride_stats["total_rides"] or 0
park.coaster_count = ride_stats["coaster_count"] or 0
park.average_rating = avg_rating
# CRITICAL STYLEGUIDE FIX: Call full_clean before save
park.full_clean()
park.save()
return park

View File

@@ -0,0 +1,698 @@
"""
Road Trip Service for theme park planning using OpenStreetMap APIs.
This service provides functionality for:
- Geocoding addresses using Nominatim
- Route calculation using OSRM
- Park discovery along routes
- Multi-park trip planning
- Proper rate limiting and caching
"""
import time
import math
import logging
import requests
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from itertools import permutations
from django.conf import settings
from django.core.cache import cache
from django.contrib.gis.geos import Point
from django.contrib.gis.measure import Distance
from apps.parks.models import Park
logger = logging.getLogger(__name__)
@dataclass
class Coordinates:
"""Represents latitude and longitude coordinates."""
latitude: float
longitude: float
def to_list(self) -> List[float]:
"""Return as [lat, lon] list."""
return [self.latitude, self.longitude]
def to_point(self) -> Point:
"""Convert to Django Point object."""
return Point(self.longitude, self.latitude, srid=4326)
@dataclass
class RouteInfo:
"""Information about a calculated route."""
distance_km: float
duration_minutes: int
geometry: Optional[str] = None # Encoded polyline
@property
def formatted_distance(self) -> str:
"""Return formatted distance string."""
if self.distance_km < 1:
return f"{self.distance_km * 1000:.0f}m"
return f"{self.distance_km:.1f}km"
@property
def formatted_duration(self) -> str:
"""Return formatted duration string."""
hours = self.duration_minutes // 60
minutes = self.duration_minutes % 60
if hours == 0:
return f"{minutes}min"
elif minutes == 0:
return f"{hours}h"
else:
return f"{hours}h {minutes}min"
@dataclass
class TripLeg:
"""Represents one leg of a multi-park trip."""
from_park: "Park"
to_park: "Park"
route: RouteInfo
@property
def parks_along_route(self) -> List["Park"]:
"""Get parks along this route segment."""
# This would be populated by find_parks_along_route
return []
@dataclass
class RoadTrip:
"""Complete road trip with multiple parks."""
parks: List["Park"]
legs: List[TripLeg]
total_distance_km: float
total_duration_minutes: int
@property
def formatted_total_distance(self) -> str:
"""Return formatted total distance."""
return f"{self.total_distance_km:.1f}km"
@property
def formatted_total_duration(self) -> str:
"""Return formatted total duration."""
hours = self.total_duration_minutes // 60
minutes = self.total_duration_minutes % 60
if hours == 0:
return f"{minutes}min"
elif minutes == 0:
return f"{hours}h"
else:
return f"{hours}h {minutes}min"
class RateLimiter:
"""Simple rate limiter for API requests."""
def __init__(self, max_requests_per_second: float = 1.0):
self.max_requests_per_second = max_requests_per_second
self.min_interval = 1.0 / max_requests_per_second
self.last_request_time = 0.0
def wait_if_needed(self):
"""Wait if necessary to respect rate limits."""
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.min_interval:
wait_time = self.min_interval - time_since_last
time.sleep(wait_time)
self.last_request_time = time.time()
class OSMAPIException(Exception):
"""Exception for OSM API related errors."""
class RoadTripService:
"""
Service for planning road trips between theme parks using OpenStreetMap APIs.
"""
def __init__(self):
self.nominatim_base_url = "https://nominatim.openstreetmap.org"
self.osrm_base_url = "http://router.project-osrm.org/route/v1/driving"
# Configuration from Django settings
self.cache_timeout = getattr(settings, "ROADTRIP_CACHE_TIMEOUT", 3600 * 24)
self.route_cache_timeout = getattr(
settings, "ROADTRIP_ROUTE_CACHE_TIMEOUT", 3600 * 6
)
self.user_agent = getattr(
settings, "ROADTRIP_USER_AGENT", "ThrillWiki Road Trip Planner"
)
self.request_timeout = getattr(settings, "ROADTRIP_REQUEST_TIMEOUT", 10)
self.max_retries = getattr(settings, "ROADTRIP_MAX_RETRIES", 3)
self.backoff_factor = getattr(settings, "ROADTRIP_BACKOFF_FACTOR", 2)
# Rate limiter
max_rps = getattr(settings, "ROADTRIP_MAX_REQUESTS_PER_SECOND", 1)
self.rate_limiter = RateLimiter(max_rps)
# Request session with proper headers
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": self.user_agent,
"Accept": "application/json",
}
)
def _make_request(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Make HTTP request with rate limiting, retries, and error handling.
"""
self.rate_limiter.wait_if_needed()
for attempt in range(self.max_retries):
try:
response = self.session.get(
url, params=params, timeout=self.request_timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.warning(f"Request attempt {attempt + 1} failed: {e}")
if attempt < self.max_retries - 1:
wait_time = self.backoff_factor**attempt
time.sleep(wait_time)
else:
raise OSMAPIException(
f"Failed to make request after {self.max_retries} attempts: {e}"
)
def geocode_address(self, address: str) -> Optional[Coordinates]:
"""
Convert address to coordinates using Nominatim geocoding service.
Args:
address: Address string to geocode
Returns:
Coordinates object or None if geocoding fails
"""
if not address or not address.strip():
return None
# Check cache first
cache_key = f"roadtrip:geocode:{hash(address.lower().strip())}"
cached_result = cache.get(cache_key)
if cached_result:
return Coordinates(**cached_result)
try:
params = {
"q": address.strip(),
"format": "json",
"limit": 1,
"addressdetails": 1,
}
url = f"{self.nominatim_base_url}/search"
response = self._make_request(url, params)
if response and len(response) > 0:
result = response[0]
coords = Coordinates(
latitude=float(result["lat"]),
longitude=float(result["lon"]),
)
# Cache the result
cache.set(
cache_key,
{
"latitude": coords.latitude,
"longitude": coords.longitude,
},
self.cache_timeout,
)
logger.info(
f"Geocoded '{address}' to {coords.latitude}, {coords.longitude}"
)
return coords
else:
logger.warning(f"No geocoding results for address: {address}")
return None
except Exception as e:
logger.error(f"Geocoding failed for '{address}': {e}")
return None
def calculate_route(
self, start_coords: Coordinates, end_coords: Coordinates
) -> Optional[RouteInfo]:
"""
Calculate route between two coordinate points using OSRM.
Args:
start_coords: Starting coordinates
end_coords: Ending coordinates
Returns:
RouteInfo object or None if routing fails
"""
if not start_coords or not end_coords:
return None
# Check cache first
cache_key = f"roadtrip:route:{start_coords.latitude},{start_coords.longitude}:{
end_coords.latitude
},{end_coords.longitude}"
cached_result = cache.get(cache_key)
if cached_result:
return RouteInfo(**cached_result)
try:
# Format coordinates for OSRM (lon,lat format)
coords_string = f"{start_coords.longitude},{start_coords.latitude};{
end_coords.longitude
},{end_coords.latitude}"
url = f"{self.osrm_base_url}/{coords_string}"
params = {
"overview": "full",
"geometries": "polyline",
"steps": "false",
}
response = self._make_request(url, params)
if response.get("code") == "Ok" and response.get("routes"):
route_data = response["routes"][0]
# Distance is in meters, convert to km
distance_km = route_data["distance"] / 1000.0
# Duration is in seconds, convert to minutes
duration_minutes = int(route_data["duration"] / 60)
route_info = RouteInfo(
distance_km=distance_km,
duration_minutes=duration_minutes,
geometry=route_data.get("geometry"),
)
# Cache the result
cache.set(
cache_key,
{
"distance_km": route_info.distance_km,
"duration_minutes": route_info.duration_minutes,
"geometry": route_info.geometry,
},
self.route_cache_timeout,
)
logger.info(
f"Route calculated: {route_info.formatted_distance}, {
route_info.formatted_duration
}"
)
return route_info
else:
# Fallback to straight-line distance calculation
logger.warning(
"OSRM routing failed, falling back to straight-line distance"
)
return self._calculate_straight_line_route(start_coords, end_coords)
except Exception as e:
logger.error(f"Route calculation failed: {e}")
# Fallback to straight-line distance
return self._calculate_straight_line_route(start_coords, end_coords)
def _calculate_straight_line_route(
self, start_coords: Coordinates, end_coords: Coordinates
) -> RouteInfo:
"""
Calculate straight-line distance as fallback when routing fails.
"""
# Haversine formula for great-circle distance
lat1, lon1 = (
math.radians(start_coords.latitude),
math.radians(start_coords.longitude),
)
lat2, lon2 = (
math.radians(end_coords.latitude),
math.radians(end_coords.longitude),
)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = (
math.sin(dlat / 2) ** 2
+ math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
)
c = 2 * math.asin(math.sqrt(a))
# Earth's radius in kilometers
earth_radius_km = 6371.0
distance_km = earth_radius_km * c
# Estimate driving time (assume average 80 km/h with 25% extra for
# roads)
estimated_duration_minutes = int((distance_km * 1.25 / 80.0) * 60)
return RouteInfo(
distance_km=distance_km,
duration_minutes=estimated_duration_minutes,
geometry=None,
)
def find_parks_along_route(
self, start_park: "Park", end_park: "Park", max_detour_km: float = 50
) -> List["Park"]:
"""
Find parks along a route within specified detour distance.
Args:
start_park: Starting park
end_park: Ending park
max_detour_km: Maximum detour distance in kilometers
Returns:
List of parks along the route
"""
from apps.parks.models import Park
if not hasattr(start_park, "location") or not hasattr(end_park, "location"):
return []
if not start_park.location or not end_park.location:
return []
start_coords = start_park.coordinates
end_coords = end_park.coordinates
if not start_coords or not end_coords:
return []
start_point = Point(start_coords[1], start_coords[0], srid=4326) # lon, lat
# end_point is not used in this method - we use coordinates directly
# Find all parks within a reasonable distance from both start and end
max_search_distance = Distance(km=max_detour_km * 2)
candidate_parks = (
Park.objects.filter(
location__point__distance_lte=(
start_point,
max_search_distance,
)
)
.exclude(id__in=[start_park.id, end_park.id])
.select_related("location")
)
parks_along_route = []
for park in candidate_parks:
if not park.location or not park.location.point:
continue
park_coords = park.coordinates
if not park_coords:
continue
# Calculate detour distance
detour_distance = self._calculate_detour_distance(
Coordinates(*start_coords),
Coordinates(*end_coords),
Coordinates(*park_coords),
)
if detour_distance and detour_distance <= max_detour_km:
parks_along_route.append(park)
return parks_along_route
def _calculate_detour_distance(
self, start: Coordinates, end: Coordinates, waypoint: Coordinates
) -> Optional[float]:
"""
Calculate the detour distance when visiting a waypoint.
"""
try:
# Direct route distance
direct_route = self.calculate_route(start, end)
if not direct_route:
return None
# Route via waypoint
route_to_waypoint = self.calculate_route(start, waypoint)
route_from_waypoint = self.calculate_route(waypoint, end)
if not route_to_waypoint or not route_from_waypoint:
return None
detour_distance = (
route_to_waypoint.distance_km + route_from_waypoint.distance_km
) - direct_route.distance_km
return max(0, detour_distance) # Don't return negative detours
except Exception as e:
logger.error(f"Failed to calculate detour distance: {e}")
return None
def create_multi_park_trip(self, park_list: List["Park"]) -> Optional[RoadTrip]:
"""
Create optimized multi-park road trip using simple nearest neighbor heuristic.
Args:
park_list: List of parks to visit
Returns:
RoadTrip object with optimized route
"""
if len(park_list) < 2:
return None
# For small numbers of parks, try all permutations
if len(park_list) <= 6:
return self._optimize_trip_exhaustive(park_list)
else:
return self._optimize_trip_nearest_neighbor(park_list)
def _optimize_trip_exhaustive(self, park_list: List["Park"]) -> Optional[RoadTrip]:
"""
Find optimal route by testing all permutations (for small lists).
"""
best_trip = None
best_distance = float("inf")
# Try all possible orders (excluding the first park as starting point)
for perm in permutations(park_list[1:]):
ordered_parks = [park_list[0]] + list(perm)
trip = self._create_trip_from_order(ordered_parks)
if trip and trip.total_distance_km < best_distance:
best_distance = trip.total_distance_km
best_trip = trip
return best_trip
def _optimize_trip_nearest_neighbor(
self, park_list: List["Park"]
) -> Optional[RoadTrip]:
"""
Optimize trip using nearest neighbor heuristic (for larger lists).
"""
if not park_list:
return None
# Start with the first park
current_park = park_list[0]
ordered_parks = [current_park]
remaining_parks = park_list[1:]
while remaining_parks:
# Find nearest unvisited park
nearest_park = None
min_distance = float("inf")
current_coords = current_park.coordinates
if not current_coords:
break
for park in remaining_parks:
park_coords = park.coordinates
if not park_coords:
continue
route = self.calculate_route(
Coordinates(*current_coords), Coordinates(*park_coords)
)
if route and route.distance_km < min_distance:
min_distance = route.distance_km
nearest_park = park
if nearest_park:
ordered_parks.append(nearest_park)
remaining_parks.remove(nearest_park)
current_park = nearest_park
else:
break
return self._create_trip_from_order(ordered_parks)
def _create_trip_from_order(
self, ordered_parks: List["Park"]
) -> Optional[RoadTrip]:
"""
Create a RoadTrip object from an ordered list of parks.
"""
if len(ordered_parks) < 2:
return None
legs = []
total_distance = 0
total_duration = 0
for i in range(len(ordered_parks) - 1):
from_park = ordered_parks[i]
to_park = ordered_parks[i + 1]
from_coords = from_park.coordinates
to_coords = to_park.coordinates
if not from_coords or not to_coords:
continue
route = self.calculate_route(
Coordinates(*from_coords), Coordinates(*to_coords)
)
if route:
legs.append(TripLeg(from_park=from_park, to_park=to_park, route=route))
total_distance += route.distance_km
total_duration += route.duration_minutes
if not legs:
return None
return RoadTrip(
parks=ordered_parks,
legs=legs,
total_distance_km=total_distance,
total_duration_minutes=total_duration,
)
def get_park_distances(
self, center_park: "Park", radius_km: float = 100
) -> List[Dict[str, Any]]:
"""
Get all parks within radius of a center park with distances.
Args:
center_park: Center park for search
radius_km: Search radius in kilometers
Returns:
List of dictionaries with park and distance information
"""
from apps.parks.models import Park
if not hasattr(center_park, "location") or not center_park.location:
return []
center_coords = center_park.coordinates
if not center_coords:
return []
center_point = Point(center_coords[1], center_coords[0], srid=4326) # lon, lat
search_distance = Distance(km=radius_km)
nearby_parks = (
Park.objects.filter(
location__point__distance_lte=(center_point, search_distance)
)
.exclude(id=center_park.id)
.select_related("location")
)
results = []
for park in nearby_parks:
park_coords = park.coordinates
if not park_coords:
continue
route = self.calculate_route(
Coordinates(*center_coords), Coordinates(*park_coords)
)
if route:
results.append(
{
"park": park,
"distance_km": route.distance_km,
"duration_minutes": route.duration_minutes,
"formatted_distance": route.formatted_distance,
"formatted_duration": route.formatted_duration,
}
)
# Sort by distance
results.sort(key=lambda x: x["distance_km"])
return results
def geocode_park_if_needed(self, park: "Park") -> bool:
"""
Geocode park location if coordinates are missing.
Args:
park: Park to geocode
Returns:
True if geocoding succeeded or wasn't needed, False otherwise
"""
if not hasattr(park, "location") or not park.location:
return False
location = park.location
# If we already have coordinates, no need to geocode
if location.point:
return True
# Build address string for geocoding
address_parts = [
park.name,
location.street_address,
location.city,
location.state,
location.country,
]
address = ", ".join(part for part in address_parts if part)
if not address:
return False
coords = self.geocode_address(address)
if coords:
location.set_coordinates(coords.latitude, coords.longitude)
location.save()
logger.info(
f"Geocoded park '{park.name}' to {coords.latitude}, {coords.longitude}"
)
return True
return False