mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 12:11:13 -05:00
- Added Button component with various styles and sizes. - Introduced Card component for displaying content with titles and descriptions. - Created Input component for form fields with support for various attributes. - Developed Toast Notification Container for displaying alerts and messages. - Designed pages for listing designers and operators with pagination and responsive layout. - Documented frontend migration from React to HTMX + Alpine.js, detailing component usage and integration.
877 lines
31 KiB
Python
877 lines
31 KiB
Python
from .querysets import get_base_park_queryset
|
|
from apps.core.mixins import HTMXFilterableMixin
|
|
from .models.location import ParkLocation
|
|
from .models.media import ParkPhoto
|
|
from apps.moderation.services import ModerationService
|
|
from apps.moderation.mixins import (
|
|
EditSubmissionMixin,
|
|
PhotoSubmissionMixin,
|
|
HistoryMixin,
|
|
)
|
|
from apps.core.views.views import SlugRedirectMixin
|
|
from .filters import ParkFilter
|
|
from .forms import ParkForm
|
|
from .models import Park, ParkArea, ParkReview as Review
|
|
from .services import ParkFilterService
|
|
from django.http import (
|
|
HttpResponseRedirect,
|
|
HttpResponse,
|
|
HttpRequest,
|
|
JsonResponse,
|
|
)
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.contrib import messages
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.db.models import QuerySet
|
|
from django.urls import reverse
|
|
from django.shortcuts import get_object_or_404, render
|
|
from decimal import InvalidOperation
|
|
from django.views.generic import DetailView, ListView, CreateView, UpdateView
|
|
import requests
|
|
from decimal import Decimal, ROUND_DOWN
|
|
from typing import Any, Optional, cast, Literal, Dict
|
|
|
|
# Constants
|
|
PARK_DETAIL_URL = "parks:park_detail"
|
|
PARK_LIST_ITEM_TEMPLATE = "parks/partials/park_list_item.html"
|
|
REQUIRED_FIELDS_ERROR = (
|
|
"Please correct the errors below. Required fields are marked with an asterisk (*)."
|
|
)
|
|
ALLOWED_ROLES = ["MODERATOR", "ADMIN", "SUPERUSER"]
|
|
|
|
|
|
ViewMode = Literal["grid", "list"]
|
|
|
|
|
|
def normalize_osm_result(result: dict) -> dict:
|
|
"""Normalize OpenStreetMap result to a consistent format with enhanced address details""" # noqa: E501
|
|
from .location_utils import get_english_name, normalize_coordinate
|
|
|
|
# Get address details
|
|
address = result.get("address", {})
|
|
|
|
# Normalize coordinates
|
|
lat = normalize_coordinate(float(result.get("lat")), 9, 6)
|
|
lon = normalize_coordinate(float(result.get("lon")), 10, 6)
|
|
|
|
# Get English names where possible
|
|
name = ""
|
|
if "namedetails" in result:
|
|
name = get_english_name(result["namedetails"])
|
|
|
|
# Build street address from available components
|
|
street_parts = []
|
|
if address.get("house_number"):
|
|
street_parts.append(address["house_number"])
|
|
if address.get("road") or address.get("street"):
|
|
street_parts.append(address.get("road") or address.get("street"))
|
|
elif address.get("pedestrian"):
|
|
street_parts.append(address["pedestrian"])
|
|
elif address.get("footway"):
|
|
street_parts.append(address["footway"])
|
|
|
|
# Handle additional address components
|
|
suburb = address.get("suburb", "")
|
|
district = address.get("district", "")
|
|
neighborhood = address.get("neighbourhood", "")
|
|
|
|
# Build city from available components
|
|
city = (
|
|
address.get("city")
|
|
or address.get("town")
|
|
or address.get("village")
|
|
or address.get("municipality")
|
|
or ""
|
|
)
|
|
|
|
# Get detailed state/region information
|
|
state = (
|
|
address.get("state") or address.get("province") or address.get("region") or ""
|
|
)
|
|
|
|
# Get postal code with fallbacks
|
|
postal_code = address.get("postcode") or address.get("postal_code") or ""
|
|
|
|
return {
|
|
"display_name": name or result.get("display_name", ""),
|
|
"lat": lat,
|
|
"lon": lon,
|
|
"street": " ".join(street_parts).strip(),
|
|
"suburb": suburb,
|
|
"district": district,
|
|
"neighborhood": neighborhood,
|
|
"city": city,
|
|
"state": state,
|
|
"country": address.get("country", ""),
|
|
"postal_code": postal_code,
|
|
}
|
|
|
|
|
|
def get_view_mode(request: HttpRequest) -> ViewMode:
|
|
"""Get the current view mode from request, defaulting to grid"""
|
|
view_mode = request.GET.get("view_mode", "grid")
|
|
return cast(ViewMode, "list" if view_mode == "list" else "grid")
|
|
|
|
|
|
def add_park_button(request: HttpRequest) -> HttpResponse:
|
|
"""Return the add park button partial template"""
|
|
return render(request, "parks/partials/add_park_button.html")
|
|
|
|
|
|
def park_actions(request: HttpRequest, slug: str) -> HttpResponse:
|
|
"""Return the park actions partial template"""
|
|
park = get_object_or_404(Park, slug=slug)
|
|
return render(request, "parks/partials/park_actions.html", {"park": park})
|
|
|
|
|
|
def get_park_areas(request: HttpRequest) -> HttpResponse:
|
|
"""Return park areas as options for a select element"""
|
|
park_id = request.GET.get("park")
|
|
if not park_id:
|
|
return HttpResponse('<option value="">Select a park first</option>')
|
|
|
|
try:
|
|
park = Park.objects.get(id=park_id)
|
|
areas = park.areas.all()
|
|
options = ['<option value="">No specific area</option>']
|
|
options.extend(
|
|
[f'<option value="{area.id}">{area.name}</option>' for area in areas]
|
|
)
|
|
return HttpResponse("\n".join(options))
|
|
except Park.DoesNotExist:
|
|
return HttpResponse('<option value="">Invalid park selected</option>')
|
|
|
|
|
|
def location_search(request: HttpRequest) -> JsonResponse:
|
|
"""Search for locations using OpenStreetMap Nominatim API"""
|
|
query = request.GET.get("q", "")
|
|
if not query:
|
|
return JsonResponse({"results": []})
|
|
|
|
response = requests.get(
|
|
"https://nominatim.openstreetmap.org/search",
|
|
params={
|
|
"q": query,
|
|
"format": "json",
|
|
"addressdetails": 1,
|
|
"namedetails": 1,
|
|
"accept-language": "en",
|
|
"limit": 10,
|
|
},
|
|
headers={"User-Agent": "ThrillWiki/1.0"},
|
|
timeout=60,
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
results = response.json()
|
|
normalized_results = [normalize_osm_result(result) for result in results]
|
|
valid_results = [
|
|
r
|
|
for r in normalized_results
|
|
if r["lat"] is not None and r["lon"] is not None
|
|
]
|
|
return JsonResponse({"results": valid_results})
|
|
|
|
return JsonResponse({"results": []})
|
|
|
|
|
|
def reverse_geocode(request: HttpRequest) -> JsonResponse:
|
|
"""Reverse geocode coordinates using OpenStreetMap Nominatim API"""
|
|
try:
|
|
lat = Decimal(request.GET.get("lat", ""))
|
|
lon = Decimal(request.GET.get("lon", ""))
|
|
except (TypeError, ValueError, InvalidOperation):
|
|
return JsonResponse({"error": "Invalid coordinates"}, status=400)
|
|
|
|
if not lat or not lon:
|
|
return JsonResponse({"error": "Missing coordinates"}, status=400)
|
|
|
|
lat = lat.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
|
|
lon = lon.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
|
|
|
|
if lat < -90 or lat > 90:
|
|
return JsonResponse(
|
|
{"error": "Latitude must be between -90 and 90"}, status=400
|
|
)
|
|
if lon < -180 or lon > 180:
|
|
return JsonResponse(
|
|
{"error": "Longitude must be between -180 and 180"}, status=400
|
|
)
|
|
|
|
response = requests.get(
|
|
"https://nominatim.openstreetmap.org/reverse",
|
|
params={
|
|
"lat": str(lat),
|
|
"lon": str(lon),
|
|
"format": "json",
|
|
"addressdetails": 1,
|
|
"namedetails": 1,
|
|
"accept-language": "en",
|
|
},
|
|
headers={"User-Agent": "ThrillWiki/1.0"},
|
|
timeout=60,
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
normalized_result = normalize_osm_result(result)
|
|
if normalized_result["lat"] is None or normalized_result["lon"] is None:
|
|
return JsonResponse({"error": "Invalid coordinates"}, status=400)
|
|
return JsonResponse(normalized_result)
|
|
|
|
return JsonResponse({"error": "Geocoding failed"}, status=500)
|
|
|
|
|
|
class ParkListView(HTMXFilterableMixin, ListView):
|
|
model = Park
|
|
template_name = "parks/park_list.html"
|
|
context_object_name = "parks"
|
|
filter_class = ParkFilter
|
|
paginate_by = 20
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.filter_service = ParkFilterService()
|
|
|
|
def get_template_names(self) -> list[str]:
|
|
"""Return park_list.html for HTMX requests"""
|
|
if self.request.htmx:
|
|
return ["parks/partials/park_list.html"]
|
|
return [self.template_name]
|
|
|
|
def get_view_mode(self) -> ViewMode:
|
|
"""Get the current view mode (grid or list)"""
|
|
return get_view_mode(self.request)
|
|
|
|
def get_queryset(self) -> QuerySet[Park]:
|
|
"""Get optimized queryset with filter service"""
|
|
try:
|
|
# Use filter service for optimized filtering
|
|
filter_params = dict(self.request.GET.items())
|
|
queryset = self.filter_service.get_filtered_queryset(filter_params)
|
|
|
|
# Also create filterset for form rendering
|
|
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
|
|
return self.filterset.qs
|
|
except Exception as e:
|
|
messages.error(self.request, f"Error loading parks: {str(e)}")
|
|
queryset = self.model.objects.none()
|
|
self.filterset = self.filter_class(self.request.GET, queryset=queryset)
|
|
return queryset
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
"""Add enhanced context with filter stats and suggestions"""
|
|
try:
|
|
# Initialize filterset if not exists
|
|
if not hasattr(self, "filterset"):
|
|
self.filterset = self.filter_class(
|
|
self.request.GET, queryset=self.model.objects.none()
|
|
)
|
|
|
|
context = super().get_context_data(**kwargs)
|
|
|
|
# Add filter service data
|
|
filter_counts = self.filter_service.get_filter_counts()
|
|
popular_filters = self.filter_service.get_popular_filters()
|
|
|
|
context.update(
|
|
{
|
|
"view_mode": self.get_view_mode(),
|
|
"is_search": bool(self.request.GET.get("search")),
|
|
"search_query": self.request.GET.get("search", ""),
|
|
"filter_counts": filter_counts,
|
|
"popular_filters": popular_filters,
|
|
"total_results": (
|
|
context.get("paginator").count
|
|
if context.get("paginator")
|
|
else 0
|
|
),
|
|
}
|
|
)
|
|
|
|
# Add filter suggestions for search queries
|
|
search_query = self.request.GET.get("search", "")
|
|
if search_query:
|
|
context["filter_suggestions"] = (
|
|
self.filter_service.get_filter_suggestions(search_query)
|
|
)
|
|
|
|
return context
|
|
|
|
except Exception as e:
|
|
messages.error(self.request, f"Error applying filters: {str(e)}")
|
|
# Ensure filterset exists in error case
|
|
if not hasattr(self, "filterset"):
|
|
self.filterset = self.filter_class(
|
|
self.request.GET, queryset=self.model.objects.none()
|
|
)
|
|
return {
|
|
"filter": self.filterset,
|
|
"error": "Unable to apply filters. Please try adjusting your criteria.",
|
|
"view_mode": self.get_view_mode(),
|
|
"is_search": bool(self.request.GET.get("search")),
|
|
"search_query": self.request.GET.get("search", ""),
|
|
}
|
|
|
|
def _get_clean_filter_params(self) -> Dict[str, Any]:
|
|
"""Extract and clean filter parameters from request."""
|
|
filter_params = {}
|
|
|
|
# Define valid filter fields
|
|
valid_filters = {
|
|
"status",
|
|
"operator",
|
|
"park_type",
|
|
"has_coasters",
|
|
"min_rating",
|
|
"big_parks_only",
|
|
"ordering",
|
|
"search",
|
|
}
|
|
|
|
for param, value in self.request.GET.items():
|
|
if param in valid_filters and value:
|
|
# Skip pagination parameter
|
|
if param == "page":
|
|
continue
|
|
|
|
# Clean and validate the value
|
|
filter_params[param] = self._clean_filter_value(param, value)
|
|
|
|
return {k: v for k, v in filter_params.items() if v is not None}
|
|
|
|
def _clean_filter_value(self, param: str, value: str) -> Optional[Any]:
|
|
"""Clean and validate a single filter value."""
|
|
if param in ("has_coasters", "big_parks_only"):
|
|
# Boolean filters
|
|
return value.lower() in ("true", "1", "yes", "on")
|
|
elif param == "min_rating":
|
|
# Numeric filter
|
|
try:
|
|
rating = float(value)
|
|
if 0 <= rating <= 5:
|
|
return str(rating)
|
|
except (ValueError, TypeError):
|
|
pass # Skip invalid ratings
|
|
return None
|
|
elif param == "search":
|
|
# Search filter
|
|
clean_search = value.strip()
|
|
return clean_search if clean_search else None
|
|
else:
|
|
# String filters
|
|
return value.strip()
|
|
|
|
def _build_filter_query_string(self, filter_params: Dict[str, Any]) -> str:
|
|
"""Build query string from filter parameters."""
|
|
from urllib.parse import urlencode
|
|
|
|
# Convert boolean values to strings for URL
|
|
url_params = {}
|
|
for key, value in filter_params.items():
|
|
if isinstance(value, bool):
|
|
url_params[key] = "true" if value else "false"
|
|
else:
|
|
url_params[key] = str(value)
|
|
|
|
return urlencode(url_params)
|
|
|
|
def _get_pagination_urls(
|
|
self, page_obj, filter_params: Dict[str, Any]
|
|
) -> Dict[str, str]:
|
|
"""Generate pagination URLs that preserve filter state."""
|
|
|
|
base_query = self._build_filter_query_string(filter_params)
|
|
pagination_urls = {}
|
|
|
|
if page_obj.has_previous():
|
|
prev_params = (
|
|
f"{base_query}&page={page_obj.previous_page_number()}"
|
|
if base_query
|
|
else f"page={page_obj.previous_page_number()}"
|
|
)
|
|
pagination_urls["previous_url"] = f"?{prev_params}"
|
|
|
|
if page_obj.has_next():
|
|
next_params = (
|
|
f"{base_query}&page={page_obj.next_page_number()}"
|
|
if base_query
|
|
else f"page={page_obj.next_page_number()}"
|
|
)
|
|
pagination_urls["next_url"] = f"?{next_params}"
|
|
|
|
# First and last page URLs
|
|
if page_obj.number > 1:
|
|
first_params = f"{base_query}&page=1" if base_query else "page=1"
|
|
pagination_urls["first_url"] = f"?{first_params}"
|
|
|
|
if page_obj.number < page_obj.paginator.num_pages:
|
|
last_params = (
|
|
f"{base_query}&page={page_obj.paginator.num_pages}"
|
|
if base_query
|
|
else f"page={page_obj.paginator.num_pages}"
|
|
)
|
|
pagination_urls["last_url"] = f"?{last_params}"
|
|
|
|
return pagination_urls
|
|
|
|
|
|
def search_parks(request: HttpRequest) -> HttpResponse:
|
|
"""Search parks and return results using park_list_item.html"""
|
|
try:
|
|
search_query = request.GET.get("search", "").strip()
|
|
if not search_query:
|
|
return HttpResponse("")
|
|
|
|
# Get current view mode from request
|
|
current_view_mode = request.GET.get("view_mode", "grid")
|
|
park_filter = ParkFilter(
|
|
{"search": search_query}, queryset=get_base_park_queryset()
|
|
)
|
|
|
|
parks = park_filter.qs
|
|
if request.GET.get("quick_search"):
|
|
parks = parks[:8] # Limit quick search results
|
|
|
|
response = render(
|
|
request,
|
|
PARK_LIST_ITEM_TEMPLATE,
|
|
{
|
|
"parks": parks,
|
|
"view_mode": current_view_mode,
|
|
"search_query": search_query,
|
|
"is_search": True,
|
|
},
|
|
)
|
|
response["HX-Trigger"] = "searchComplete"
|
|
return response
|
|
|
|
except Exception as e:
|
|
response = render(
|
|
request,
|
|
PARK_LIST_ITEM_TEMPLATE,
|
|
{
|
|
"parks": [],
|
|
"error": f"Error performing search: {str(e)}",
|
|
"is_search": True,
|
|
},
|
|
)
|
|
response["HX-Trigger"] = "searchError"
|
|
return response
|
|
|
|
|
|
class ParkCreateView(LoginRequiredMixin, CreateView):
|
|
model = Park
|
|
form_class = ParkForm
|
|
template_name = "parks/park_form.html"
|
|
|
|
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
|
|
data = cleaned_data.copy()
|
|
if data.get("owner"):
|
|
data["owner"] = data["owner"].id
|
|
if data.get("opening_date"):
|
|
data["opening_date"] = data["opening_date"].isoformat()
|
|
if data.get("closing_date"):
|
|
data["closing_date"] = data["closing_date"].isoformat()
|
|
decimal_fields = [
|
|
"latitude",
|
|
"longitude",
|
|
"size_acres",
|
|
"average_rating",
|
|
]
|
|
for field in decimal_fields:
|
|
if data.get(field):
|
|
data[field] = str(data[field])
|
|
return data
|
|
|
|
def normalize_coordinates(self, form: ParkForm) -> None:
|
|
if form.cleaned_data.get("latitude"):
|
|
lat = Decimal(str(form.cleaned_data["latitude"]))
|
|
form.cleaned_data["latitude"] = lat.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
if form.cleaned_data.get("longitude"):
|
|
lon = Decimal(str(form.cleaned_data["longitude"]))
|
|
form.cleaned_data["longitude"] = lon.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
|
|
def form_valid(self, form: ParkForm) -> HttpResponse:
|
|
self.normalize_coordinates(form)
|
|
changes = self.prepare_changes_data(form.cleaned_data)
|
|
|
|
# Use the new queue routing service
|
|
result = ModerationService.create_edit_submission_with_queue(
|
|
content_object=None, # None for CREATE
|
|
changes=changes,
|
|
submitter=self.request.user,
|
|
submission_type="CREATE",
|
|
reason=self.request.POST.get("reason", ""),
|
|
source=self.request.POST.get("source", ""),
|
|
)
|
|
|
|
if result['status'] == 'auto_approved':
|
|
# Moderator submission was auto-approved
|
|
self.object = result['created_object']
|
|
|
|
if form.cleaned_data.get("latitude") and form.cleaned_data.get("longitude"):
|
|
# Create or update ParkLocation
|
|
park_location, created = ParkLocation.objects.get_or_create(
|
|
park=self.object,
|
|
defaults={
|
|
"street_address": form.cleaned_data.get("street_address", ""),
|
|
"city": form.cleaned_data.get("city", ""),
|
|
"state": form.cleaned_data.get("state", ""),
|
|
"country": form.cleaned_data.get("country", "USA"),
|
|
"postal_code": form.cleaned_data.get("postal_code", ""),
|
|
},
|
|
)
|
|
park_location.set_coordinates(
|
|
form.cleaned_data["latitude"],
|
|
form.cleaned_data["longitude"],
|
|
)
|
|
park_location.save()
|
|
|
|
photos = self.request.FILES.getlist("photos")
|
|
uploaded_count = 0
|
|
for photo_file in photos:
|
|
try:
|
|
ParkPhoto.objects.create(
|
|
image=photo_file,
|
|
uploaded_by=self.request.user,
|
|
park=self.object,
|
|
)
|
|
uploaded_count += 1
|
|
except Exception as e:
|
|
messages.error(
|
|
self.request,
|
|
f"Error uploading photo {photo_file.name}: {str(e)}",
|
|
)
|
|
|
|
messages.success(
|
|
self.request,
|
|
f"Successfully created {self.object.name}. "
|
|
f"Added {uploaded_count} photo(s).",
|
|
)
|
|
return HttpResponseRedirect(self.get_success_url())
|
|
|
|
elif result['status'] == 'queued':
|
|
# Regular user submission was queued
|
|
messages.success(
|
|
self.request,
|
|
"Your park submission has been sent for review. "
|
|
"You will be notified when it is approved.",
|
|
)
|
|
# Redirect to parks list since we don't have an object yet
|
|
return HttpResponseRedirect(reverse("parks:park_list"))
|
|
|
|
elif result['status'] == 'failed':
|
|
# Auto-approval failed
|
|
messages.error(
|
|
self.request,
|
|
f"Error creating park: {result['message']}. Please check your input and try again.",
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
# Fallback error case
|
|
messages.error(
|
|
self.request,
|
|
"An unexpected error occurred. Please try again.",
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
def get_success_url(self) -> str:
|
|
return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
|
|
|
|
|
|
class ParkUpdateView(LoginRequiredMixin, UpdateView):
|
|
model = Park
|
|
form_class = ParkForm
|
|
template_name = "parks/park_form.html"
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
context["is_edit"] = True
|
|
return context
|
|
|
|
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
|
|
data = cleaned_data.copy()
|
|
if data.get("owner"):
|
|
data["owner"] = data["owner"].id
|
|
if data.get("opening_date"):
|
|
data["opening_date"] = data["opening_date"].isoformat()
|
|
if data.get("closing_date"):
|
|
data["closing_date"] = data["closing_date"].isoformat()
|
|
decimal_fields = [
|
|
"latitude",
|
|
"longitude",
|
|
"size_acres",
|
|
"average_rating",
|
|
]
|
|
for field in decimal_fields:
|
|
if data.get(field):
|
|
data[field] = str(data[field])
|
|
return data
|
|
|
|
def normalize_coordinates(self, form: ParkForm) -> None:
|
|
if form.cleaned_data.get("latitude"):
|
|
lat = Decimal(str(form.cleaned_data["latitude"]))
|
|
form.cleaned_data["latitude"] = lat.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
if form.cleaned_data.get("longitude"):
|
|
lon = Decimal(str(form.cleaned_data["longitude"]))
|
|
form.cleaned_data["longitude"] = lon.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
|
|
def form_valid(self, form: ParkForm) -> HttpResponse: # noqa: C901
|
|
self.normalize_coordinates(form)
|
|
changes = self.prepare_changes_data(form.cleaned_data)
|
|
|
|
# Use the new queue routing service
|
|
result = ModerationService.create_edit_submission_with_queue(
|
|
content_object=self.object,
|
|
changes=changes,
|
|
submitter=self.request.user,
|
|
submission_type="EDIT",
|
|
reason=self.request.POST.get("reason", ""),
|
|
source=self.request.POST.get("source", ""),
|
|
)
|
|
|
|
if result['status'] == 'auto_approved':
|
|
# Moderator submission was auto-approved
|
|
# The object was already updated by the service
|
|
self.object = result['created_object']
|
|
|
|
location_data = {
|
|
"name": self.object.name,
|
|
"location_type": "park",
|
|
"latitude": form.cleaned_data.get("latitude"),
|
|
"longitude": form.cleaned_data.get("longitude"),
|
|
"street_address": form.cleaned_data.get("street_address", ""),
|
|
"city": form.cleaned_data.get("city", ""),
|
|
"state": form.cleaned_data.get("state", ""),
|
|
"country": form.cleaned_data.get("country", ""),
|
|
"postal_code": form.cleaned_data.get("postal_code", ""),
|
|
}
|
|
|
|
# Create or update ParkLocation
|
|
try:
|
|
park_location = self.object.location
|
|
# Update existing location
|
|
for key, value in location_data.items():
|
|
if key in ["latitude", "longitude"] and value:
|
|
continue # Handle coordinates separately
|
|
if hasattr(park_location, key):
|
|
setattr(park_location, key, value)
|
|
|
|
# Handle coordinates if provided
|
|
if "latitude" in location_data and "longitude" in location_data:
|
|
if location_data["latitude"] and location_data["longitude"]:
|
|
park_location.set_coordinates(
|
|
float(location_data["latitude"]),
|
|
float(location_data["longitude"]),
|
|
)
|
|
park_location.save()
|
|
except ParkLocation.DoesNotExist:
|
|
# Create new ParkLocation
|
|
coordinates_data = {}
|
|
if "latitude" in location_data and "longitude" in location_data:
|
|
if location_data["latitude"] and location_data["longitude"]:
|
|
coordinates_data = {
|
|
"latitude": float(location_data["latitude"]),
|
|
"longitude": float(location_data["longitude"]),
|
|
}
|
|
|
|
# Remove coordinate fields from location_data for creation
|
|
creation_data = {
|
|
k: v
|
|
for k, v in location_data.items()
|
|
if k not in ["latitude", "longitude"]
|
|
}
|
|
creation_data.setdefault("country", "USA")
|
|
|
|
park_location = ParkLocation.objects.create(
|
|
park=self.object, **creation_data
|
|
)
|
|
|
|
if coordinates_data:
|
|
park_location.set_coordinates(
|
|
coordinates_data["latitude"],
|
|
coordinates_data["longitude"],
|
|
)
|
|
park_location.save()
|
|
|
|
photos = self.request.FILES.getlist("photos")
|
|
uploaded_count = 0
|
|
for photo_file in photos:
|
|
try:
|
|
ParkPhoto.objects.create(
|
|
image=photo_file,
|
|
uploaded_by=self.request.user,
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=self.object.id,
|
|
)
|
|
uploaded_count += 1
|
|
except Exception as e:
|
|
messages.error(
|
|
self.request,
|
|
f"Error uploading photo {photo_file.name}: {str(e)}",
|
|
)
|
|
|
|
messages.success(
|
|
self.request,
|
|
f"Successfully updated {self.object.name}. "
|
|
f"Added {uploaded_count} new photo(s).",
|
|
)
|
|
return HttpResponseRedirect(self.get_success_url())
|
|
|
|
elif result['status'] == 'queued':
|
|
# Regular user submission was queued
|
|
messages.success(
|
|
self.request,
|
|
f"Your changes to {self.object.name} have been sent for review. "
|
|
"You will be notified when they are approved.",
|
|
)
|
|
return HttpResponseRedirect(
|
|
reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
|
|
)
|
|
|
|
elif result['status'] == 'failed':
|
|
# Auto-approval failed
|
|
messages.error(
|
|
self.request,
|
|
f"Error updating park: {result['message']}. Please check your input and try again.",
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
# Fallback error case
|
|
messages.error(
|
|
self.request,
|
|
"An unexpected error occurred. Please try again.",
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
def form_invalid(self, form: ParkForm) -> HttpResponse:
|
|
messages.error(self.request, REQUIRED_FIELDS_ERROR)
|
|
for field, errors in form.errors.items():
|
|
for error in errors:
|
|
messages.error(self.request, f"{field}: {error}")
|
|
return super().form_invalid(form)
|
|
|
|
def get_success_url(self) -> str:
|
|
return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
|
|
|
|
|
|
class ParkDetailView(
|
|
SlugRedirectMixin,
|
|
EditSubmissionMixin,
|
|
PhotoSubmissionMixin,
|
|
HistoryMixin,
|
|
DetailView,
|
|
):
|
|
model = Park
|
|
template_name = "parks/park_detail.html"
|
|
context_object_name = "park"
|
|
|
|
def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
|
|
if queryset is None:
|
|
queryset = self.get_queryset()
|
|
slug = self.kwargs.get(self.slug_url_kwarg)
|
|
if slug is None:
|
|
raise ObjectDoesNotExist("No slug provided")
|
|
park, _ = Park.get_by_slug(slug)
|
|
return park
|
|
|
|
def get_queryset(self) -> QuerySet[Park]:
|
|
return cast(
|
|
QuerySet[Park],
|
|
super()
|
|
.get_queryset()
|
|
.prefetch_related(
|
|
"rides", "rides__manufacturer", "photos", "areas", "location"
|
|
),
|
|
)
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
park = cast(Park, self.object)
|
|
context["areas"] = park.areas.all()
|
|
context["rides"] = park.rides.all().order_by("-status", "name")
|
|
|
|
if self.request.user.is_authenticated:
|
|
context["has_reviewed"] = Review.objects.filter(
|
|
user=self.request.user,
|
|
park=park,
|
|
).exists()
|
|
else:
|
|
context["has_reviewed"] = False
|
|
|
|
return context
|
|
|
|
def get_redirect_url_pattern(self) -> str:
|
|
return PARK_DETAIL_URL
|
|
|
|
|
|
class ParkAreaDetailView(
|
|
SlugRedirectMixin,
|
|
EditSubmissionMixin,
|
|
PhotoSubmissionMixin,
|
|
HistoryMixin,
|
|
DetailView,
|
|
):
|
|
model = ParkArea
|
|
template_name = "parks/area_detail.html"
|
|
context_object_name = "area"
|
|
slug_url_kwarg = "area_slug"
|
|
|
|
def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea:
|
|
if queryset is None:
|
|
queryset = self.get_queryset()
|
|
park_slug = self.kwargs.get("park_slug")
|
|
area_slug = self.kwargs.get("area_slug")
|
|
if park_slug is None or area_slug is None:
|
|
raise ObjectDoesNotExist("Missing slug")
|
|
area, _ = ParkArea.get_by_slug(area_slug)
|
|
if area.park.slug != park_slug:
|
|
raise ObjectDoesNotExist("Park slug doesn't match")
|
|
return area
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
return context
|
|
|
|
def get_redirect_url_pattern(self) -> str:
|
|
return PARK_DETAIL_URL
|
|
|
|
def get_redirect_url_kwargs(self) -> dict[str, str]:
|
|
area = cast(ParkArea, self.object)
|
|
return {"park_slug": area.park.slug, "area_slug": area.slug}
|
|
|
|
|
|
class OperatorListView(ListView):
|
|
"""View for displaying a list of park operators"""
|
|
|
|
template_name = "operators/operator_list.html"
|
|
context_object_name = "operators"
|
|
paginate_by = 24
|
|
|
|
def get_queryset(self):
|
|
"""Get companies that are operators"""
|
|
from .models.companies import Company
|
|
from django.db.models import Count
|
|
|
|
return (
|
|
Company.objects.filter(roles__contains=["OPERATOR"])
|
|
.annotate(park_count=Count("operated_parks"))
|
|
.order_by("name")
|
|
)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
"""Add context data"""
|
|
context = super().get_context_data(**kwargs)
|
|
context["total_operators"] = self.get_queryset().count()
|
|
return context
|