Files
thrillwiki_django_no_react/backend/apps/parks/views.py
pacnpal b9063ff4f8 feat: Add detailed park and ride pages with HTMX integration
- Implemented park detail page with dynamic content loading for rides and weather.
- Created park list page with filters and search functionality.
- Developed ride detail page showcasing ride stats, reviews, and similar rides.
- Added ride list page with filtering options and dynamic loading.
- Introduced search results page with tabs for parks, rides, and users.
- Added HTMX tests for global search functionality.
2025-12-19 19:53:20 -05:00

1128 lines
39 KiB
Python

from .querysets import get_base_park_queryset
from apps.core.mixins import HTMXFilterableMixin
from .models.location import ParkLocation
from .models.media import ParkPhoto
from apps.moderation.services import ModerationService
from apps.moderation.mixins import (
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
)
from apps.core.views.views import SlugRedirectMixin
from .filters import ParkFilter
from .forms import ParkForm
from .models import Park, ParkArea, ParkReview as Review
from .services import ParkFilterService
from django.http import (
HttpResponseRedirect,
HttpResponse,
HttpRequest,
JsonResponse,
)
from django.core.exceptions import ObjectDoesNotExist
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import QuerySet
from django.urls import reverse
from django.shortcuts import get_object_or_404, render
from decimal import InvalidOperation
from django.views.generic import DetailView, ListView, CreateView, UpdateView
import requests
from decimal import Decimal, ROUND_DOWN
from typing import Any, Optional, cast, Literal, Dict
from django.views.decorators.http import require_POST
from django.template.loader import render_to_string
import json
# Constants
PARK_DETAIL_URL = "parks:park_detail"
PARK_LIST_ITEM_TEMPLATE = "parks/partials/park_list_item.html"
REQUIRED_FIELDS_ERROR = (
"Please correct the errors below. Required fields are marked with an asterisk (*)."
)
TRIP_PARKS_TEMPLATE = "parks/partials/trip_parks_list.html"
TRIP_SUMMARY_TEMPLATE = "parks/partials/trip_summary.html"
SAVED_TRIPS_TEMPLATE = "parks/partials/saved_trips.html"
ALLOWED_ROLES = ["MODERATOR", "ADMIN", "SUPERUSER"]
ViewMode = Literal["grid", "list"]
def normalize_osm_result(result: dict) -> dict:
"""Normalize OpenStreetMap result to a consistent format with enhanced address details""" # noqa: E501
from .location_utils import get_english_name, normalize_coordinate
# Get address details
address = result.get("address", {})
# Normalize coordinates
lat = normalize_coordinate(float(result.get("lat")), 9, 6)
lon = normalize_coordinate(float(result.get("lon")), 10, 6)
# Get English names where possible
name = ""
if "namedetails" in result:
name = get_english_name(result["namedetails"])
# Build street address from available components
street_parts = []
if address.get("house_number"):
street_parts.append(address["house_number"])
if address.get("road") or address.get("street"):
street_parts.append(address.get("road") or address.get("street"))
elif address.get("pedestrian"):
street_parts.append(address["pedestrian"])
elif address.get("footway"):
street_parts.append(address["footway"])
# Handle additional address components
suburb = address.get("suburb", "")
district = address.get("district", "")
neighborhood = address.get("neighbourhood", "")
# Build city from available components
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or ""
)
# Get detailed state/region information
state = (
address.get("state") or address.get("province") or address.get("region") or ""
)
# Get postal code with fallbacks
postal_code = address.get("postcode") or address.get("postal_code") or ""
return {
"display_name": name or result.get("display_name", ""),
"lat": lat,
"lon": lon,
"street": " ".join(street_parts).strip(),
"suburb": suburb,
"district": district,
"neighborhood": neighborhood,
"city": city,
"state": state,
"country": address.get("country", ""),
"postal_code": postal_code,
}
def get_view_mode(request: HttpRequest) -> ViewMode:
"""Get the current view mode from request, defaulting to grid"""
view_mode = request.GET.get("view_mode", "grid")
return cast(ViewMode, "list" if view_mode == "list" else "grid")
def add_park_button(request: HttpRequest) -> HttpResponse:
"""Return the add park button partial template"""
return render(request, "parks/partials/add_park_button.html")
def park_actions(request: HttpRequest, slug: str) -> HttpResponse:
"""Return the park actions partial template"""
park = get_object_or_404(Park, slug=slug)
return render(request, "parks/partials/park_actions.html", {"park": park})
def get_park_areas(request: HttpRequest) -> HttpResponse:
"""Return park areas as options for a select element"""
park_id = request.GET.get("park")
if not park_id:
return HttpResponse('<option value="">Select a park first</option>')
try:
park = Park.objects.get(id=park_id)
areas = park.areas.all()
options = ['<option value="">No specific area</option>']
options.extend(
[f'<option value="{area.id}">{area.name}</option>' for area in areas]
)
return HttpResponse("\n".join(options))
except Park.DoesNotExist:
return HttpResponse('<option value="">Invalid park selected</option>')
def location_search(request: HttpRequest) -> JsonResponse:
"""Search for locations using OpenStreetMap Nominatim API"""
query = request.GET.get("q", "")
if not query:
return JsonResponse({"results": []})
response = requests.get(
"https://nominatim.openstreetmap.org/search",
params={
"q": query,
"format": "json",
"addressdetails": 1,
"namedetails": 1,
"accept-language": "en",
"limit": 10,
},
headers={"User-Agent": "ThrillWiki/1.0"},
timeout=60,
)
if response.status_code == 200:
results = response.json()
normalized_results = [normalize_osm_result(result) for result in results]
valid_results = [
r
for r in normalized_results
if r["lat"] is not None and r["lon"] is not None
]
return JsonResponse({"results": valid_results})
return JsonResponse({"results": []})
def reverse_geocode(request: HttpRequest) -> JsonResponse:
"""Reverse geocode coordinates using OpenStreetMap Nominatim API"""
try:
lat = Decimal(request.GET.get("lat", ""))
lon = Decimal(request.GET.get("lon", ""))
except (TypeError, ValueError, InvalidOperation):
return JsonResponse({"error": "Invalid coordinates"}, status=400)
if not lat or not lon:
return JsonResponse({"error": "Missing coordinates"}, status=400)
lat = lat.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
lon = lon.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
if lat < -90 or lat > 90:
return JsonResponse(
{"error": "Latitude must be between -90 and 90"}, status=400
)
if lon < -180 or lon > 180:
return JsonResponse(
{"error": "Longitude must be between -180 and 180"}, status=400
)
response = requests.get(
"https://nominatim.openstreetmap.org/reverse",
params={
"lat": str(lat),
"lon": str(lon),
"format": "json",
"addressdetails": 1,
"namedetails": 1,
"accept-language": "en",
},
headers={"User-Agent": "ThrillWiki/1.0"},
timeout=60,
)
if response.status_code == 200:
result = response.json()
normalized_result = normalize_osm_result(result)
if normalized_result["lat"] is None or normalized_result["lon"] is None:
return JsonResponse({"error": "Invalid coordinates"}, status=400)
return JsonResponse(normalized_result)
return JsonResponse({"error": "Geocoding failed"}, status=500)
class ParkListView(HTMXFilterableMixin, ListView):
model = Park
template_name = "parks/park_list.html"
context_object_name = "parks"
filter_class = ParkFilter
paginate_by = 20
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.filter_service = ParkFilterService()
def get_template_names(self) -> list[str]:
"""Return park_list.html for HTMX requests"""
if self.request.htmx:
return ["parks/partials/park_list.html"]
return [self.template_name]
def get_view_mode(self) -> ViewMode:
"""Get the current view mode (grid or list)"""
return get_view_mode(self.request)
def get_queryset(self) -> QuerySet[Park]:
"""Get optimized queryset with filter service"""
try:
# Use filter service for optimized filtering
filter_params = dict(self.request.GET.items())
queryset = self.filter_service.get_filtered_queryset(filter_params)
# Also create filterset for form rendering
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
return self.filterset.qs
except Exception as e:
messages.error(self.request, f"Error loading parks: {str(e)}")
queryset = self.model.objects.none()
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
return queryset
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Add enhanced context with filter stats and suggestions"""
try:
# Initialize filterset if not exists
if not hasattr(self, "filterset"):
self.filterset = self.filter_class(
self.request.GET, queryset=self.model.objects.none()
)
context = super().get_context_data(**kwargs)
# Add filter service data
filter_counts = self.filter_service.get_filter_counts()
popular_filters = self.filter_service.get_popular_filters()
context.update(
{
"view_mode": self.get_view_mode(),
"is_search": bool(self.request.GET.get("search")),
"search_query": self.request.GET.get("search", ""),
"filter_counts": filter_counts,
"popular_filters": popular_filters,
"total_results": (
context.get("paginator").count
if context.get("paginator")
else 0
),
}
)
# Add filter suggestions for search queries
search_query = self.request.GET.get("search", "")
if search_query:
context["filter_suggestions"] = (
self.filter_service.get_filter_suggestions(search_query)
)
return context
except Exception as e:
messages.error(self.request, f"Error applying filters: {str(e)}")
# Ensure filterset exists in error case
if not hasattr(self, "filterset"):
self.filterset = self.filter_class(
self.request.GET, queryset=self.model.objects.none()
)
return {
"filter": self.filterset,
"error": "Unable to apply filters. Please try adjusting your criteria.",
"view_mode": self.get_view_mode(),
"is_search": bool(self.request.GET.get("search")),
"search_query": self.request.GET.get("search", ""),
}
def _get_clean_filter_params(self) -> Dict[str, Any]:
"""Extract and clean filter parameters from request."""
filter_params = {}
# Define valid filter fields
valid_filters = {
"status",
"operator",
"park_type",
"has_coasters",
"min_rating",
"big_parks_only",
"ordering",
"search",
}
for param, value in self.request.GET.items():
if param in valid_filters and value:
# Skip pagination parameter
if param == "page":
continue
# Clean and validate the value
filter_params[param] = self._clean_filter_value(param, value)
return {k: v for k, v in filter_params.items() if v is not None}
def _clean_filter_value(self, param: str, value: str) -> Optional[Any]:
"""Clean and validate a single filter value."""
if param in ("has_coasters", "big_parks_only"):
# Boolean filters
return value.lower() in ("true", "1", "yes", "on")
elif param == "min_rating":
# Numeric filter
try:
rating = float(value)
if 0 <= rating <= 5:
return str(rating)
except (ValueError, TypeError):
pass # Skip invalid ratings
return None
elif param == "search":
# Search filter
clean_search = value.strip()
return clean_search if clean_search else None
else:
# String filters
return value.strip()
def _build_filter_query_string(self, filter_params: Dict[str, Any]) -> str:
"""Build query string from filter parameters."""
from urllib.parse import urlencode
# Convert boolean values to strings for URL
url_params = {}
for key, value in filter_params.items():
if isinstance(value, bool):
url_params[key] = "true" if value else "false"
else:
url_params[key] = str(value)
return urlencode(url_params)
def _get_pagination_urls(
self, page_obj, filter_params: Dict[str, Any]
) -> Dict[str, str]:
"""Generate pagination URLs that preserve filter state."""
base_query = self._build_filter_query_string(filter_params)
pagination_urls = {}
if page_obj.has_previous():
prev_params = (
f"{base_query}&page={page_obj.previous_page_number()}"
if base_query
else f"page={page_obj.previous_page_number()}"
)
pagination_urls["previous_url"] = f"?{prev_params}"
if page_obj.has_next():
next_params = (
f"{base_query}&page={page_obj.next_page_number()}"
if base_query
else f"page={page_obj.next_page_number()}"
)
pagination_urls["next_url"] = f"?{next_params}"
# First and last page URLs
if page_obj.number > 1:
first_params = f"{base_query}&page=1" if base_query else "page=1"
pagination_urls["first_url"] = f"?{first_params}"
if page_obj.number < page_obj.paginator.num_pages:
last_params = (
f"{base_query}&page={page_obj.paginator.num_pages}"
if base_query
else f"page={page_obj.paginator.num_pages}"
)
pagination_urls["last_url"] = f"?{last_params}"
return pagination_urls
def search_parks(request: HttpRequest) -> HttpResponse:
"""Search parks and return results using park_list_item.html"""
try:
search_query = request.GET.get("search", "").strip()
if not search_query:
return HttpResponse("")
# Get current view mode from request
current_view_mode = request.GET.get("view_mode", "grid")
park_filter = ParkFilter(
{"search": search_query}, queryset=get_base_park_queryset()
)
parks = park_filter.qs
if request.GET.get("quick_search"):
parks = parks[:8] # Limit quick search results
response = render(
request,
PARK_LIST_ITEM_TEMPLATE,
{
"parks": parks,
"view_mode": current_view_mode,
"search_query": search_query,
"is_search": True,
},
)
response["HX-Trigger"] = "searchComplete"
return response
except Exception as e:
response = render(
request,
PARK_LIST_ITEM_TEMPLATE,
{
"parks": [],
"error": f"Error performing search: {str(e)}",
"is_search": True,
},
)
response["HX-Trigger"] = "searchError"
return response
# --------------------
# HTMX roadtrip helpers
# --------------------
def htmx_saved_trips(request: HttpRequest) -> HttpResponse:
"""Return a partial with the user's saved trips (stubbed)."""
trips = []
if request.user.is_authenticated:
try:
from .models import Trip # type: ignore
qs = Trip.objects.filter(owner=request.user).order_by("-created_at")
trips = list(qs[:10])
except Exception:
trips = []
return render(request, SAVED_TRIPS_TEMPLATE, {"trips": trips})
def _get_session_trip(request: HttpRequest) -> list:
raw = request.session.get("trip_parks", [])
try:
return [int(x) for x in raw]
except Exception:
return []
def _save_session_trip(request: HttpRequest, trip_list: list) -> None:
request.session["trip_parks"] = [int(x) for x in trip_list]
request.session.modified = True
@require_POST
def htmx_add_park_to_trip(request: HttpRequest) -> HttpResponse:
"""Add a park id to `request.session['trip_parks']` and return the full trip list partial."""
park_id = request.POST.get("park_id")
if not park_id:
try:
payload = json.loads(request.body.decode("utf-8"))
park_id = payload.get("park_id")
except Exception:
park_id = None
if not park_id:
return HttpResponse("", status=400)
try:
pid = int(park_id)
except Exception:
return HttpResponse("", status=400)
trip = _get_session_trip(request)
if pid not in trip:
trip.append(pid)
_save_session_trip(request, trip)
# Build ordered Park queryset preserving session order
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": parks}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripUpdated": True})
return resp
@require_POST
def htmx_remove_park_from_trip(request: HttpRequest) -> HttpResponse:
"""Remove a park id from `request.session['trip_parks']` and return the updated trip list partial."""
park_id = request.POST.get("park_id")
if not park_id:
try:
payload = json.loads(request.body.decode("utf-8"))
park_id = payload.get("park_id")
except Exception:
park_id = None
if not park_id:
return HttpResponse("", status=400)
try:
pid = int(park_id)
except Exception:
return HttpResponse("", status=400)
trip = _get_session_trip(request)
if pid in trip:
trip = [t for t in trip if t != pid]
_save_session_trip(request, trip)
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": parks}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripUpdated": True})
return resp
@require_POST
def htmx_reorder_parks(request: HttpRequest) -> HttpResponse:
"""Accept an ordered list of park ids and persist it to the session, returning the updated list partial."""
order = []
try:
payload = json.loads(request.body.decode("utf-8"))
order = payload.get("order", [])
except Exception:
order = request.POST.getlist("order[]")
# Normalize to ints
clean_order = []
for item in order:
try:
clean_order.append(int(item))
except Exception:
continue
_save_session_trip(request, clean_order)
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": parks}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripReordered": True})
return resp
@require_POST
def htmx_optimize_route(request: HttpRequest) -> HttpResponse:
"""Compute a simple trip summary from session parks and return the summary partial."""
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
# Helper: haversine distance (miles)
import math
def haversine_miles(lat1, lon1, lat2, lon2):
# convert decimal degrees to radians
rlat1, rlon1, rlat2, rlon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = rlat2 - rlat1
dlon = rlon2 - rlon1
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
c = 2 * math.asin(min(1, math.sqrt(a)))
miles = 3958.8 * c
return miles
total_miles = 0.0
waypoints = []
for p in parks:
loc = getattr(p, "location", None)
lat = getattr(loc, "latitude", None) if loc else None
lon = getattr(loc, "longitude", None) if loc else None
if lat is not None and lon is not None:
waypoints.append({"id": p.id, "name": p.name, "latitude": lat, "longitude": lon})
# sum straight-line distances between consecutive waypoints
for i in range(len(waypoints) - 1):
a = waypoints[i]
b = waypoints[i + 1]
try:
total_miles += haversine_miles(a["latitude"], a["longitude"], b["latitude"], b["longitude"])
except Exception:
continue
# Estimate drive time assuming average speed of 60 mph
total_hours = total_miles / 60.0 if total_miles else 0.0
summary = {
"total_distance": f"{int(round(total_miles))} mi",
"total_time": f"{total_hours:.1f} hrs",
"total_parks": len(parks),
"total_rides": sum(getattr(p, "ride_count", 0) or 0 for p in parks),
}
html = render_to_string(TRIP_SUMMARY_TEMPLATE, {"summary": summary}, request=request)
resp = HttpResponse(html)
# Include waypoints payload in HX-Trigger so client can render route on the map
resp["HX-Trigger"] = json.dumps({"tripOptimized": {"parks": waypoints}})
return resp
@require_POST
def htmx_calculate_route(request: HttpRequest) -> HttpResponse:
"""Alias for optimize route for now — returns trip summary partial."""
return htmx_optimize_route(request)
@require_POST
def htmx_save_trip(request: HttpRequest) -> HttpResponse:
"""Save the current session trip to a Trip model (if present) and return saved trips partial."""
name = request.POST.get("name") or "My Trip"
parks = []
for tid in _get_session_trip(request):
try:
parks.append(Park.objects.get(id=tid))
except Park.DoesNotExist:
continue
trips = []
if request.user.is_authenticated:
try:
from .models import Trip # type: ignore
trip = Trip.objects.create(owner=request.user, name=name)
# attempt to associate parks if the Trip model supports it
try:
trip.parks.set([p.id for p in parks])
except Exception:
pass
trips = list(Trip.objects.filter(owner=request.user).order_by("-created_at")[:10])
except Exception:
trips = []
html = render_to_string(SAVED_TRIPS_TEMPLATE, {"trips": trips}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripSaved": True})
return resp
@require_POST
def htmx_clear_trip(request: HttpRequest) -> HttpResponse:
"""Clear the current session trip and return an empty trip list partial."""
_save_session_trip(request, [])
html = render_to_string(TRIP_PARKS_TEMPLATE, {"trip_parks": []}, request=request)
resp = HttpResponse(html)
resp["HX-Trigger"] = json.dumps({"tripCleared": True})
return resp
class ParkCreateView(LoginRequiredMixin, CreateView):
model = Park
form_class = ParkForm
template_name = "parks/park_form.html"
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
data = cleaned_data.copy()
if data.get("owner"):
data["owner"] = data["owner"].id
if data.get("opening_date"):
data["opening_date"] = data["opening_date"].isoformat()
if data.get("closing_date"):
data["closing_date"] = data["closing_date"].isoformat()
decimal_fields = [
"latitude",
"longitude",
"size_acres",
"average_rating",
]
for field in decimal_fields:
if data.get(field):
data[field] = str(data[field])
return data
def normalize_coordinates(self, form: ParkForm) -> None:
if form.cleaned_data.get("latitude"):
lat = Decimal(str(form.cleaned_data["latitude"]))
form.cleaned_data["latitude"] = lat.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
if form.cleaned_data.get("longitude"):
lon = Decimal(str(form.cleaned_data["longitude"]))
form.cleaned_data["longitude"] = lon.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
def form_valid(self, form: ParkForm) -> HttpResponse:
self.normalize_coordinates(form)
changes = self.prepare_changes_data(form.cleaned_data)
# Use the new queue routing service
result = ModerationService.create_edit_submission_with_queue(
content_object=None, # None for CREATE
changes=changes,
submitter=self.request.user,
submission_type="CREATE",
reason=self.request.POST.get("reason", ""),
source=self.request.POST.get("source", ""),
)
if result['status'] == 'auto_approved':
# Moderator submission was auto-approved
self.object = result['created_object']
if form.cleaned_data.get("latitude") and form.cleaned_data.get("longitude"):
# Create or update ParkLocation
park_location, _ = ParkLocation.objects.get_or_create(
park=self.object,
defaults={
"street_address": form.cleaned_data.get("street_address", ""),
"city": form.cleaned_data.get("city", ""),
"state": form.cleaned_data.get("state", ""),
"country": form.cleaned_data.get("country", "USA"),
"postal_code": form.cleaned_data.get("postal_code", ""),
},
)
park_location.set_coordinates(
form.cleaned_data["latitude"],
form.cleaned_data["longitude"],
)
park_location.save()
photos = self.request.FILES.getlist("photos")
uploaded_count = 0
for photo_file in photos:
try:
ParkPhoto.objects.create(
image=photo_file,
uploaded_by=self.request.user,
park=self.object,
)
uploaded_count += 1
except Exception as e:
messages.error(
self.request,
f"Error uploading photo {photo_file.name}: {str(e)}",
)
messages.success(
self.request,
f"Successfully created {self.object.name}. "
f"Added {uploaded_count} photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
elif result['status'] == 'queued':
# Regular user submission was queued
messages.success(
self.request,
"Your park submission has been sent for review. "
"You will be notified when it is approved.",
)
# Redirect to parks list since we don't have an object yet
return HttpResponseRedirect(reverse("parks:park_list"))
elif result['status'] == 'failed':
# Auto-approval failed
messages.error(
self.request,
f"Error creating park: {result['message']}. Please check your input and try again.",
)
return self.form_invalid(form)
# Fallback error case
messages.error(
self.request,
"An unexpected error occurred. Please try again.",
)
return self.form_invalid(form)
def get_success_url(self) -> str:
return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
class ParkUpdateView(LoginRequiredMixin, UpdateView):
model = Park
form_class = ParkForm
template_name = "parks/park_form.html"
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
context["is_edit"] = True
return context
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
data = cleaned_data.copy()
if data.get("owner"):
data["owner"] = data["owner"].id
if data.get("opening_date"):
data["opening_date"] = data["opening_date"].isoformat()
if data.get("closing_date"):
data["closing_date"] = data["closing_date"].isoformat()
decimal_fields = [
"latitude",
"longitude",
"size_acres",
"average_rating",
]
for field in decimal_fields:
if data.get(field):
data[field] = str(data[field])
return data
def normalize_coordinates(self, form: ParkForm) -> None:
if form.cleaned_data.get("latitude"):
lat = Decimal(str(form.cleaned_data["latitude"]))
form.cleaned_data["latitude"] = lat.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
if form.cleaned_data.get("longitude"):
lon = Decimal(str(form.cleaned_data["longitude"]))
form.cleaned_data["longitude"] = lon.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
def form_valid(self, form: ParkForm) -> HttpResponse: # noqa: C901
self.normalize_coordinates(form)
changes = self.prepare_changes_data(form.cleaned_data)
# Use the new queue routing service
result = ModerationService.create_edit_submission_with_queue(
content_object=self.object,
changes=changes,
submitter=self.request.user,
submission_type="EDIT",
reason=self.request.POST.get("reason", ""),
source=self.request.POST.get("source", ""),
)
if result['status'] == 'auto_approved':
# Moderator submission was auto-approved
# The object was already updated by the service
self.object = result['created_object']
location_data = {
"name": self.object.name,
"location_type": "park",
"latitude": form.cleaned_data.get("latitude"),
"longitude": form.cleaned_data.get("longitude"),
"street_address": form.cleaned_data.get("street_address", ""),
"city": form.cleaned_data.get("city", ""),
"state": form.cleaned_data.get("state", ""),
"country": form.cleaned_data.get("country", ""),
"postal_code": form.cleaned_data.get("postal_code", ""),
}
# Create or update ParkLocation
try:
park_location = self.object.location
# Update existing location
for key, value in location_data.items():
if key in ["latitude", "longitude"] and value:
continue # Handle coordinates separately
if hasattr(park_location, key):
setattr(park_location, key, value)
# Handle coordinates if provided
if "latitude" in location_data and "longitude" in location_data:
if location_data["latitude"] and location_data["longitude"]:
park_location.set_coordinates(
float(location_data["latitude"]),
float(location_data["longitude"]),
)
park_location.save()
except ParkLocation.DoesNotExist:
# Create new ParkLocation
coordinates_data = {}
if "latitude" in location_data and "longitude" in location_data:
if location_data["latitude"] and location_data["longitude"]:
coordinates_data = {
"latitude": float(location_data["latitude"]),
"longitude": float(location_data["longitude"]),
}
# Remove coordinate fields from location_data for creation
creation_data = {
k: v
for k, v in location_data.items()
if k not in ["latitude", "longitude"]
}
creation_data.setdefault("country", "USA")
park_location = ParkLocation.objects.create(
park=self.object, **creation_data
)
if coordinates_data:
park_location.set_coordinates(
coordinates_data["latitude"],
coordinates_data["longitude"],
)
park_location.save()
photos = self.request.FILES.getlist("photos")
uploaded_count = 0
for photo_file in photos:
try:
ParkPhoto.objects.create(
image=photo_file,
uploaded_by=self.request.user,
content_type=ContentType.objects.get_for_model(Park),
object_id=self.object.id,
)
uploaded_count += 1
except Exception as e:
messages.error(
self.request,
f"Error uploading photo {photo_file.name}: {str(e)}",
)
messages.success(
self.request,
f"Successfully updated {self.object.name}. "
f"Added {uploaded_count} new photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
elif result['status'] == 'queued':
# Regular user submission was queued
messages.success(
self.request,
f"Your changes to {self.object.name} have been sent for review. "
"You will be notified when they are approved.",
)
return HttpResponseRedirect(
reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
)
elif result['status'] == 'failed':
# Auto-approval failed
messages.error(
self.request,
f"Error updating park: {result['message']}. Please check your input and try again.",
)
return self.form_invalid(form)
# Fallback error case
messages.error(
self.request,
"An unexpected error occurred. Please try again.",
)
return self.form_invalid(form)
def form_invalid(self, form: ParkForm) -> HttpResponse:
messages.error(self.request, REQUIRED_FIELDS_ERROR)
for field, errors in form.errors.items():
for error in errors:
messages.error(self.request, f"{field}: {error}")
return super().form_invalid(form)
def get_success_url(self) -> str:
return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
class ParkDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView,
):
model = Park
template_name = "parks/park_detail.html"
context_object_name = "park"
def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
if queryset is None:
queryset = self.get_queryset()
slug = self.kwargs.get(self.slug_url_kwarg)
if slug is None:
raise ObjectDoesNotExist("No slug provided")
park, _ = Park.get_by_slug(slug)
return park
def get_queryset(self) -> QuerySet[Park]:
return cast(
QuerySet[Park],
super()
.get_queryset()
.prefetch_related(
"rides", "rides__manufacturer", "photos", "areas", "location"
),
)
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
park = cast(Park, self.object)
context["areas"] = park.areas.all()
context["rides"] = park.rides.all().order_by("-status", "name")
if self.request.user.is_authenticated:
context["has_reviewed"] = Review.objects.filter(
user=self.request.user,
park=park,
).exists()
else:
context["has_reviewed"] = False
return context
def get_redirect_url_pattern(self) -> str:
return PARK_DETAIL_URL
class ParkAreaDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView,
):
model = ParkArea
template_name = "parks/area_detail.html"
context_object_name = "area"
slug_url_kwarg = "area_slug"
def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea:
if queryset is None:
queryset = self.get_queryset()
park_slug = self.kwargs.get("park_slug")
area_slug = self.kwargs.get("area_slug")
if park_slug is None or area_slug is None:
raise ObjectDoesNotExist("Missing slug")
area, _ = ParkArea.get_by_slug(area_slug)
if area.park.slug != park_slug:
raise ObjectDoesNotExist("Park slug doesn't match")
return area
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
return context
def get_redirect_url_pattern(self) -> str:
return PARK_DETAIL_URL
def get_redirect_url_kwargs(self) -> dict[str, str]:
area = cast(ParkArea, self.object)
return {"park_slug": area.park.slug, "area_slug": area.slug}
class OperatorListView(ListView):
"""View for displaying a list of park operators"""
template_name = "operators/operator_list.html"
context_object_name = "operators"
paginate_by = 24
def get_queryset(self):
"""Get companies that are operators"""
from .models.companies import Company
from django.db.models import Count
return (
Company.objects.filter(roles__contains=["OPERATOR"])
.annotate(park_count=Count("operated_parks"))
.order_by("name")
)
def get_context_data(self, **kwargs):
"""Add context data"""
context = super().get_context_data(**kwargs)
context["total_operators"] = self.get_queryset().count()
return context