from .querysets import get_base_park_queryset from apps.core.mixins import HTMXFilterableMixin from .models.location import ParkLocation from .models.media import ParkPhoto from apps.moderation.services import ModerationService from apps.moderation.mixins import ( EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, ) from apps.core.views.views import SlugRedirectMixin from .filters import ParkFilter from .forms import ParkForm from .models import Park, ParkArea, ParkReview as Review from .services import ParkFilterService from django.http import ( HttpResponseRedirect, HttpResponse, HttpRequest, JsonResponse, ) from django.core.exceptions import ObjectDoesNotExist from django.contrib import messages from django.contrib.contenttypes.models import ContentType from django.contrib.auth.mixins import LoginRequiredMixin from django.db.models import QuerySet from django.urls import reverse from django.shortcuts import get_object_or_404, render from decimal import InvalidOperation from django.views.generic import DetailView, ListView, CreateView, UpdateView import requests from decimal import Decimal, ROUND_DOWN from typing import Any, Optional, cast, Literal, Dict # Constants PARK_DETAIL_URL = "parks:park_detail" PARK_LIST_ITEM_TEMPLATE = "parks/partials/park_list_item.html" REQUIRED_FIELDS_ERROR = ( "Please correct the errors below. Required fields are marked with an asterisk (*)." ) ALLOWED_ROLES = ["MODERATOR", "ADMIN", "SUPERUSER"] ViewMode = Literal["grid", "list"] def normalize_osm_result(result: dict) -> dict: """Normalize OpenStreetMap result to a consistent format with enhanced address details""" # noqa: E501 from .location_utils import get_english_name, normalize_coordinate # Get address details address = result.get("address", {}) # Normalize coordinates lat = normalize_coordinate(float(result.get("lat")), 9, 6) lon = normalize_coordinate(float(result.get("lon")), 10, 6) # Get English names where possible name = "" if "namedetails" in result: name = get_english_name(result["namedetails"]) # Build street address from available components street_parts = [] if address.get("house_number"): street_parts.append(address["house_number"]) if address.get("road") or address.get("street"): street_parts.append(address.get("road") or address.get("street")) elif address.get("pedestrian"): street_parts.append(address["pedestrian"]) elif address.get("footway"): street_parts.append(address["footway"]) # Handle additional address components suburb = address.get("suburb", "") district = address.get("district", "") neighborhood = address.get("neighbourhood", "") # Build city from available components city = ( address.get("city") or address.get("town") or address.get("village") or address.get("municipality") or "" ) # Get detailed state/region information state = ( address.get("state") or address.get("province") or address.get("region") or "" ) # Get postal code with fallbacks postal_code = address.get("postcode") or address.get("postal_code") or "" return { "display_name": name or result.get("display_name", ""), "lat": lat, "lon": lon, "street": " ".join(street_parts).strip(), "suburb": suburb, "district": district, "neighborhood": neighborhood, "city": city, "state": state, "country": address.get("country", ""), "postal_code": postal_code, } def get_view_mode(request: HttpRequest) -> ViewMode: """Get the current view mode from request, defaulting to grid""" view_mode = request.GET.get("view_mode", "grid") return cast(ViewMode, "list" if view_mode == "list" else "grid") def add_park_button(request: HttpRequest) -> HttpResponse: """Return the add park button partial template""" return render(request, "parks/partials/add_park_button.html") def park_actions(request: HttpRequest, slug: str) -> HttpResponse: """Return the park actions partial template""" park = get_object_or_404(Park, slug=slug) return render(request, "parks/partials/park_actions.html", {"park": park}) def get_park_areas(request: HttpRequest) -> HttpResponse: """Return park areas as options for a select element""" park_id = request.GET.get("park") if not park_id: return HttpResponse('') try: park = Park.objects.get(id=park_id) areas = park.areas.all() options = [''] options.extend( [f'' for area in areas] ) return HttpResponse("\n".join(options)) except Park.DoesNotExist: return HttpResponse('') def location_search(request: HttpRequest) -> JsonResponse: """Search for locations using OpenStreetMap Nominatim API""" query = request.GET.get("q", "") if not query: return JsonResponse({"results": []}) response = requests.get( "https://nominatim.openstreetmap.org/search", params={ "q": query, "format": "json", "addressdetails": 1, "namedetails": 1, "accept-language": "en", "limit": 10, }, headers={"User-Agent": "ThrillWiki/1.0"}, timeout=60, ) if response.status_code == 200: results = response.json() normalized_results = [normalize_osm_result(result) for result in results] valid_results = [ r for r in normalized_results if r["lat"] is not None and r["lon"] is not None ] return JsonResponse({"results": valid_results}) return JsonResponse({"results": []}) def reverse_geocode(request: HttpRequest) -> JsonResponse: """Reverse geocode coordinates using OpenStreetMap Nominatim API""" try: lat = Decimal(request.GET.get("lat", "")) lon = Decimal(request.GET.get("lon", "")) except (TypeError, ValueError, InvalidOperation): return JsonResponse({"error": "Invalid coordinates"}, status=400) if not lat or not lon: return JsonResponse({"error": "Missing coordinates"}, status=400) lat = lat.quantize(Decimal("0.000001"), rounding=ROUND_DOWN) lon = lon.quantize(Decimal("0.000001"), rounding=ROUND_DOWN) if lat < -90 or lat > 90: return JsonResponse( {"error": "Latitude must be between -90 and 90"}, status=400 ) if lon < -180 or lon > 180: return JsonResponse( {"error": "Longitude must be between -180 and 180"}, status=400 ) response = requests.get( "https://nominatim.openstreetmap.org/reverse", params={ "lat": str(lat), "lon": str(lon), "format": "json", "addressdetails": 1, "namedetails": 1, "accept-language": "en", }, headers={"User-Agent": "ThrillWiki/1.0"}, timeout=60, ) if response.status_code == 200: result = response.json() normalized_result = normalize_osm_result(result) if normalized_result["lat"] is None or normalized_result["lon"] is None: return JsonResponse({"error": "Invalid coordinates"}, status=400) return JsonResponse(normalized_result) return JsonResponse({"error": "Geocoding failed"}, status=500) class ParkListView(HTMXFilterableMixin, ListView): model = Park template_name = "parks/park_list.html" context_object_name = "parks" filter_class = ParkFilter paginate_by = 20 def __init__(self, **kwargs): super().__init__(**kwargs) self.filter_service = ParkFilterService() def get_template_names(self) -> list[str]: """Return park_list.html for HTMX requests""" if self.request.htmx: return ["parks/partials/park_list.html"] return [self.template_name] def get_view_mode(self) -> ViewMode: """Get the current view mode (grid or list)""" return get_view_mode(self.request) def get_queryset(self) -> QuerySet[Park]: """Get optimized queryset with enhanced filtering and proper relations""" try: # Start with optimized base queryset queryset = ( get_base_park_queryset() .select_related( 'operator', 'property_owner', 'location', 'banner_image', 'card_image' ) .prefetch_related( 'photos', 'rides__manufacturer', 'areas' ) ) # Use filter service for enhanced filtering filter_params = self._get_clean_filter_params() # Apply ordering ordering = self.request.GET.get('ordering', 'name') if ordering: # Validate ordering to prevent SQL injection valid_orderings = [ 'name', '-name', 'average_rating', '-average_rating', 'coaster_count', '-coaster_count', 'ride_count', '-ride_count', 'opening_date', '-opening_date' ] if ordering in valid_orderings: queryset = queryset.order_by(ordering) else: queryset = queryset.order_by('name') # Default fallback # Apply other filters through service filtered_queryset = self.filter_service.get_filtered_queryset(filter_params) # Combine with optimized queryset maintaining the optimizations final_queryset = queryset.filter( pk__in=filtered_queryset.values_list('pk', flat=True) ) # Create filterset for form rendering self.filterset = self.filter_class(self.request.GET, queryset=final_queryset) return self.filterset.qs except Exception as e: messages.error(self.request, f"Error loading parks: {str(e)}") queryset = self.model.objects.none() self.filterset = self.filter_class(self.request.GET, queryset=queryset) return queryset def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Add enhanced context with filter stats and suggestions""" try: # Initialize filterset if not exists if not hasattr(self, "filterset"): self.filterset = self.filter_class( self.request.GET, queryset=self.model.objects.none() ) context = super().get_context_data(**kwargs) # Add filter service data filter_counts = self.filter_service.get_filter_counts() popular_filters = self.filter_service.get_popular_filters() # Calculate active filters for chips component active_filters = {} for key, value in self.request.GET.items(): if key not in ['page', 'view_mode'] and value: active_filters[key] = value context.update( { "view_mode": self.get_view_mode(), "is_search": bool(self.request.GET.get("search")), "search_query": self.request.GET.get("search", ""), "filter_counts": filter_counts, "popular_filters": popular_filters, "active_filters": active_filters, "filter_count": len(active_filters), "current_ordering": self.request.GET.get("ordering", "name"), "total_results": ( context.get("paginator").count if context.get("paginator") else 0 ), } ) # Add filter suggestions for search queries search_query = self.request.GET.get("search", "") if search_query: context["filter_suggestions"] = ( self.filter_service.get_filter_suggestions(search_query) ) return context except Exception as e: messages.error(self.request, f"Error applying filters: {str(e)}") # Ensure filterset exists in error case if not hasattr(self, "filterset"): self.filterset = self.filter_class( self.request.GET, queryset=self.model.objects.none() ) return { "filter": self.filterset, "error": "Unable to apply filters. Please try adjusting your criteria.", "view_mode": self.get_view_mode(), "is_search": bool(self.request.GET.get("search")), "search_query": self.request.GET.get("search", ""), } def _get_clean_filter_params(self) -> Dict[str, Any]: """Extract and clean filter parameters from request.""" filter_params = {} # Define valid filter fields valid_filters = { "status", "operator", "park_type", "has_coasters", "min_rating", "big_parks_only", "ordering", "search", } for param, value in self.request.GET.items(): if param in valid_filters and value: # Skip pagination parameter if param == "page": continue # Clean and validate the value filter_params[param] = self._clean_filter_value(param, value) return {k: v for k, v in filter_params.items() if v is not None} def _clean_filter_value(self, param: str, value: str) -> Optional[Any]: """Clean and validate a single filter value.""" if param in ("has_coasters", "big_parks_only"): # Boolean filters return value.lower() in ("true", "1", "yes", "on") elif param == "min_rating": # Numeric filter try: rating = float(value) if 0 <= rating <= 5: return str(rating) except (ValueError, TypeError): pass # Skip invalid ratings return None elif param == "search": # Search filter clean_search = value.strip() return clean_search if clean_search else None else: # String filters return value.strip() def _build_filter_query_string(self, filter_params: Dict[str, Any]) -> str: """Build query string from filter parameters.""" from urllib.parse import urlencode # Convert boolean values to strings for URL url_params = {} for key, value in filter_params.items(): if isinstance(value, bool): url_params[key] = "true" if value else "false" else: url_params[key] = str(value) return urlencode(url_params) def _get_pagination_urls( self, page_obj, filter_params: Dict[str, Any] ) -> Dict[str, str]: """Generate pagination URLs that preserve filter state.""" base_query = self._build_filter_query_string(filter_params) pagination_urls = {} if page_obj.has_previous(): prev_params = ( f"{base_query}&page={page_obj.previous_page_number()}" if base_query else f"page={page_obj.previous_page_number()}" ) pagination_urls["previous_url"] = f"?{prev_params}" if page_obj.has_next(): next_params = ( f"{base_query}&page={page_obj.next_page_number()}" if base_query else f"page={page_obj.next_page_number()}" ) pagination_urls["next_url"] = f"?{next_params}" # First and last page URLs if page_obj.number > 1: first_params = f"{base_query}&page=1" if base_query else "page=1" pagination_urls["first_url"] = f"?{first_params}" if page_obj.number < page_obj.paginator.num_pages: last_params = ( f"{base_query}&page={page_obj.paginator.num_pages}" if base_query else f"page={page_obj.paginator.num_pages}" ) pagination_urls["last_url"] = f"?{last_params}" return pagination_urls def search_parks(request: HttpRequest) -> HttpResponse: """Search parks and return results using park_list_item.html""" try: search_query = request.GET.get("search", "").strip() if not search_query: return HttpResponse("") # Get current view mode from request current_view_mode = request.GET.get("view_mode", "grid") park_filter = ParkFilter( {"search": search_query}, queryset=get_base_park_queryset() ) parks = park_filter.qs if request.GET.get("quick_search"): parks = parks[:8] # Limit quick search results response = render( request, PARK_LIST_ITEM_TEMPLATE, { "parks": parks, "view_mode": current_view_mode, "search_query": search_query, "is_search": True, }, ) response["HX-Trigger"] = "searchComplete" return response except Exception as e: response = render( request, PARK_LIST_ITEM_TEMPLATE, { "parks": [], "error": f"Error performing search: {str(e)}", "is_search": True, }, ) response["HX-Trigger"] = "searchError" return response class ParkCreateView(LoginRequiredMixin, CreateView): model = Park form_class = ParkForm template_name = "parks/park_form.html" def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]: data = cleaned_data.copy() if data.get("owner"): data["owner"] = data["owner"].id if data.get("opening_date"): data["opening_date"] = data["opening_date"].isoformat() if data.get("closing_date"): data["closing_date"] = data["closing_date"].isoformat() decimal_fields = [ "latitude", "longitude", "size_acres", "average_rating", ] for field in decimal_fields: if data.get(field): data[field] = str(data[field]) return data def normalize_coordinates(self, form: ParkForm) -> None: if form.cleaned_data.get("latitude"): lat = Decimal(str(form.cleaned_data["latitude"])) form.cleaned_data["latitude"] = lat.quantize( Decimal("0.000001"), rounding=ROUND_DOWN ) if form.cleaned_data.get("longitude"): lon = Decimal(str(form.cleaned_data["longitude"])) form.cleaned_data["longitude"] = lon.quantize( Decimal("0.000001"), rounding=ROUND_DOWN ) def form_valid(self, form: ParkForm) -> HttpResponse: self.normalize_coordinates(form) changes = self.prepare_changes_data(form.cleaned_data) # Use the new queue routing service result = ModerationService.create_edit_submission_with_queue( content_object=None, # None for CREATE changes=changes, submitter=self.request.user, submission_type="CREATE", reason=self.request.POST.get("reason", ""), source=self.request.POST.get("source", ""), ) if result['status'] == 'auto_approved': # Moderator submission was auto-approved self.object = result['created_object'] if form.cleaned_data.get("latitude") and form.cleaned_data.get("longitude"): # Create or update ParkLocation park_location, created = ParkLocation.objects.get_or_create( park=self.object, defaults={ "street_address": form.cleaned_data.get("street_address", ""), "city": form.cleaned_data.get("city", ""), "state": form.cleaned_data.get("state", ""), "country": form.cleaned_data.get("country", "USA"), "postal_code": form.cleaned_data.get("postal_code", ""), }, ) park_location.set_coordinates( form.cleaned_data["latitude"], form.cleaned_data["longitude"], ) park_location.save() photos = self.request.FILES.getlist("photos") uploaded_count = 0 for photo_file in photos: try: ParkPhoto.objects.create( image=photo_file, uploaded_by=self.request.user, park=self.object, ) uploaded_count += 1 except Exception as e: messages.error( self.request, f"Error uploading photo {photo_file.name}: {str(e)}", ) messages.success( self.request, f"Successfully created {self.object.name}. " f"Added {uploaded_count} photo(s).", ) return HttpResponseRedirect(self.get_success_url()) elif result['status'] == 'queued': # Regular user submission was queued messages.success( self.request, "Your park submission has been sent for review. " "You will be notified when it is approved.", ) # Redirect to parks list since we don't have an object yet return HttpResponseRedirect(reverse("parks:park_list")) elif result['status'] == 'failed': # Auto-approval failed messages.error( self.request, f"Error creating park: {result['message']}. Please check your input and try again.", ) return self.form_invalid(form) # Fallback error case messages.error( self.request, "An unexpected error occurred. Please try again.", ) return self.form_invalid(form) def get_success_url(self) -> str: return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug}) class ParkUpdateView(LoginRequiredMixin, UpdateView): model = Park form_class = ParkForm template_name = "parks/park_form.html" def get_context_data(self, **kwargs: Any) -> dict[str, Any]: context = super().get_context_data(**kwargs) context["is_edit"] = True return context def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]: data = cleaned_data.copy() if data.get("owner"): data["owner"] = data["owner"].id if data.get("opening_date"): data["opening_date"] = data["opening_date"].isoformat() if data.get("closing_date"): data["closing_date"] = data["closing_date"].isoformat() decimal_fields = [ "latitude", "longitude", "size_acres", "average_rating", ] for field in decimal_fields: if data.get(field): data[field] = str(data[field]) return data def normalize_coordinates(self, form: ParkForm) -> None: if form.cleaned_data.get("latitude"): lat = Decimal(str(form.cleaned_data["latitude"])) form.cleaned_data["latitude"] = lat.quantize( Decimal("0.000001"), rounding=ROUND_DOWN ) if form.cleaned_data.get("longitude"): lon = Decimal(str(form.cleaned_data["longitude"])) form.cleaned_data["longitude"] = lon.quantize( Decimal("0.000001"), rounding=ROUND_DOWN ) def form_valid(self, form: ParkForm) -> HttpResponse: # noqa: C901 self.normalize_coordinates(form) changes = self.prepare_changes_data(form.cleaned_data) # Use the new queue routing service result = ModerationService.create_edit_submission_with_queue( content_object=self.object, changes=changes, submitter=self.request.user, submission_type="EDIT", reason=self.request.POST.get("reason", ""), source=self.request.POST.get("source", ""), ) if result['status'] == 'auto_approved': # Moderator submission was auto-approved # The object was already updated by the service self.object = result['created_object'] location_data = { "name": self.object.name, "location_type": "park", "latitude": form.cleaned_data.get("latitude"), "longitude": form.cleaned_data.get("longitude"), "street_address": form.cleaned_data.get("street_address", ""), "city": form.cleaned_data.get("city", ""), "state": form.cleaned_data.get("state", ""), "country": form.cleaned_data.get("country", ""), "postal_code": form.cleaned_data.get("postal_code", ""), } # Create or update ParkLocation try: park_location = self.object.location # Update existing location for key, value in location_data.items(): if key in ["latitude", "longitude"] and value: continue # Handle coordinates separately if hasattr(park_location, key): setattr(park_location, key, value) # Handle coordinates if provided if "latitude" in location_data and "longitude" in location_data: if location_data["latitude"] and location_data["longitude"]: park_location.set_coordinates( float(location_data["latitude"]), float(location_data["longitude"]), ) park_location.save() except ParkLocation.DoesNotExist: # Create new ParkLocation coordinates_data = {} if "latitude" in location_data and "longitude" in location_data: if location_data["latitude"] and location_data["longitude"]: coordinates_data = { "latitude": float(location_data["latitude"]), "longitude": float(location_data["longitude"]), } # Remove coordinate fields from location_data for creation creation_data = { k: v for k, v in location_data.items() if k not in ["latitude", "longitude"] } creation_data.setdefault("country", "USA") park_location = ParkLocation.objects.create( park=self.object, **creation_data ) if coordinates_data: park_location.set_coordinates( coordinates_data["latitude"], coordinates_data["longitude"], ) park_location.save() photos = self.request.FILES.getlist("photos") uploaded_count = 0 for photo_file in photos: try: ParkPhoto.objects.create( image=photo_file, uploaded_by=self.request.user, content_type=ContentType.objects.get_for_model(Park), object_id=self.object.id, ) uploaded_count += 1 except Exception as e: messages.error( self.request, f"Error uploading photo {photo_file.name}: {str(e)}", ) messages.success( self.request, f"Successfully updated {self.object.name}. " f"Added {uploaded_count} new photo(s).", ) return HttpResponseRedirect(self.get_success_url()) elif result['status'] == 'queued': # Regular user submission was queued messages.success( self.request, f"Your changes to {self.object.name} have been sent for review. " "You will be notified when they are approved.", ) return HttpResponseRedirect( reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug}) ) elif result['status'] == 'failed': # Auto-approval failed messages.error( self.request, f"Error updating park: {result['message']}. Please check your input and try again.", ) return self.form_invalid(form) # Fallback error case messages.error( self.request, "An unexpected error occurred. Please try again.", ) return self.form_invalid(form) def form_invalid(self, form: ParkForm) -> HttpResponse: messages.error(self.request, REQUIRED_FIELDS_ERROR) for field, errors in form.errors.items(): for error in errors: messages.error(self.request, f"{field}: {error}") return super().form_invalid(form) def get_success_url(self) -> str: return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug}) class ParkDetailView( SlugRedirectMixin, EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, DetailView, ): model = Park template_name = "parks/park_detail.html" context_object_name = "park" def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park: if queryset is None: queryset = self.get_queryset() slug = self.kwargs.get(self.slug_url_kwarg) if slug is None: raise ObjectDoesNotExist("No slug provided") park, _ = Park.get_by_slug(slug) return park def get_queryset(self) -> QuerySet[Park]: return cast( QuerySet[Park], super() .get_queryset() .prefetch_related( "rides", "rides__manufacturer", "photos", "areas", "location" ), ) def get_context_data(self, **kwargs: Any) -> dict[str, Any]: context = super().get_context_data(**kwargs) park = cast(Park, self.object) context["areas"] = park.areas.all() context["rides"] = park.rides.all().order_by("-status", "name") if self.request.user.is_authenticated: context["has_reviewed"] = Review.objects.filter( user=self.request.user, park=park, ).exists() else: context["has_reviewed"] = False return context def get_redirect_url_pattern(self) -> str: return PARK_DETAIL_URL class ParkAreaDetailView( SlugRedirectMixin, EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, DetailView, ): model = ParkArea template_name = "parks/area_detail.html" context_object_name = "area" slug_url_kwarg = "area_slug" def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea: if queryset is None: queryset = self.get_queryset() park_slug = self.kwargs.get("park_slug") area_slug = self.kwargs.get("area_slug") if park_slug is None or area_slug is None: raise ObjectDoesNotExist("Missing slug") area, _ = ParkArea.get_by_slug(area_slug) if area.park.slug != park_slug: raise ObjectDoesNotExist("Park slug doesn't match") return area def get_context_data(self, **kwargs: Any) -> dict[str, Any]: context = super().get_context_data(**kwargs) return context def get_redirect_url_pattern(self) -> str: return PARK_DETAIL_URL def get_redirect_url_kwargs(self) -> dict[str, str]: area = cast(ParkArea, self.object) return {"park_slug": area.park.slug, "area_slug": area.slug} class OperatorListView(ListView): """View for displaying a list of park operators""" template_name = "operators/operator_list.html" context_object_name = "operators" paginate_by = 24 def get_queryset(self): """Get companies that are operators""" from .models.companies import Company from django.db.models import Count return ( Company.objects.filter(roles__contains=["OPERATOR"]) .annotate(park_count=Count("operated_parks")) .order_by("name") ) def get_context_data(self, **kwargs): """Add context data""" context = super().get_context_data(**kwargs) context["total_operators"] = self.get_queryset().count() return context