"""
Road Trip Service for theme park planning using OpenStreetMap APIs.

This service provides functionality for:
- Geocoding addresses using Nominatim
- Route calculation using OSRM
- Park discovery along routes
- Multi-park trip planning
- Proper rate limiting and caching
"""

import hashlib
import logging
import math
import time
from dataclasses import dataclass
from itertools import permutations
from typing import Any, Dict, List, Optional, Tuple

import requests
from django.conf import settings
from django.contrib.gis.geos import Point
from django.contrib.gis.measure import Distance
from django.core.cache import cache

from apps.parks.models import Park

logger = logging.getLogger(__name__)


def _format_duration(total_minutes: int) -> str:
    """Format whole minutes as ``"45min"``, ``"2h"`` or ``"2h 5min"``."""
    hours, minutes = divmod(total_minutes, 60)
    if hours == 0:
        return f"{minutes}min"
    if minutes == 0:
        return f"{hours}h"
    return f"{hours}h {minutes}min"


@dataclass
class Coordinates:
    """Represents latitude and longitude coordinates."""

    latitude: float
    longitude: float

    def to_tuple(self) -> Tuple[float, float]:
        """Return as (lat, lon) tuple."""
        return (self.latitude, self.longitude)

    def to_point(self) -> Point:
        """Convert to Django Point object (x=longitude, y=latitude, SRID 4326)."""
        return Point(self.longitude, self.latitude, srid=4326)


@dataclass
class RouteInfo:
    """Information about a calculated route."""

    distance_km: float
    duration_minutes: int
    geometry: Optional[str] = None  # Encoded polyline

    @property
    def formatted_distance(self) -> str:
        """Return the distance in metres below 1 km, otherwise in km."""
        if self.distance_km < 1:
            return f"{self.distance_km * 1000:.0f}m"
        return f"{self.distance_km:.1f}km"

    @property
    def formatted_duration(self) -> str:
        """Return the duration as hours and/or minutes."""
        return _format_duration(self.duration_minutes)


@dataclass
class TripLeg:
    """Represents one leg of a multi-park trip."""

    from_park: "Park"
    to_park: "Park"
    route: RouteInfo

    @property
    def parks_along_route(self) -> List["Park"]:
        """Get parks along this route segment.

        Currently always returns an empty list.
        """
        # This would be populated by find_parks_along_route
        return []


@dataclass
class RoadTrip:
    """Complete road trip with multiple parks."""

    parks: List["Park"]
    legs: List[TripLeg]
    total_distance_km: float
    total_duration_minutes: int

    @property
    def formatted_total_distance(self) -> str:
        """Return formatted total distance in km."""
        return f"{self.total_distance_km:.1f}km"

    @property
    def formatted_total_duration(self) -> str:
        """Return formatted total duration as hours and/or minutes."""
        return _format_duration(self.total_duration_minutes)


