Files
thrillwiki_django_no_react/backend/apps/parks/views.py
pacnpal 2e35f8c5d9 feat: Refactor rides app with unique constraints, mixins, and enhanced documentation
- Added migration to convert unique_together constraints to UniqueConstraint for RideModel.
- Introduced RideFormMixin for handling entity suggestions in ride forms.
- Created comprehensive code standards documentation outlining formatting, docstring requirements, complexity guidelines, and testing requirements.
- Established error handling guidelines with a structured exception hierarchy and best practices for API and view error handling.
- Documented view pattern guidelines, emphasizing the use of CBVs, FBVs, and ViewSets with examples.
- Implemented a benchmarking script for query performance analysis and optimization.
- Developed security documentation detailing measures, configurations, and a security checklist.
- Compiled a database optimization guide covering indexing strategies, query optimization patterns, and computed fields.
2025-12-22 11:17:31 -05:00

1072 lines
36 KiB
Python

from .querysets import get_base_park_queryset
from apps.core.mixins import HTMXFilterableMixin
from .models.location import ParkLocation
from .models.media import ParkPhoto
from apps.moderation.mixins import (
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
)
from apps.core.views.views import SlugRedirectMixin
from .filters import ParkFilter
from .forms import ParkForm
from .models import Park, ParkArea, ParkReview as Review
from .services import ParkFilterService, ParkService
from django.http import (
HttpResponseRedirect,
HttpResponse,
HttpRequest,
JsonResponse,
)
from django.core.exceptions import ObjectDoesNotExist
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import QuerySet
from django.urls import reverse
from django.shortcuts import get_object_or_404, render
from decimal import InvalidOperation
from django.views.generic import DetailView, ListView, CreateView, UpdateView
import requests
from decimal import Decimal, ROUND_DOWN
from typing import Any, Optional, cast, Literal, Dict
from django.views.decorators.http import require_POST
from django.template.loader import render_to_string
import json
# Constants
PARK_DETAIL_URL = "parks:park_detail"
PARK_LIST_ITEM_TEMPLATE = "parks/partials/park_list_item.html"
REQUIRED_FIELDS_ERROR = (
"Please correct the errors below. Required fields are marked with an asterisk (*)."
)
TRIP_PARKS_TEMPLATE = "parks/partials/trip_parks_list.html"
TRIP_SUMMARY_TEMPLATE = "parks/partials/trip_summary.html"
SAVED_TRIPS_TEMPLATE = "parks/partials/saved_trips.html"
ALLOWED_ROLES = ["MODERATOR", "ADMIN", "SUPERUSER"]
ViewMode = Literal["grid", "list"]
def normalize_osm_result(result: dict) -> dict:
"""Normalize OpenStreetMap result to a consistent format with enhanced address details""" # noqa: E501
from .location_utils import get_english_name, normalize_coordinate
# Get address details
address = result.get("address", {})
# Normalize coordinates
lat = normalize_coordinate(float(result.get("lat")), 9, 6)
lon = normalize_coordinate(float(result.get("lon")), 10, 6)
# Get English names where possible
name = ""
if "namedetails" in result:
name = get_english_name(result["namedetails"])
# Build street address from available components
street_parts = []
if address.get("house_number"):
street_parts.append(address["house_number"])
if address.get("road") or address.get("street"):
street_parts.append(address.get("road") or address.get("street"))
elif address.get("pedestrian"):
street_parts.append(address["pedestrian"])
elif address.get("footway"):
street_parts.append(address["footway"])
# Handle additional address components
suburb = address.get("suburb", "")
district = address.get("district", "")
neighborhood = address.get("neighbourhood", "")
# Build city from available components
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or ""
)
# Get detailed state/region information
state = (
address.get("state") or address.get("province") or address.get("region") or ""
)
# Get postal code with fallbacks
postal_code = address.get("postcode") or address.get("postal_code") or ""
return {
"display_name": name or result.get("display_name", ""),
"lat": lat,
"lon": lon,
"street": " ".join(street_parts).strip(),
"suburb": suburb,
"district": district,
"neighborhood": neighborhood,
"city": city,
"state": state,
"country": address.get("country", ""),
"postal_code": postal_code,
}
def get_view_mode(request: HttpRequest) -> ViewMode:
"""Get the current view mode from request, defaulting to grid"""
view_mode = request.GET.get("view_mode", "grid")
return cast(ViewMode, "list" if view_mode == "list" else "grid")
def add_park_button(request: HttpRequest) -> HttpResponse:
"""Return the add park button partial template"""
return render(request, "parks/partials/add_park_button.html")
def park_actions(request: HttpRequest, slug: str) -> HttpResponse:
"""Return the park actions partial template"""
park = get_object_or_404(Park, slug=slug)
return render(request, "parks/partials/park_actions.html", {"park": park})
def park_status_actions(request: HttpRequest, slug: str) -> HttpResponse:
"""Return FSM status actions for park moderators"""
park = get_object_or_404(Park, slug=slug)
# Only show to moderators
if not request.user.has_perm("parks.change_park"):
return HttpResponse("")
return render(
request,
"parks/partials/park_status_actions.html",
{"park": park, "user": request.user},
)
def park_header_badge(request: HttpRequest, slug: str) -> HttpResponse:
"""Return the header status badge partial for a park"""
park = get_object_or_404(Park, slug=slug)
return render(
request,
"parks/partials/park_header_badge.html",
{"park": park, "user": request.user},
)
def get_park_areas(request: HttpRequest) -> HttpResponse:
"""Return park areas as options for a select element"""
park_id = request.GET.get("park")
if not park_id:
return HttpResponse('<option value="">Select a park first</option>')
try:
park = Park.objects.get(id=park_id)
areas = park.areas.all()
options = ['<option value="">No specific area</option>']
options.extend(
[f'<option value="{area.id}">{area.name}</option>' for area in areas]
)
return HttpResponse("\n".join(options))
except Park.DoesNotExist:
return HttpResponse('<option value="">Invalid park selected</option>')
def location_search(request: HttpRequest) -> JsonResponse:
"""Search for locations using OpenStreetMap Nominatim API"""
query = request.GET.get("q", "")
if not query:
return JsonResponse({"results": []})
response = requests.get(
"https://nominatim.openstreetmap.org/search",
params={
"q": query,
"format": "json",
"addressdetails": 1,
"namedetails": 1,
"accept-language": "en",
"limit": 10,
},
headers={"User-Agent": "ThrillWiki/1.0"},
timeout=60,
)
if response.status_code == 200:
results = response.json()
normalized_results = [normalize_osm_result(result) for result in results]
valid_results = [
r
for r in normalized_results
if r["lat"] is not None and r["lon"] is not None
]
return JsonResponse({"results": valid_results})
return JsonResponse({"results": []})
def reverse_geocode(request: HttpRequest) -> JsonResponse:
"""Reverse geocode coordinates using OpenStreetMap Nominatim API"""
try:
lat = Decimal(request.GET.get("lat", ""))
lon = Decimal(request.GET.get("lon", ""))
except (TypeError, ValueError, InvalidOperation):
return JsonResponse({"error": "Invalid coordinates"}, status=400)
if not lat or not lon:
return JsonResponse({"error": "Missing coordinates"}, status=400)
lat = lat.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
lon = lon.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
if lat < -90 or lat > 90:
return JsonResponse(
{"error": "Latitude must be between -90 and 90"}, status=400
)
if lon < -180 or lon > 180:
return JsonResponse(
{"error": "Longitude must be between -180 and 180"}, status=400
)
response = requests.get(
"https://nominatim.openstreetmap.org/reverse",
params={
"lat": str(lat),
"lon": str(lon),
"format": "json",
"addressdetails": 1,
"namedetails": 1,
"accept-language": "en",
},
headers={"User-Agent": "ThrillWiki/1.0"},
timeout=60,
)
if response.status_code == 200:
result = response.json()
normalized_result = normalize_osm_result(result)
if normalized_result["lat"] is None or normalized_result["lon"] is None:
return JsonResponse({"error": "Invalid coordinates"}, status=400)
return JsonResponse(normalized_result)
return JsonResponse({"error": "Geocoding failed"}, status=500)
class ParkListView(HTMXFilterableMixin, ListView):
model = Park
template_name = "parks/park_list.html"
context_object_name = "parks"
filter_class = ParkFilter
paginate_by = 20
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.filter_service = ParkFilterService()
def get_template_names(self) -> list[str]:
"""Return park_list.html for HTMX requests"""
if self.request.htmx:
return ["parks/partials/park_list.html"]
return [self.template_name]
def get_view_mode(self) -> ViewMode:
"""Get the current view mode (grid or list)"""
return get_view_mode(self.request)
def get_queryset(self) -> QuerySet[Park]:
"""Get optimized queryset with filter service"""
try:
# Use filter service for optimized filtering
filter_params = dict(self.request.GET.items())
queryset = self.filter_service.get_filtered_queryset(filter_params)
# Also create filterset for form rendering
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
return self.filterset.qs
except Exception as e:
messages.error(self.request, f"Error loading parks: {str(e)}")
queryset = self.model.objects.none()
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
return queryset
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Add enhanced context with filter stats and suggestions"""
try:
# Initialize filterset if not exists
if not hasattr(self, "filterset"):
self.filterset = self.filter_class(
self.request.GET, queryset=self.model.objects.none()
)
context = super().get_context_data(**kwargs)
# Add filter service data
filter_counts = self.filter_service.get_filter_counts()
popular_filters = self.filter_service.get_popular_filters()
context.update(
{
"view_mode": self.get_view_mode(),
"is_search": bool(self.request.GET.get("search")),
"search_query": self.request.GET.get("search", ""),
"filter_counts": filter_counts,
"popular_filters": popular_filters,
"total_results": (
context.get("paginator").count
if context.get("paginator")
else 0
),
}
)
# Add filter suggestions for search queries
search_query = self.request.GET.get("search", "")
if search_query:
context["filter_suggestions"] = (
self.filter_service.get_filter_suggestions(search_query)
)
return context
except Exception as e:
messages.error(self.request, f"Error applying filters: {str(e)}")
# Ensure filterset exists in error case
if not hasattr(self, "filterset"):
self.filterset = self.filter_class(
self.request.GET, queryset=self.model.objects.none()
)
return {
"filter": self.filterset,
"error": "Unable to apply filters. Please try adjusting your criteria.",
"view_mode": self.get_view_mode(),
"is_search": bool(self.request.GET.get("search")),
"search_query": self.request.GET.get("search", ""),
}
def _get_clean_filter_params(self) -> Dict[str, Any]:
"""Extract and clean filter parameters from request."""
filter_params = {}
# Define valid filter fields
valid_filters = {
"status",
"operator",
"park_type",
"has_coasters",
"min_rating",
"big_parks_only",
"ordering",
"search",
}
for param, value in self.request.GET.items():
if param in valid_filters and value:
# Skip pagination parameter
if param == "page":
continue
# Clean and validate the value
filter_params[param] = self._clean_filter_value(param, value)
return {k: v for k, v in filter_params.items() if v is not None}
def _clean_filter_value(self, param: str, value: str) -> Optional[Any]:
"""Clean and validate a single filter value."""
if param in ("has_coasters", "big_parks_only"):
# Boolean filters
return value.lower() in ("true", "1", "yes", "on")
elif param == "min_rating":
# Numeric filter
try:
rating = float(value)
if 0 <= rating <= 5:
return str(rating)
except (ValueError, TypeError):
pass # Skip invalid ratings
return None
elif param == "search":
# Search filter
clean_search = value.strip()
return clean_search if clean_search else None
else:
# String filters
return value.strip()
def _build_filter_query_string(self, filter_params: Dict[str, Any]) -> str:
"""Build query string from filter parameters."""
from urllib.parse import urlencode
# Convert boolean values to strings for URL
url_params = {}
for key, value in filter_params.items():
if isinstance(value, bool):
url_params[key] = "true" if value else "false"
else:
url_params[key] = str(value)
return urlencode(url_params)
def _get_pagination_urls(
self, page_obj, filter_params: Dict[str, Any]
) -> Dict[str, str]:
"""Generate pagination URLs that preserve filter state."""
base_query = self._build_filter_query_string(filter_params)
pagination_urls = {}
if page_obj.has_previous():
prev_params = (
f"{base_query}&page={page_obj.previous_page_number()}"
if base_query
else f"page={page_obj.previous_page_number()}"
)
pagination_urls["previous_url"] = f"?{prev_params}"
if page_obj.has_next():
next_params = (
f"{base_query}&page={page_obj.next_page_number()}"
if base_query
else f"page={page_obj.next_page_number()}"
)
pagination_urls["next_url"] = f"?{next_params}"
# First and last page URLs
if page_obj.number > 1:
first_params = f"{base_query}&page=1" if base_query else "page=1"
pagination_urls["first_url"] = f"?{first_params}"
if page_obj.number < page_obj.paginator.num_pages:
last_params = (
f"{base_query}&page={page_obj.paginator.num_pages}"
if base_query
else f"page={page_obj.paginator.num_pages}"
)
pagination_urls["last_url"] = f"?{last_params}"
return pagination_urls
def search_parks(request: HttpRequest) -> HttpResponse:
"""Search parks and return results using park_list_item.html"""
try:
search_query = request.GET.get("search", "").strip()
if not search_query:
return HttpResponse("")
# Get current view mode from request
current_view_mode = request.GET.get("view_mode", "grid")
park_filter = ParkFilter(
{"search": search_query}, queryset=get_base_park_queryset()
)
parks = park_filter.qs
if request.GET.get("quick_search"):
parks = parks[:8] # Limit quick search results
response = render(
request,
PARK_LIST_ITEM_TEMPLATE,
{
"parks": parks,
"view_mode": current_view_mode,
"search_query": search_query,
"is_search": True,
},
)
response["HX-Trigger"] = "searchComplete"
return response
except Exception as e:
response = render(
request,
PARK_LIST_ITEM_TEMPLATE,
{
"parks": [],
"error": f"Error performing search: {str(e)}",
"is_search": True,
},
)
response["HX-Trigger"] = "searchError"
return response
# --------------------
# HTMX roadtrip helpers
# --------------------
def htmx_saved_trips(request: HttpRequest) -> HttpResponse:
"""Return a partial with the user's saved trips (stubbed)."""
trips = []
if request.user.is_authenticated:
try:
from .models import Trip # type: ignore
qs = Trip.objects.filter(owner=request.user).order_by("-created_at")
trips = list(qs[:10])
except Exception:
trips = []
return render(request, SAVED_TRIPS_TEMPLATE, {"trips": trips})
def _get_session_trip(request: HttpRequest) -> list:
raw = request.session.get("trip_parks", [])
try:
return [int(x) for x in raw]
except Exception:
return []
def _save_session_trip(request: HttpRequest, trip_list: list) -> None:
request.session["trip_parks"] = [int(x) for x in trip_list]
request.session.modified = True
@require_POST
def htmx_add_park_to_trip(request: HttpRequest) -> HttpResponse:
"""Add a park id to `request.session['trip_parks']` and return the full trip list partial."""
park_id = request.POST.get("park_id")
if not park_id:
try:
payload = json.loads(request.body.decode("utf-8"))
park_id = payload.get("park_id")
except Exception:
park_id = None
if not park_id:
return HttpResponse("", status=400)
try:
pid = int(park_id)
except Exception:
return HttpResponse("", status=400)
trip = _get_session_trip(request)
if pid not in trip:
trip.append(pid)
_save_session_trip(request, trip)
# Build ordered Park queryset preserving session order
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": parks}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripUpdated": True})
return resp
@require_POST
def htmx_remove_park_from_trip(request: HttpRequest) -> HttpResponse:
"""Remove a park id from `request.session['trip_parks']` and return the updated trip list partial."""
park_id = request.POST.get("park_id")
if not park_id:
try:
payload = json.loads(request.body.decode("utf-8"))
park_id = payload.get("park_id")
except Exception:
park_id = None
if not park_id:
return HttpResponse("", status=400)
try:
pid = int(park_id)
except Exception:
return HttpResponse("", status=400)
trip = _get_session_trip(request)
if pid in trip:
trip = [t for t in trip if t != pid]
_save_session_trip(request, trip)
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": parks}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripUpdated": True})
return resp
@require_POST
def htmx_reorder_parks(request: HttpRequest) -> HttpResponse:
"""Accept an ordered list of park ids and persist it to the session, returning the updated list partial."""
order = []
try:
payload = json.loads(request.body.decode("utf-8"))
order = payload.get("order", [])
except Exception:
order = request.POST.getlist("order[]")
# Normalize to ints
clean_order = []
for item in order:
try:
clean_order.append(int(item))
except Exception:
continue
_save_session_trip(request, clean_order)
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": parks}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripReordered": True})
return resp
@require_POST
def htmx_optimize_route(request: HttpRequest) -> HttpResponse:
"""Compute a simple trip summary from session parks and return the summary partial."""
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
# Helper: haversine distance (miles)
import math
def haversine_miles(lat1, lon1, lat2, lon2):
# convert decimal degrees to radians
rlat1, rlon1, rlat2, rlon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = rlat2 - rlat1
dlon = rlon2 - rlon1
a = (
math.sin(dlat / 2) ** 2
+ math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
)
c = 2 * math.asin(min(1, math.sqrt(a)))
miles = 3958.8 * c
return miles
total_miles = 0.0
waypoints = []
for p in parks:
loc = getattr(p, "location", None)
lat = getattr(loc, "latitude", None) if loc else None
lon = getattr(loc, "longitude", None) if loc else None
if lat is not None and lon is not None:
waypoints.append(
{"id": p.id, "name": p.name, "latitude": lat, "longitude": lon}
)
# sum straight-line distances between consecutive waypoints
for i in range(len(waypoints) - 1):
a = waypoints[i]
b = waypoints[i + 1]
try:
total_miles += haversine_miles(
a["latitude"], a["longitude"], b["latitude"], b["longitude"]
)
except Exception:
continue
# Estimate drive time assuming average speed of 60 mph
total_hours = total_miles / 60.0 if total_miles else 0.0
summary = {
"total_distance": f"{int(round(total_miles))} mi",
"total_time": f"{total_hours:.1f} hrs",
"total_parks": len(parks),
"total_rides": sum(getattr(p, "ride_count", 0) or 0 for p in parks),
}
html = render_to_string(
TRIP_SUMMARY_TEMPLATE, {"summary": summary}, request=request
)
resp = HttpResponse(html)
# Include waypoints payload in HX-Trigger so client can render route on the map
resp["HX-Trigger"] = json.dumps({"tripOptimized": {"parks": waypoints}})
return resp
@require_POST
def htmx_calculate_route(request: HttpRequest) -> HttpResponse:
"""Alias for optimize route for now — returns trip summary partial."""
return htmx_optimize_route(request)
@require_POST
def htmx_save_trip(request: HttpRequest) -> HttpResponse:
"""Save the current session trip to a Trip model (if present) and return saved trips partial."""
name = request.POST.get("name") or "My Trip"
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
trips = []
if request.user.is_authenticated:
try:
from .models import Trip # type: ignore
trip = Trip.objects.create(owner=request.user, name=name)
# attempt to associate parks if the Trip model supports it
try:
trip.parks.set([p.id for p in parks])
except Exception:
pass
trips = list(
Trip.objects.filter(owner=request.user).order_by("-created_at")[:10]
)
except Exception:
trips = []
html = render_to_string(SAVED_TRIPS_TEMPLATE, {"trips": trips}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripSaved": True})
return resp
@require_POST
def htmx_clear_trip(request: HttpRequest) -> HttpResponse:
"""Clear the current session trip and return an empty trip list partial."""
_save_session_trip(request, [])
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": []}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripCleared": True})
return resp
class ParkCreateView(LoginRequiredMixin, CreateView):
model = Park
form_class = ParkForm
template_name = "parks/park_form.html"
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
data = cleaned_data.copy()
if data.get("owner"):
data["owner"] = data["owner"].id
if data.get("opening_date"):
data["opening_date"] = data["opening_date"].isoformat()
if data.get("closing_date"):
data["closing_date"] = data["closing_date"].isoformat()
decimal_fields = [
"latitude",
"longitude",
"size_acres",
"average_rating",
]
for field in decimal_fields:
if data.get(field):
data[field] = str(data[field])
return data
def normalize_coordinates(self, form: ParkForm) -> None:
if form.cleaned_data.get("latitude"):
lat = Decimal(str(form.cleaned_data["latitude"]))
form.cleaned_data["latitude"] = lat.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
if form.cleaned_data.get("longitude"):
lon = Decimal(str(form.cleaned_data["longitude"]))
form.cleaned_data["longitude"] = lon.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
def form_valid(self, form: ParkForm) -> HttpResponse:
self.normalize_coordinates(form)
changes = self.prepare_changes_data(form.cleaned_data)
# Submit through moderation service
result = ParkService.create_park_with_moderation(
changes=changes,
submitter=self.request.user,
reason=self.request.POST.get("reason", ""),
source=self.request.POST.get("source", ""),
)
# Handle the result using the service
photos = self.request.FILES.getlist("photos")
service_result = ParkService.handle_park_creation_result(
result=result,
form_data=form.cleaned_data,
photos=photos,
user=self.request.user,
)
# Report any photo upload errors
for error in service_result.get("errors", []):
messages.error(self.request, error)
if service_result["status"] == "auto_approved":
self.object = service_result["park"]
messages.success(
self.request,
f"Successfully created {self.object.name}. "
f"Added {service_result['uploaded_count']} photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
elif service_result["status"] == "queued":
messages.success(
self.request,
"Your park submission has been sent for review. "
"You will be notified when it is approved.",
)
return HttpResponseRedirect(reverse("parks:park_list"))
elif service_result["status"] == "failed":
messages.error(
self.request,
f"Error creating park: {service_result.get('message', 'Unknown error')}. "
"Please check your input and try again.",
)
return self.form_invalid(form)
# Fallback error case
messages.error(
self.request,
"An unexpected error occurred. Please try again.",
)
return self.form_invalid(form)
def get_success_url(self) -> str:
return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
class ParkUpdateView(LoginRequiredMixin, UpdateView):
model = Park
form_class = ParkForm
template_name = "parks/park_form.html"
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
context["is_edit"] = True
return context
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
data = cleaned_data.copy()
if data.get("owner"):
data["owner"] = data["owner"].id
if data.get("opening_date"):
data["opening_date"] = data["opening_date"].isoformat()
if data.get("closing_date"):
data["closing_date"] = data["closing_date"].isoformat()
decimal_fields = [
"latitude",
"longitude",
"size_acres",
"average_rating",
]
for field in decimal_fields:
if data.get(field):
data[field] = str(data[field])
return data
def normalize_coordinates(self, form: ParkForm) -> None:
if form.cleaned_data.get("latitude"):
lat = Decimal(str(form.cleaned_data["latitude"]))
form.cleaned_data["latitude"] = lat.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
if form.cleaned_data.get("longitude"):
lon = Decimal(str(form.cleaned_data["longitude"]))
form.cleaned_data["longitude"] = lon.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
def form_valid(self, form: ParkForm) -> HttpResponse:
self.normalize_coordinates(form)
changes = self.prepare_changes_data(form.cleaned_data)
# Submit through moderation service
result = ParkService.update_park_with_moderation(
park=self.object,
changes=changes,
submitter=self.request.user,
reason=self.request.POST.get("reason", ""),
source=self.request.POST.get("source", ""),
)
# Handle the result using the service
photos = self.request.FILES.getlist("photos")
service_result = ParkService.handle_park_update_result(
result=result,
park=self.object,
form_data=form.cleaned_data,
photos=photos,
user=self.request.user,
)
# Report any photo upload errors
for error in service_result.get("errors", []):
messages.error(self.request, error)
if service_result["status"] == "auto_approved":
self.object = service_result["park"]
messages.success(
self.request,
f"Successfully updated {self.object.name}. "
f"Added {service_result['uploaded_count']} new photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
elif service_result["status"] == "queued":
messages.success(
self.request,
f"Your changes to {self.object.name} have been sent for review. "
"You will be notified when they are approved.",
)
return HttpResponseRedirect(
reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
)
elif service_result["status"] == "failed":
messages.error(
self.request,
f"Error updating park: {service_result.get('message', 'Unknown error')}. "
"Please check your input and try again.",
)
return self.form_invalid(form)
# Fallback error case
messages.error(
self.request,
"An unexpected error occurred. Please try again.",
)
return self.form_invalid(form)
def form_invalid(self, form: ParkForm) -> HttpResponse:
messages.error(self.request, REQUIRED_FIELDS_ERROR)
for field, errors in form.errors.items():
for error in errors:
messages.error(self.request, f"{field}: {error}")
return super().form_invalid(form)
def get_success_url(self) -> str:
return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
class ParkDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView,
):
model = Park
template_name = "parks/park_detail.html"
context_object_name = "park"
def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
if queryset is None:
queryset = self.get_queryset()
slug = self.kwargs.get(self.slug_url_kwarg)
if slug is None:
raise ObjectDoesNotExist("No slug provided")
park, _ = Park.get_by_slug(slug)
return park
def get_queryset(self) -> QuerySet[Park]:
return cast(
QuerySet[Park],
super()
.get_queryset()
.prefetch_related(
"rides", "rides__manufacturer", "photos", "areas", "location"
),
)
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
park = cast(Park, self.object)
context["areas"] = park.areas.all()
context["rides"] = park.rides.all().order_by("-status", "name")
if self.request.user.is_authenticated:
context["has_reviewed"] = Review.objects.filter(
user=self.request.user,
park=park,
).exists()
else:
context["has_reviewed"] = False
return context
def get_redirect_url_pattern(self) -> str:
return PARK_DETAIL_URL
class ParkAreaDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView,
):
model = ParkArea
template_name = "parks/area_detail.html"
context_object_name = "area"
slug_url_kwarg = "area_slug"
def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea:
if queryset is None:
queryset = self.get_queryset()
park_slug = self.kwargs.get("park_slug")
area_slug = self.kwargs.get("area_slug")
if park_slug is None or area_slug is None:
raise ObjectDoesNotExist("Missing slug")
area, _ = ParkArea.get_by_slug(area_slug)
if area.park.slug != park_slug:
raise ObjectDoesNotExist("Park slug doesn't match")
return area
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
return context
def get_redirect_url_pattern(self) -> str:
return PARK_DETAIL_URL
def get_redirect_url_kwargs(self) -> dict[str, str]:
area = cast(ParkArea, self.object)
return {"park_slug": area.park.slug, "area_slug": area.slug}
class OperatorListView(ListView):
"""View for displaying a list of park operators"""
template_name = "operators/operator_list.html"
context_object_name = "operators"
paginate_by = 24
def get_queryset(self):
"""Get companies that are operators with optimized query"""
from .models.companies import Company
from django.db.models import Count
return (
Company.objects.filter(roles__contains=["OPERATOR"])
.annotate(park_count=Count("operated_parks"))
.only("id", "name", "slug", "roles", "description")
.order_by("name")
)
def get_context_data(self, **kwargs):
"""Add context data"""
context = super().get_context_data(**kwargs)
context["total_operators"] = self.get_queryset().count()
return context