mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 11:51:10 -05:00
700 lines
22 KiB
Python
700 lines
22 KiB
Python
"""
|
|
Road Trip Service for theme park planning using OpenStreetMap APIs.
|
|
|
|
This service provides functionality for:
|
|
- Geocoding addresses using Nominatim
|
|
- Route calculation using OSRM
|
|
- Park discovery along routes
|
|
- Multi-park trip planning
|
|
- Proper rate limiting and caching
|
|
"""
|
|
|
|
import time
|
|
import math
|
|
import logging
|
|
import requests
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
from itertools import permutations
|
|
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
# from django.contrib.gis.geos import Point # Disabled temporarily for setup
|
|
# from django.contrib.gis.measure import Distance # Disabled temporarily for setup
|
|
from apps.parks.models import Park
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Coordinates:
|
|
"""Represents latitude and longitude coordinates."""
|
|
|
|
latitude: float
|
|
longitude: float
|
|
|
|
def to_list(self) -> List[float]:
|
|
"""Return as [lat, lon] list."""
|
|
return [self.latitude, self.longitude]
|
|
|
|
def to_point(self): # Point type disabled for setup
|
|
"""Convert to Django Point object."""
|
|
# return Point(self.longitude, self.latitude, srid=4326) # Disabled for setup
|
|
return None # Temporarily disabled
|
|
|
|
|
|
@dataclass
|
|
class RouteInfo:
|
|
"""Information about a calculated route."""
|
|
|
|
distance_km: float
|
|
duration_minutes: int
|
|
geometry: Optional[str] = None # Encoded polyline
|
|
|
|
@property
|
|
def formatted_distance(self) -> str:
|
|
"""Return formatted distance string."""
|
|
if self.distance_km < 1:
|
|
return f"{self.distance_km * 1000:.0f}m"
|
|
return f"{self.distance_km:.1f}km"
|
|
|
|
@property
|
|
def formatted_duration(self) -> str:
|
|
"""Return formatted duration string."""
|
|
hours = self.duration_minutes // 60
|
|
minutes = self.duration_minutes % 60
|
|
if hours == 0:
|
|
return f"{minutes}min"
|
|
elif minutes == 0:
|
|
return f"{hours}h"
|
|
else:
|
|
return f"{hours}h {minutes}min"
|
|
|
|
|
|
@dataclass
|
|
class TripLeg:
|
|
"""Represents one leg of a multi-park trip."""
|
|
|
|
from_park: "Park"
|
|
to_park: "Park"
|
|
route: RouteInfo
|
|
|
|
@property
|
|
def parks_along_route(self) -> List["Park"]:
|
|
"""Get parks along this route segment."""
|
|
# This would be populated by find_parks_along_route
|
|
return []
|
|
|
|
|
|
@dataclass
|
|
class RoadTrip:
|
|
"""Complete road trip with multiple parks."""
|
|
|
|
parks: List["Park"]
|
|
legs: List[TripLeg]
|
|
total_distance_km: float
|
|
total_duration_minutes: int
|
|
|
|
@property
|
|
def formatted_total_distance(self) -> str:
|
|
"""Return formatted total distance."""
|
|
return f"{self.total_distance_km:.1f}km"
|
|
|
|
@property
|
|
def formatted_total_duration(self) -> str:
|
|
"""Return formatted total duration."""
|
|
hours = self.total_duration_minutes // 60
|
|
minutes = self.total_duration_minutes % 60
|
|
if hours == 0:
|
|
return f"{minutes}min"
|
|
elif minutes == 0:
|
|
return f"{hours}h"
|
|
else:
|
|
return f"{hours}h {minutes}min"
|
|
|
|
|
|
class RateLimiter:
|
|
"""Simple rate limiter for API requests."""
|
|
|
|
def __init__(self, max_requests_per_second: float = 1.0):
|
|
self.max_requests_per_second = max_requests_per_second
|
|
self.min_interval = 1.0 / max_requests_per_second
|
|
self.last_request_time = 0.0
|
|
|
|
def wait_if_needed(self):
|
|
"""Wait if necessary to respect rate limits."""
|
|
current_time = time.time()
|
|
time_since_last = current_time - self.last_request_time
|
|
|
|
if time_since_last < self.min_interval:
|
|
wait_time = self.min_interval - time_since_last
|
|
time.sleep(wait_time)
|
|
|
|
self.last_request_time = time.time()
|
|
|
|
|
|
class OSMAPIException(Exception):
|
|
"""Exception for OSM API related errors."""
|
|
|
|
|
|
class RoadTripService:
|
|
"""
|
|
Service for planning road trips between theme parks using OpenStreetMap APIs.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.nominatim_base_url = "https://nominatim.openstreetmap.org"
|
|
self.osrm_base_url = "http://router.project-osrm.org/route/v1/driving"
|
|
|
|
# Configuration from Django settings
|
|
self.cache_timeout = getattr(settings, "ROADTRIP_CACHE_TIMEOUT", 3600 * 24)
|
|
self.route_cache_timeout = getattr(
|
|
settings, "ROADTRIP_ROUTE_CACHE_TIMEOUT", 3600 * 6
|
|
)
|
|
self.user_agent = getattr(
|
|
settings, "ROADTRIP_USER_AGENT", "ThrillWiki Road Trip Planner"
|
|
)
|
|
self.request_timeout = getattr(settings, "ROADTRIP_REQUEST_TIMEOUT", 10)
|
|
self.max_retries = getattr(settings, "ROADTRIP_MAX_RETRIES", 3)
|
|
self.backoff_factor = getattr(settings, "ROADTRIP_BACKOFF_FACTOR", 2)
|
|
|
|
# Rate limiter
|
|
max_rps = getattr(settings, "ROADTRIP_MAX_REQUESTS_PER_SECOND", 1)
|
|
self.rate_limiter = RateLimiter(max_rps)
|
|
|
|
# Request session with proper headers
|
|
self.session = requests.Session()
|
|
self.session.headers.update(
|
|
{
|
|
"User-Agent": self.user_agent,
|
|
"Accept": "application/json",
|
|
}
|
|
)
|
|
|
|
def _make_request(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Make HTTP request with rate limiting, retries, and error handling.
|
|
"""
|
|
self.rate_limiter.wait_if_needed()
|
|
|
|
for attempt in range(self.max_retries):
|
|
try:
|
|
response = self.session.get(
|
|
url, params=params, timeout=self.request_timeout
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"Request attempt {attempt + 1} failed: {e}")
|
|
|
|
if attempt < self.max_retries - 1:
|
|
wait_time = self.backoff_factor**attempt
|
|
time.sleep(wait_time)
|
|
else:
|
|
raise OSMAPIException(
|
|
f"Failed to make request after {self.max_retries} attempts: {e}"
|
|
)
|
|
|
|
def geocode_address(self, address: str) -> Optional[Coordinates]:
|
|
"""
|
|
Convert address to coordinates using Nominatim geocoding service.
|
|
|
|
Args:
|
|
address: Address string to geocode
|
|
|
|
Returns:
|
|
Coordinates object or None if geocoding fails
|
|
"""
|
|
if not address or not address.strip():
|
|
return None
|
|
|
|
# Check cache first
|
|
cache_key = f"roadtrip:geocode:{hash(address.lower().strip())}"
|
|
cached_result = cache.get(cache_key)
|
|
if cached_result:
|
|
return Coordinates(**cached_result)
|
|
|
|
try:
|
|
params = {
|
|
"q": address.strip(),
|
|
"format": "json",
|
|
"limit": 1,
|
|
"addressdetails": 1,
|
|
}
|
|
|
|
url = f"{self.nominatim_base_url}/search"
|
|
response = self._make_request(url, params)
|
|
|
|
if response and len(response) > 0:
|
|
result = response[0]
|
|
coords = Coordinates(
|
|
latitude=float(result["lat"]),
|
|
longitude=float(result["lon"]),
|
|
)
|
|
|
|
# Cache the result
|
|
cache.set(
|
|
cache_key,
|
|
{
|
|
"latitude": coords.latitude,
|
|
"longitude": coords.longitude,
|
|
},
|
|
self.cache_timeout,
|
|
)
|
|
|
|
logger.info(
|
|
f"Geocoded '{address}' to {coords.latitude}, {coords.longitude}"
|
|
)
|
|
return coords
|
|
else:
|
|
logger.warning(f"No geocoding results for address: {address}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Geocoding failed for '{address}': {e}")
|
|
return None
|
|
|
|
def calculate_route(
|
|
self, start_coords: Coordinates, end_coords: Coordinates
|
|
) -> Optional[RouteInfo]:
|
|
"""
|
|
Calculate route between two coordinate points using OSRM.
|
|
|
|
Args:
|
|
start_coords: Starting coordinates
|
|
end_coords: Ending coordinates
|
|
|
|
Returns:
|
|
RouteInfo object or None if routing fails
|
|
"""
|
|
if not start_coords or not end_coords:
|
|
return None
|
|
|
|
# Check cache first
|
|
cache_key = f"roadtrip:route:{start_coords.latitude},{start_coords.longitude}:{
|
|
end_coords.latitude
|
|
},{end_coords.longitude}"
|
|
cached_result = cache.get(cache_key)
|
|
if cached_result:
|
|
return RouteInfo(**cached_result)
|
|
|
|
try:
|
|
# Format coordinates for OSRM (lon,lat format)
|
|
coords_string = f"{start_coords.longitude},{start_coords.latitude};{
|
|
end_coords.longitude
|
|
},{end_coords.latitude}"
|
|
url = f"{self.osrm_base_url}/{coords_string}"
|
|
|
|
params = {
|
|
"overview": "full",
|
|
"geometries": "polyline",
|
|
"steps": "false",
|
|
}
|
|
|
|
response = self._make_request(url, params)
|
|
|
|
if response.get("code") == "Ok" and response.get("routes"):
|
|
route_data = response["routes"][0]
|
|
|
|
# Distance is in meters, convert to km
|
|
distance_km = route_data["distance"] / 1000.0
|
|
# Duration is in seconds, convert to minutes
|
|
duration_minutes = int(route_data["duration"] / 60)
|
|
|
|
route_info = RouteInfo(
|
|
distance_km=distance_km,
|
|
duration_minutes=duration_minutes,
|
|
geometry=route_data.get("geometry"),
|
|
)
|
|
|
|
# Cache the result
|
|
cache.set(
|
|
cache_key,
|
|
{
|
|
"distance_km": route_info.distance_km,
|
|
"duration_minutes": route_info.duration_minutes,
|
|
"geometry": route_info.geometry,
|
|
},
|
|
self.route_cache_timeout,
|
|
)
|
|
|
|
logger.info(
|
|
f"Route calculated: {route_info.formatted_distance}, {
|
|
route_info.formatted_duration
|
|
}"
|
|
)
|
|
return route_info
|
|
else:
|
|
# Fallback to straight-line distance calculation
|
|
logger.warning(
|
|
"OSRM routing failed, falling back to straight-line distance"
|
|
)
|
|
return self._calculate_straight_line_route(start_coords, end_coords)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Route calculation failed: {e}")
|
|
# Fallback to straight-line distance
|
|
return self._calculate_straight_line_route(start_coords, end_coords)
|
|
|
|
def _calculate_straight_line_route(
|
|
self, start_coords: Coordinates, end_coords: Coordinates
|
|
) -> RouteInfo:
|
|
"""
|
|
Calculate straight-line distance as fallback when routing fails.
|
|
"""
|
|
# Haversine formula for great-circle distance
|
|
lat1, lon1 = (
|
|
math.radians(start_coords.latitude),
|
|
math.radians(start_coords.longitude),
|
|
)
|
|
lat2, lon2 = (
|
|
math.radians(end_coords.latitude),
|
|
math.radians(end_coords.longitude),
|
|
)
|
|
|
|
dlat = lat2 - lat1
|
|
dlon = lon2 - lon1
|
|
|
|
a = (
|
|
math.sin(dlat / 2) ** 2
|
|
+ math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
|
|
)
|
|
c = 2 * math.asin(math.sqrt(a))
|
|
|
|
# Earth's radius in kilometers
|
|
earth_radius_km = 6371.0
|
|
distance_km = earth_radius_km * c
|
|
|
|
# Estimate driving time (assume average 80 km/h with 25% extra for
|
|
# roads)
|
|
estimated_duration_minutes = int((distance_km * 1.25 / 80.0) * 60)
|
|
|
|
return RouteInfo(
|
|
distance_km=distance_km,
|
|
duration_minutes=estimated_duration_minutes,
|
|
geometry=None,
|
|
)
|
|
|
|
def find_parks_along_route(
|
|
self, start_park: "Park", end_park: "Park", max_detour_km: float = 50
|
|
) -> List["Park"]:
|
|
"""
|
|
Find parks along a route within specified detour distance.
|
|
|
|
Args:
|
|
start_park: Starting park
|
|
end_park: Ending park
|
|
max_detour_km: Maximum detour distance in kilometers
|
|
|
|
Returns:
|
|
List of parks along the route
|
|
"""
|
|
from apps.parks.models import Park
|
|
|
|
if not hasattr(start_park, "location") or not hasattr(end_park, "location"):
|
|
return []
|
|
|
|
if not start_park.location or not end_park.location:
|
|
return []
|
|
|
|
start_coords = start_park.coordinates
|
|
end_coords = end_park.coordinates
|
|
|
|
if not start_coords or not end_coords:
|
|
return []
|
|
|
|
start_point = Point(start_coords[1], start_coords[0], srid=4326) # lon, lat
|
|
# end_point is not used in this method - we use coordinates directly
|
|
|
|
# Find all parks within a reasonable distance from both start and end
|
|
max_search_distance = Distance(km=max_detour_km * 2)
|
|
|
|
candidate_parks = (
|
|
Park.objects.filter(
|
|
location__point__distance_lte=(
|
|
start_point,
|
|
max_search_distance,
|
|
)
|
|
)
|
|
.exclude(id__in=[start_park.id, end_park.id])
|
|
.select_related("location")
|
|
)
|
|
|
|
parks_along_route = []
|
|
|
|
for park in candidate_parks:
|
|
if not park.location or not park.location.point:
|
|
continue
|
|
|
|
park_coords = park.coordinates
|
|
if not park_coords:
|
|
continue
|
|
|
|
# Calculate detour distance
|
|
detour_distance = self._calculate_detour_distance(
|
|
Coordinates(*start_coords),
|
|
Coordinates(*end_coords),
|
|
Coordinates(*park_coords),
|
|
)
|
|
|
|
if detour_distance and detour_distance <= max_detour_km:
|
|
parks_along_route.append(park)
|
|
|
|
return parks_along_route
|
|
|
|
def _calculate_detour_distance(
|
|
self, start: Coordinates, end: Coordinates, waypoint: Coordinates
|
|
) -> Optional[float]:
|
|
"""
|
|
Calculate the detour distance when visiting a waypoint.
|
|
"""
|
|
try:
|
|
# Direct route distance
|
|
direct_route = self.calculate_route(start, end)
|
|
if not direct_route:
|
|
return None
|
|
|
|
# Route via waypoint
|
|
route_to_waypoint = self.calculate_route(start, waypoint)
|
|
route_from_waypoint = self.calculate_route(waypoint, end)
|
|
|
|
if not route_to_waypoint or not route_from_waypoint:
|
|
return None
|
|
|
|
detour_distance = (
|
|
route_to_waypoint.distance_km + route_from_waypoint.distance_km
|
|
) - direct_route.distance_km
|
|
return max(0, detour_distance) # Don't return negative detours
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to calculate detour distance: {e}")
|
|
return None
|
|
|
|
def create_multi_park_trip(self, park_list: List["Park"]) -> Optional[RoadTrip]:
|
|
"""
|
|
Create optimized multi-park road trip using simple nearest neighbor heuristic.
|
|
|
|
Args:
|
|
park_list: List of parks to visit
|
|
|
|
Returns:
|
|
RoadTrip object with optimized route
|
|
"""
|
|
if len(park_list) < 2:
|
|
return None
|
|
|
|
# For small numbers of parks, try all permutations
|
|
if len(park_list) <= 6:
|
|
return self._optimize_trip_exhaustive(park_list)
|
|
else:
|
|
return self._optimize_trip_nearest_neighbor(park_list)
|
|
|
|
def _optimize_trip_exhaustive(self, park_list: List["Park"]) -> Optional[RoadTrip]:
|
|
"""
|
|
Find optimal route by testing all permutations (for small lists).
|
|
"""
|
|
best_trip = None
|
|
best_distance = float("inf")
|
|
|
|
# Try all possible orders (excluding the first park as starting point)
|
|
for perm in permutations(park_list[1:]):
|
|
ordered_parks = [park_list[0]] + list(perm)
|
|
trip = self._create_trip_from_order(ordered_parks)
|
|
|
|
if trip and trip.total_distance_km < best_distance:
|
|
best_distance = trip.total_distance_km
|
|
best_trip = trip
|
|
|
|
return best_trip
|
|
|
|
def _optimize_trip_nearest_neighbor(
|
|
self, park_list: List["Park"]
|
|
) -> Optional[RoadTrip]:
|
|
"""
|
|
Optimize trip using nearest neighbor heuristic (for larger lists).
|
|
"""
|
|
if not park_list:
|
|
return None
|
|
|
|
# Start with the first park
|
|
current_park = park_list[0]
|
|
ordered_parks = [current_park]
|
|
remaining_parks = park_list[1:]
|
|
|
|
while remaining_parks:
|
|
# Find nearest unvisited park
|
|
nearest_park = None
|
|
min_distance = float("inf")
|
|
|
|
current_coords = current_park.coordinates
|
|
if not current_coords:
|
|
break
|
|
|
|
for park in remaining_parks:
|
|
park_coords = park.coordinates
|
|
if not park_coords:
|
|
continue
|
|
|
|
route = self.calculate_route(
|
|
Coordinates(*current_coords), Coordinates(*park_coords)
|
|
)
|
|
|
|
if route and route.distance_km < min_distance:
|
|
min_distance = route.distance_km
|
|
nearest_park = park
|
|
|
|
if nearest_park:
|
|
ordered_parks.append(nearest_park)
|
|
remaining_parks.remove(nearest_park)
|
|
current_park = nearest_park
|
|
else:
|
|
break
|
|
|
|
return self._create_trip_from_order(ordered_parks)
|
|
|
|
def _create_trip_from_order(
|
|
self, ordered_parks: List["Park"]
|
|
) -> Optional[RoadTrip]:
|
|
"""
|
|
Create a RoadTrip object from an ordered list of parks.
|
|
"""
|
|
if len(ordered_parks) < 2:
|
|
return None
|
|
|
|
legs = []
|
|
total_distance = 0
|
|
total_duration = 0
|
|
|
|
for i in range(len(ordered_parks) - 1):
|
|
from_park = ordered_parks[i]
|
|
to_park = ordered_parks[i + 1]
|
|
|
|
from_coords = from_park.coordinates
|
|
to_coords = to_park.coordinates
|
|
|
|
if not from_coords or not to_coords:
|
|
continue
|
|
|
|
route = self.calculate_route(
|
|
Coordinates(*from_coords), Coordinates(*to_coords)
|
|
)
|
|
|
|
if route:
|
|
legs.append(TripLeg(from_park=from_park, to_park=to_park, route=route))
|
|
total_distance += route.distance_km
|
|
total_duration += route.duration_minutes
|
|
|
|
if not legs:
|
|
return None
|
|
|
|
return RoadTrip(
|
|
parks=ordered_parks,
|
|
legs=legs,
|
|
total_distance_km=total_distance,
|
|
total_duration_minutes=total_duration,
|
|
)
|
|
|
|
def get_park_distances(
|
|
self, center_park: "Park", radius_km: float = 100
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all parks within radius of a center park with distances.
|
|
|
|
Args:
|
|
center_park: Center park for search
|
|
radius_km: Search radius in kilometers
|
|
|
|
Returns:
|
|
List of dictionaries with park and distance information
|
|
"""
|
|
from apps.parks.models import Park
|
|
|
|
if not hasattr(center_park, "location") or not center_park.location:
|
|
return []
|
|
|
|
center_coords = center_park.coordinates
|
|
if not center_coords:
|
|
return []
|
|
|
|
center_point = Point(center_coords[1], center_coords[0], srid=4326) # lon, lat
|
|
search_distance = Distance(km=radius_km)
|
|
|
|
nearby_parks = (
|
|
Park.objects.filter(
|
|
location__point__distance_lte=(center_point, search_distance)
|
|
)
|
|
.exclude(id=center_park.id)
|
|
.select_related("location")
|
|
)
|
|
|
|
results = []
|
|
|
|
for park in nearby_parks:
|
|
park_coords = park.coordinates
|
|
if not park_coords:
|
|
continue
|
|
|
|
route = self.calculate_route(
|
|
Coordinates(*center_coords), Coordinates(*park_coords)
|
|
)
|
|
|
|
if route:
|
|
results.append(
|
|
{
|
|
"park": park,
|
|
"distance_km": route.distance_km,
|
|
"duration_minutes": route.duration_minutes,
|
|
"formatted_distance": route.formatted_distance,
|
|
"formatted_duration": route.formatted_duration,
|
|
}
|
|
)
|
|
|
|
# Sort by distance
|
|
results.sort(key=lambda x: x["distance_km"])
|
|
|
|
return results
|
|
|
|
def geocode_park_if_needed(self, park: "Park") -> bool:
|
|
"""
|
|
Geocode park location if coordinates are missing.
|
|
|
|
Args:
|
|
park: Park to geocode
|
|
|
|
Returns:
|
|
True if geocoding succeeded or wasn't needed, False otherwise
|
|
"""
|
|
if not hasattr(park, "location") or not park.location:
|
|
return False
|
|
|
|
location = park.location
|
|
|
|
# If we already have coordinates, no need to geocode
|
|
if location.point:
|
|
return True
|
|
|
|
# Build address string for geocoding
|
|
address_parts = [
|
|
park.name,
|
|
location.street_address,
|
|
location.city,
|
|
location.state,
|
|
location.country,
|
|
]
|
|
address = ", ".join(part for part in address_parts if part)
|
|
|
|
if not address:
|
|
return False
|
|
|
|
coords = self.geocode_address(address)
|
|
if coords:
|
|
location.set_coordinates(coords.latitude, coords.longitude)
|
|
location.save()
|
|
logger.info(
|
|
f"Geocoded park '{park.name}' to {coords.latitude}, {coords.longitude}"
|
|
)
|
|
return True
|
|
|
|
return False
|