class RateLimiter:
    """Simple rate limiter for API requests.

    Enforces a minimum interval of ``1 / max_requests_per_second`` seconds
    between consecutive calls to :meth:`wait_if_needed` by sleeping.
    """

    def __init__(self, max_requests_per_second: float = 1.0):
        self.max_requests_per_second = max_requests_per_second
        # A rate of 0 raises ZeroDivisionError here.
        self.min_interval = 1.0 / max_requests_per_second
        self.last_request_time = 0.0

    def wait_if_needed(self):
        """Sleep if necessary to respect the rate limit, then record the time."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()


class OSMAPIException(Exception):
    """Exception for OSM API related errors."""


class RoadTripService:
    """
    Service for planning road trips between theme parks using OpenStreetMap APIs.
    """

    def __init__(self):
        self.nominatim_base_url = "https://nominatim.openstreetmap.org"
        self.osrm_base_url = "http://router.project-osrm.org/route/v1/driving"

        # Configuration from Django settings
        self.cache_timeout = getattr(settings, "ROADTRIP_CACHE_TIMEOUT", 3600 * 24)
        self.route_cache_timeout = getattr(
            settings, "ROADTRIP_ROUTE_CACHE_TIMEOUT", 3600 * 6
        )
        self.user_agent = getattr(
            settings, "ROADTRIP_USER_AGENT", "ThrillWiki Road Trip Planner"
        )
        self.request_timeout = getattr(settings, "ROADTRIP_REQUEST_TIMEOUT", 10)
        self.max_retries = getattr(settings, "ROADTRIP_MAX_RETRIES", 3)
        self.backoff_factor = getattr(settings, "ROADTRIP_BACKOFF_FACTOR", 2)

        # Rate limiter
        max_rps = getattr(settings, "ROADTRIP_MAX_REQUESTS_PER_SECOND", 1)
        self.rate_limiter = RateLimiter(max_rps)

        # Request session with proper headers
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": self.user_agent,
                "Accept": "application/json",
            }
        )

    def _make_request(self, url: str, params: Dict[str, Any]) -> Any:
        """
        Make HTTP GET request with rate limiting, retries, and error handling.

        Args:
            url: Full URL to request
            params: Query-string parameters

        Returns:
            The decoded JSON body (a list for the Nominatim search call and
            a dict for the OSRM route call made in this class)

        Raises:
            OSMAPIException: if every attempt fails, or if ``max_retries``
                allows no attempt at all
        """
        for attempt in range(self.max_retries):
            # Throttle every attempt, not only the first, so that retries
            # also respect the rate limit.
            self.rate_limiter.wait_if_needed()
            try:
                response = self.session.get(
                    url, params=params, timeout=self.request_timeout
                )
                response.raise_for_status()
                return response.json()
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers an undecodable JSON body on requests
                # versions whose JSON error is not a RequestException.
                logger.warning(f"Request attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff: factor**0, factor**1, ...
                    time.sleep(self.backoff_factor**attempt)
                else:
                    raise OSMAPIException(
                        f"Failed to make request after {self.max_retries} attempts: {e}"
                    ) from e

        # Only reachable when max_retries <= 0; fail loudly instead of
        # silently returning None.
        raise OSMAPIException(
            f"No request attempted: max_retries is {self.max_retries}"
        )

    def geocode_address(self, address: str) -> Optional[Coordinates]:
        """
        Convert address to coordinates using Nominatim geocoding service.

        Results are cached for ``self.cache_timeout`` seconds.

        Args:
            address: Address string to geocode

        Returns:
            Coordinates object or None if geocoding fails
        """
        if not address or not address.strip():
            return None

        query = address.strip()

        # Check cache first. The key uses a SHA-256 digest rather than the
        # built-in hash(): str hashes are salted per process, so hash()-based
        # keys would not match across workers or restarts.
        digest = hashlib.sha256(query.lower().encode("utf-8")).hexdigest()
        cache_key = f"roadtrip:geocode:{digest}"
        cached_result = cache.get(cache_key)
        if cached_result:
            return Coordinates(**cached_result)

        try:
            params = {
                "q": query,
                "format": "json",
                "limit": 1,
                "addressdetails": 1,
            }
            url = f"{self.nominatim_base_url}/search"
            response = self._make_request(url, params)

            if not response:
                logger.warning(f"No geocoding results for address: {address}")
                return None

            result = response[0]
            coords = Coordinates(
                latitude=float(result["lat"]),
                longitude=float(result["lon"]),
            )

            # Cache the result as a plain dict
            cache.set(
                cache_key,
                {
                    "latitude": coords.latitude,
                    "longitude": coords.longitude,
                },
                self.cache_timeout,
            )

            logger.info(
                f"Geocoded '{address}' to {coords.latitude}, {coords.longitude}"
            )
            return coords
        except Exception as e:
            # Best-effort: any failure is logged and reported as "no result".
            logger.error(f"Geocoding failed for '{address}': {e}")
            return None

    def calculate_route(
        self, start_coords: Coordinates, end_coords: Coordinates
    ) -> Optional[RouteInfo]:
        """
        Calculate route between two coordinate points using OSRM.

        Successful OSRM results are cached for ``self.route_cache_timeout``
        seconds. If OSRM fails or returns no route, a straight-line estimate
        is returned instead (and is not cached).

        Args:
            start_coords: Starting coordinates
            end_coords: Ending coordinates

        Returns:
            RouteInfo object, or None if either coordinate is missing
        """
        if not start_coords or not end_coords:
            return None

        # Check cache first
        cache_key = (
            f"roadtrip:route:{start_coords.latitude},{start_coords.longitude}"
            f":{end_coords.latitude},{end_coords.longitude}"
        )
        cached_result = cache.get(cache_key)
        if cached_result:
            return RouteInfo(**cached_result)

        try:
            # Format coordinates for OSRM (lon,lat format)
            coords_string = (
                f"{start_coords.longitude},{start_coords.latitude}"
                f";{end_coords.longitude},{end_coords.latitude}"
            )
            url = f"{self.osrm_base_url}/{coords_string}"
            params = {
                "overview": "full",
                "geometries": "polyline",
                "steps": "false",
            }
            response = self._make_request(url, params)

            if response.get("code") == "Ok" and response.get("routes"):
                route_data = response["routes"][0]

                # Distance is in meters, convert to km
                distance_km = route_data["distance"] / 1000.0
                # Duration is in seconds, convert to whole minutes (truncated)
                duration_minutes = int(route_data["duration"] / 60)

                route_info = RouteInfo(
                    distance_km=distance_km,
                    duration_minutes=duration_minutes,
                    geometry=route_data.get("geometry"),
                )

                # Cache the result as a plain dict
                cache.set(
                    cache_key,
                    {
                        "distance_km": route_info.distance_km,
                        "duration_minutes": route_info.duration_minutes,
                        "geometry": route_info.geometry,
                    },
                    self.route_cache_timeout,
                )

                logger.info(
                    f"Route calculated: {route_info.formatted_distance}, {route_info.formatted_duration}"
                )
                return route_info

            # Fallback to straight-line distance calculation
            logger.warning(
                "OSRM routing failed, falling back to straight-line distance"
            )
            return self._calculate_straight_line_route(start_coords, end_coords)
        except Exception as e:
            logger.error(f"Route calculation failed: {e}")
            # Fallback to straight-line distance
            return self._calculate_straight_line_route(start_coords, end_coords)

    def _calculate_straight_line_route(
        self, start_coords: Coordinates, end_coords: Coordinates
    ) -> RouteInfo:
        """
        Calculate straight-line distance as fallback when routing fails.

        Args:
            start_coords: Starting coordinates
            end_coords: Ending coordinates

        Returns:
            RouteInfo with the great-circle distance, an estimated driving
            duration, and no geometry
        """
        # Haversine formula for great-circle distance
        lat1 = math.radians(start_coords.latitude)
        lon1 = math.radians(start_coords.longitude)
        lat2 = math.radians(end_coords.latitude)
        lon2 = math.radians(end_coords.longitude)

        dlat = lat2 - lat1
        dlon = lon2 - lon1

        a = (
            math.sin(dlat / 2) ** 2
            + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
        )
        # Clamp to 1.0: rounding can push sqrt(a) marginally above 1 for
        # near-antipodal points, which would make asin raise ValueError.
        c = 2 * math.asin(min(1.0, math.sqrt(a)))

        # Earth's radius in kilometers
        earth_radius_km = 6371.0
        distance_km = earth_radius_km * c

        # Estimate driving time (assume average 80 km/h with 25% extra for
        # roads)
        estimated_duration_minutes = int((distance_km * 1.25 / 80.0) * 60)

        return RouteInfo(
            distance_km=distance_km,
            duration_minutes=estimated_duration_minutes,
            geometry=None,
        )

    def find_parks_along_route(
        self, start_park: "Park", end_park: "Park", max_detour_km: float = 50
    ) -> List["Park"]:
        """
        Find parks along a route within specified detour distance.

        Args:
            start_park: Starting park
            end_park: Ending park
            max_detour_km: Maximum detour distance in kilometers

        Returns:
            List of parks whose detour is at most ``max_detour_km``; empty if
            either park lacks a location or coordinates
        """
        # NOTE(review): redundant with the module-level import; kept so the
        # import behavior of this method is unchanged.
        from apps.parks.models import Park

        if not hasattr(start_park, "location") or not hasattr(end_park, "location"):
            return []

        if not start_park.location or not end_park.location:
            return []

        # Used below as (lat, lon) sequences.
        start_coords = start_park.coordinates
        end_coords = end_park.coordinates

        if not start_coords or not end_coords:
            return []

        start_point = Point(start_coords[1], start_coords[0], srid=4326)  # lon, lat

        # Candidates are parks within 2 * max_detour_km of the START park.
        # NOTE(review): parks that are only close to the end or the middle of
        # a long route are not considered by this query.
        max_search_distance = Distance(km=max_detour_km * 2)

        candidate_parks = (
            Park.objects.filter(
                location__point__distance_lte=(
                    start_point,
                    max_search_distance,
                )
            )
            .exclude(id__in=[start_park.id, end_park.id])
            .select_related("location")
        )

        parks_along_route = []

        for park in candidate_parks:
            if not park.location or not park.location.point:
                continue

            park_coords = park.coordinates
            if not park_coords:
                continue

            # Calculate detour distance
            detour_distance = self._calculate_detour_distance(
                Coordinates(*start_coords),
                Coordinates(*end_coords),
                Coordinates(*park_coords),
            )

            # Compare against None explicitly: a detour of 0.0 (park directly
            # on the route) is falsy but is the best possible match.
            if detour_distance is not None and detour_distance <= max_detour_km:
                parks_along_route.append(park)

        return parks_along_route

    def _calculate_detour_distance(
        self, start: Coordinates, end: Coordinates, waypoint: Coordinates
    ) -> Optional[float]:
        """
        Calculate the detour distance when visiting a waypoint.

        The detour is (start -> waypoint) + (waypoint -> end) minus the
        direct (start -> end) distance, floored at zero.

        Returns:
            Extra distance in kilometers, or None if any route is unavailable
            or an error occurs
        """
        try:
            # Direct route distance
            direct_route = self.calculate_route(start, end)
            if not direct_route:
                return None

            # Route via waypoint
            route_to_waypoint = self.calculate_route(start, waypoint)
            route_from_waypoint = self.calculate_route(waypoint, end)

            if not route_to_waypoint or not route_from_waypoint:
                return None

            detour_distance = (
                route_to_waypoint.distance_km + route_from_waypoint.distance_km
            ) - direct_route.distance_km
            return max(0, detour_distance)  # Don't return negative detours
        except Exception as e:
            logger.error(f"Failed to calculate detour distance: {e}")
            return None

    def create_multi_park_trip(self, park_list: List["Park"]) -> Optional[RoadTrip]:
        """
        Create optimized multi-park road trip.

        Lists of up to six parks are optimized by trying every ordering;
        longer lists use a nearest neighbor heuristic. In both cases the
        first park in the list is the starting point.

        Args:
            park_list: List of parks to visit

        Returns:
            RoadTrip object with optimized route, or None if fewer than two
            parks are given or no trip could be built
        """
        if len(park_list) < 2:
            return None

        # For small numbers of parks, try all permutations
        if len(park_list) <= 6:
            return self._optimize_trip_exhaustive(park_list)
        return self._optimize_trip_nearest_neighbor(park_list)

    def _optimize_trip_exhaustive(self, park_list: List["Park"]) -> Optional[RoadTrip]:
        """
        Find optimal route by testing all permutations (for small lists).

        Returns:
            The trip with the smallest total distance, or None if no ordering
            produced a trip
        """
        best_trip = None
        best_distance = float("inf")

        # Try all possible orders (keeping the first park as starting point)
        first_park = park_list[0]
        for perm in permutations(park_list[1:]):
            trip = self._create_trip_from_order([first_park, *perm])

            if trip and trip.total_distance_km < best_distance:
                best_distance = trip.total_distance_km
                best_trip = trip

        return best_trip

    def _optimize_trip_nearest_neighbor(
        self, park_list: List["Park"]
    ) -> Optional[RoadTrip]:
        """
        Optimize trip using nearest neighbor heuristic (for larger lists).

        Starting from the first park, repeatedly moves to the closest
        remaining park by route distance. Stops early if the current park has
        no coordinates or no remaining park can be routed to.
        """
        if not park_list:
            return None

        # Start with the first park
        current_park = park_list[0]
        ordered_parks = [current_park]
        remaining_parks = park_list[1:]  # copy, so the caller's list is untouched

        while remaining_parks:
            current_coords = current_park.coordinates
            if not current_coords:
                break

            # Find nearest unvisited park
            nearest_park = None
            min_distance = float("inf")

            for park in remaining_parks:
                park_coords = park.coordinates
                if not park_coords:
                    continue

                route = self.calculate_route(
                    Coordinates(*current_coords), Coordinates(*park_coords)
                )

                if route and route.distance_km < min_distance:
                    min_distance = route.distance_km
                    nearest_park = park

            if not nearest_park:
                break

            ordered_parks.append(nearest_park)
            remaining_parks.remove(nearest_park)
            current_park = nearest_park

        return self._create_trip_from_order(ordered_parks)

    def _create_trip_from_order(
        self, ordered_parks: List["Park"]
    ) -> Optional[RoadTrip]:
        """
        Create a RoadTrip object from an ordered list of parks.

        Consecutive pairs where either park lacks coordinates, or for which
        no route is returned, are skipped and contribute no leg.

        Returns:
            RoadTrip with summed distance and duration, or None if fewer than
            two parks are given or no leg could be built
        """
        if len(ordered_parks) < 2:
            return None

        legs = []
        total_distance = 0
        total_duration = 0

        for from_park, to_park in zip(ordered_parks, ordered_parks[1:]):
            from_coords = from_park.coordinates
            to_coords = to_park.coordinates

            if not from_coords or not to_coords:
                continue

            route = self.calculate_route(
                Coordinates(*from_coords), Coordinates(*to_coords)
            )

            if route:
                legs.append(TripLeg(from_park=from_park, to_park=to_park, route=route))
                total_distance += route.distance_km
                total_duration += route.duration_minutes

        if not legs:
            return None

        return RoadTrip(
            parks=ordered_parks,
            legs=legs,
            total_distance_km=total_distance,
            total_duration_minutes=total_duration,
        )

    def get_park_distances(
        self, center_park: "Park", radius_km: float = 100
    ) -> List[Dict[str, Any]]:
        """
        Get all parks within radius of a center park with distances.

        Args:
            center_park: Center park for search
            radius_km: Search radius in kilometers

        Returns:
            List of dictionaries with keys ``park``, ``distance_km``,
            ``duration_minutes``, ``formatted_distance`` and
            ``formatted_duration``, sorted by ascending ``distance_km``
        """
        # NOTE(review): redundant with the module-level import; kept so the
        # import behavior of this method is unchanged.
        from apps.parks.models import Park

        if not hasattr(center_park, "location") or not center_park.location:
            return []

        center_coords = center_park.coordinates
        if not center_coords:
            return []

        center_point = Point(center_coords[1], center_coords[0], srid=4326)  # lon, lat
        search_distance = Distance(km=radius_km)

        nearby_parks = (
            Park.objects.filter(
                location__point__distance_lte=(center_point, search_distance)
            )
            .exclude(id=center_park.id)
            .select_related("location")
        )

        results = []

        for park in nearby_parks:
            park_coords = park.coordinates
            if not park_coords:
                continue

            route = self.calculate_route(
                Coordinates(*center_coords), Coordinates(*park_coords)
            )

            if route:
                results.append(
                    {
                        "park": park,
                        "distance_km": route.distance_km,
                        "duration_minutes": route.duration_minutes,
                        "formatted_distance": route.formatted_distance,
                        "formatted_duration": route.formatted_duration,
                    }
                )

        # Sort by distance
        results.sort(key=lambda x: x["distance_km"])

        return results

    def geocode_park_if_needed(self, park: "Park") -> bool:
        """
        Geocode park location if coordinates are missing.

        On success the coordinates are written to the park's location, which
        is then saved.

        Args:
            park: Park to geocode

        Returns:
            True if geocoding succeeded or wasn't needed, False otherwise
        """
        if not hasattr(park, "location") or not park.location:
            return False

        location = park.location

        # If we already have coordinates, no need to geocode
        if location.point:
            return True

        # Build address string for geocoding, skipping empty parts
        address_parts = [
            park.name,
            location.street_address,
            location.city,
            location.state,
            location.country,
        ]
        address = ", ".join(part for part in address_parts if part)

        if not address:
            return False

        coords = self.geocode_address(address)
        if not coords:
            return False

        location.set_coordinates(coords.latitude, coords.longitude)
        location.save()
        logger.info(
            f"Geocoded park '{park.name}' to {coords.latitude}, {coords.longitude}"
        )
        return True