import logging
from decimal import InvalidOperation
from decimal import Decimal, ROUND_DOWN
from typing import Any, Optional, cast, Literal

import requests
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, Count, QuerySet
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest, JsonResponse
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.views.generic import DetailView, ListView, CreateView, UpdateView

from core.views import SlugRedirectMixin
from location.models import Location
from media.models import Photo
from moderation.mixins import EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin
from moderation.models import EditSubmission
from reviews.models import Review
from search.mixins import HTMXFilterableMixin

from .filters import ParkFilter
from .forms import ParkForm
from .models import Park, ParkArea
from .querysets import get_base_park_queryset
# Constants
# URL name of the park detail page; reversed in this module with a ``slug`` kwarg.
PARK_DETAIL_URL = "parks:park_detail"
# Partial template rendered for park search results.
PARK_LIST_ITEM_TEMPLATE = "parks/partials/park_list_item.html"
# Generic flash message shown when a park form fails validation.
REQUIRED_FIELDS_ERROR = "Please correct the errors below. Required fields are marked with an asterisk (*)."
# Values of ``request.user.role`` whose park submissions are applied and
# approved immediately instead of waiting for review.
ALLOWED_ROLES = ["MODERATOR", "ADMIN", "SUPERUSER"]
# The two layouts a park listing can be rendered in.
ViewMode = Literal["grid", "list"]
def normalize_osm_result(result: dict) -> dict:
    """Normalize an OpenStreetMap (Nominatim) result into a flat address dict.

    Args:
        result: One decoded result object. The keys read here are ``lat``,
            ``lon``, ``display_name``, ``namedetails`` and ``address``; every
            one of them is treated as optional.

    Returns:
        A dict with the keys ``display_name``, ``lat``, ``lon``, ``street``,
        ``suburb``, ``district``, ``neighborhood``, ``city``, ``state``,
        ``country`` and ``postal_code``. ``lat`` / ``lon`` are ``None`` when
        the result carries no parseable coordinate, so callers can filter such
        entries out instead of crashing.
    """
    from .location_utils import get_english_name, normalize_coordinate

    def _coordinate(key: str, digits: int):
        """Parse ``result[key]`` as a float and normalize it, or return None."""
        try:
            value = float(result.get(key))
        except (TypeError, ValueError):
            # Missing key (None) or a non-numeric string: report "no
            # coordinate" rather than raising out of the view.
            return None
        # The (digits, 6) arguments are presumably max_digits / decimal_places
        # -- confirm against location_utils.normalize_coordinate.
        return normalize_coordinate(value, digits, 6)

    # ``or {}`` also covers an explicit null address in the payload.
    address = result.get('address') or {}

    lat = _coordinate('lat', 9)
    lon = _coordinate('lon', 10)

    # Prefer an English name when name details were supplied.
    name = ''
    if 'namedetails' in result:
        name = get_english_name(result['namedetails'])

    # Street line: optional house number followed by the first available
    # street-like component.
    street_parts = []
    if address.get('house_number'):
        street_parts.append(address['house_number'])
    street_name = (
        address.get('road')
        or address.get('street')
        or address.get('pedestrian')
        or address.get('footway')
    )
    if street_name:
        street_parts.append(street_name)

    # City, state and postal code each fall back through alternative keys.
    city = (
        address.get('city')
        or address.get('town')
        or address.get('village')
        or address.get('municipality')
        or ''
    )
    state = (
        address.get('state')
        or address.get('province')
        or address.get('region')
        or ''
    )
    postal_code = address.get('postcode') or address.get('postal_code') or ''

    return {
        'display_name': name or result.get('display_name', ''),
        'lat': lat,
        'lon': lon,
        'street': ' '.join(street_parts).strip(),
        'suburb': address.get('suburb', ''),
        'district': address.get('district', ''),
        'neighborhood': address.get('neighbourhood', ''),
        'city': city,
        'state': state,
        'country': address.get('country', ''),
        'postal_code': postal_code,
    }
def get_view_mode(request: HttpRequest) -> ViewMode:
    """Resolve the requested layout; anything other than ``list`` means ``grid``."""
    requested = request.GET.get('view_mode', 'grid')
    if requested == 'list':
        return cast(ViewMode, 'list')
    return cast(ViewMode, 'grid')
def add_park_button(request: HttpRequest) -> HttpResponse:
    """Render the partial template that holds the add-park button."""
    template = "parks/partials/add_park_button.html"
    return render(request, template)
def park_actions(request: HttpRequest, slug: str) -> HttpResponse:
    """Render the actions partial for the park with ``slug`` (404 if none exists)."""
    context = {"park": get_object_or_404(Park, slug=slug)}
    return render(request, "parks/partials/park_actions.html", context)
def get_park_areas(request: HttpRequest) -> HttpResponse:
    """Return a park's areas as ``<option>`` elements for a select element.

    Reads the park id from the ``park`` query parameter. The response body is
    a blank placeholder option followed by one option per area, joined by
    newlines. An empty body is returned when the parameter is missing, is not
    a usable id, or matches no park.
    """
    # Local import, matching the function-scope import style already used in
    # this module; format_html escapes every interpolated value.
    from django.utils.html import format_html

    park_id = request.GET.get('park')
    if not park_id:
        return HttpResponse('')

    try:
        park = Park.objects.get(id=park_id)
    except (Park.DoesNotExist, ValueError):
        # ValueError: the parameter could not be coerced to an id (for
        # example "abc"); treat it like an unknown park instead of a 500.
        return HttpResponse('')

    options = [format_html('<option value="">{}</option>', 'Select an area')]
    options.extend(
        # ``area`` is rendered through str(); pk is used as the option value.
        format_html('<option value="{}">{}</option>', area.pk, area)
        for area in park.areas.all()
    )
    return HttpResponse('\n'.join(options))
def location_search(request: HttpRequest) -> JsonResponse:
    """Search for locations using the OpenStreetMap Nominatim API.

    Reads the search text from the ``q`` query parameter and responds with
    ``{"results": [...]}``, where each entry is a dict produced by
    ``normalize_osm_result``. Entries without coordinates are dropped. Any
    upstream failure (network error, non-200 status, undecodable body) yields
    an empty result list rather than an unhandled exception.
    """
    query = request.GET.get("q", "").strip()
    if not query:
        # Nothing (or only whitespace) to search for; skip the remote call.
        return JsonResponse({"results": []})

    try:
        response = requests.get(
            "https://nominatim.openstreetmap.org/search",
            params={
                "q": query,
                "format": "json",
                "addressdetails": 1,
                "namedetails": 1,
                "accept-language": "en",
                "limit": 10,
            },
            headers={"User-Agent": "ThrillWiki/1.0"},
            timeout=60,
        )
        if response.status_code != 200:
            return JsonResponse({"results": []})
        results = response.json()
    except (requests.RequestException, ValueError):
        # RequestException: connection/timeout problems; ValueError: the body
        # was not valid JSON. Both degrade to "no results".
        logging.getLogger(__name__).warning(
            "Nominatim search request failed", exc_info=True
        )
        return JsonResponse({"results": []})

    if not isinstance(results, list):
        # A non-list payload cannot be iterated as individual results.
        return JsonResponse({"results": []})

    normalized_results = [normalize_osm_result(result) for result in results]
    valid_results = [
        entry
        for entry in normalized_results
        if entry["lat"] is not None and entry["lon"] is not None
    ]
    return JsonResponse({"results": valid_results})
def reverse_geocode(request: HttpRequest) -> JsonResponse:
    """Reverse geocode coordinates using the OpenStreetMap Nominatim API.

    Reads ``lat`` and ``lon`` from the query string, truncates them to six
    decimal places and validates their ranges before querying Nominatim.

    Returns:
        The dict produced by ``normalize_osm_result`` on success. Otherwise a
        JSON object with an ``error`` message: status 400 for missing,
        malformed or out-of-range coordinates, status 500 when the lookup
        itself fails.
    """
    raw_lat = request.GET.get("lat", "").strip()
    raw_lon = request.GET.get("lon", "").strip()
    if not raw_lat or not raw_lon:
        # Test the raw text, not the parsed Decimal: 0 is a valid coordinate
        # (equator / prime meridian) but is falsy as a number.
        return JsonResponse({"error": "Missing coordinates"}, status=400)

    try:
        lat = Decimal(raw_lat)
        lon = Decimal(raw_lon)
    except (TypeError, ValueError, InvalidOperation):
        return JsonResponse({"error": "Invalid coordinates"}, status=400)

    # Decimal happily parses "NaN" and "Infinity"; neither is a coordinate and
    # both would raise later in quantize() or the range comparisons.
    if not (lat.is_finite() and lon.is_finite()):
        return JsonResponse({"error": "Invalid coordinates"}, status=400)

    six_places = Decimal("0.000001")
    try:
        lat = lat.quantize(six_places, rounding=ROUND_DOWN)
        lon = lon.quantize(six_places, rounding=ROUND_DOWN)
    except InvalidOperation:
        # Raised when the magnitude is too large to represent at six places.
        return JsonResponse({"error": "Invalid coordinates"}, status=400)

    if lat < -90 or lat > 90:
        return JsonResponse({"error": "Latitude must be between -90 and 90"}, status=400)
    if lon < -180 or lon > 180:
        return JsonResponse({"error": "Longitude must be between -180 and 180"}, status=400)

    try:
        response = requests.get(
            "https://nominatim.openstreetmap.org/reverse",
            params={
                "lat": str(lat),
                "lon": str(lon),
                "format": "json",
                "addressdetails": 1,
                "namedetails": 1,
                "accept-language": "en",
            },
            headers={"User-Agent": "ThrillWiki/1.0"},
            timeout=60,
        )
        payload = response.json() if response.status_code == 200 else None
    except (requests.RequestException, ValueError):
        # Connection/timeout problems or a body that is not valid JSON.
        logging.getLogger(__name__).warning(
            "Nominatim reverse geocoding request failed", exc_info=True
        )
        return JsonResponse({"error": "Geocoding failed"}, status=500)

    # A payload that is not an object, or that carries an ``error`` key (how
    # Nominatim appears to report "nothing found" -- confirm against its
    # docs), has no coordinates to normalize.
    if not isinstance(payload, dict) or "error" in payload:
        return JsonResponse({"error": "Geocoding failed"}, status=500)

    normalized_result = normalize_osm_result(payload)
    if normalized_result["lat"] is None or normalized_result["lon"] is None:
        return JsonResponse({"error": "Invalid coordinates"}, status=400)
    return JsonResponse(normalized_result)
class ParkListView(HTMXFilterableMixin, ListView):
    """Filterable park listing, paginated 20 per page, with an HTMX partial."""

    model = Park
    template_name = "parks/park_list.html"
    context_object_name = "parks"
    filter_class = ParkFilter
    paginate_by = 20

    def get_template_names(self) -> list[str]:
        """Use the list-item partial for HTMX requests, the full page otherwise."""
        if not self.request.htmx:
            return [self.template_name]
        return ["parks/partials/park_list_item.html"]

    def get_view_mode(self) -> ViewMode:
        """Return the layout requested for this listing (grid or list)."""
        return get_view_mode(self.request)

    def get_queryset(self) -> QuerySet[Park]:
        """Build the annotated base queryset and narrow it with the filterset.

        If the base queryset cannot be built, the error is flashed to the
        user and an empty queryset is filtered instead, so ``self.filterset``
        is set either way.
        """
        try:
            base = get_base_park_queryset()
        except Exception as e:
            messages.error(self.request, f"Error loading parks: {str(e)}")
            base = self.model.objects.none()
        filter_cls = self.get_filter_class()
        self.filterset = filter_cls(self.request.GET, queryset=base)
        return self.filterset.qs

    def _ensure_filterset(self) -> None:
        """Attach a filterset over an empty queryset if none has been built yet."""
        if hasattr(self, 'filterset'):
            return
        self.filterset = self.get_filter_class()(
            self.request.GET,
            queryset=self.model.objects.none()
        )

    def _search_context(self) -> dict[str, Any]:
        """Context entries describing the layout and the active search text."""
        return {
            'view_mode': self.get_view_mode(),
            'is_search': bool(self.request.GET.get('search')),
            'search_query': self.request.GET.get('search', '')
        }

    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
        """Extend the list context with layout and search information.

        When building the normal context raises, the error is flashed and a
        reduced context is returned containing the filterset, a generic
        error text and the same layout/search entries.
        """
        try:
            self._ensure_filterset()
            context = super().get_context_data(**kwargs)
            context.update(self._search_context())
            return context
        except Exception as e:
            messages.error(self.request, f"Error applying filters: {str(e)}")
            # The failure may have happened before a filterset was attached.
            self._ensure_filterset()
            fallback: dict[str, Any] = {
                'filter': self.filterset,
                'error': "Unable to apply filters. Please try adjusting your criteria.",
            }
            fallback.update(self._search_context())
            return fallback
def search_parks(request: HttpRequest) -> HttpResponse:
    """Search parks and render the matches with the park list-item partial.

    Reads the search text from the ``search`` query parameter; a blank search
    yields an empty response. When ``quick_search`` is present at most eight
    parks are rendered. The response carries an ``HX-Trigger`` header of
    ``searchComplete`` on success or ``searchError`` when the search raised.
    """
    search_query = request.GET.get('search', '').strip()
    if not search_query:
        return HttpResponse('')

    # Use the shared helper so the template only ever sees "grid" or "list",
    # the same as the list view, rather than an arbitrary query-string value.
    view_mode = get_view_mode(request)

    try:
        park_filter = ParkFilter(
            {'search': search_query},
            queryset=get_base_park_queryset(),
        )
        parks = park_filter.qs
        if request.GET.get('quick_search'):
            parks = parks[:8]  # Limit quick search results
        # Rendering stays inside the try: the queryset is lazy, so database
        # errors only surface while the template iterates it.
        response = render(
            request,
            PARK_LIST_ITEM_TEMPLATE,
            {
                "parks": parks,
                "view_mode": view_mode,
                "search_query": search_query,
                "is_search": True
            }
        )
        response['HX-Trigger'] = 'searchComplete'
        return response
    except Exception as e:
        # Boundary handler: keep the HTMX fragment contract, but record the
        # failure instead of discarding it.
        logging.getLogger(__name__).exception("Park search failed")
        response = render(
            request,
            PARK_LIST_ITEM_TEMPLATE,
            {
                "parks": [],
                "error": f"Error performing search: {str(e)}",
                "view_mode": view_mode,
                "search_query": search_query,
                "is_search": True
            }
        )
        response['HX-Trigger'] = 'searchError'
        return response
class ParkCreateView(LoginRequiredMixin, CreateView):
    """Create a park through the edit-submission workflow.

    Every valid form produces an ``EditSubmission`` of type ``CREATE``. When
    the user's ``role`` is in ``ALLOWED_ROLES`` the park, its ``Location``
    and any uploaded photos are saved immediately and the submission is
    marked ``APPROVED``; otherwise only the submission is stored.
    """

    model = Park
    form_class = ParkForm
    template_name = "parks/park_form.html"

    @staticmethod
    def _has_value(value: Any) -> bool:
        """Return True unless ``value`` is None or an empty string.

        A plain truthiness test would also reject 0, which is a valid
        latitude/longitude (equator / prime meridian).
        """
        return value is not None and value != ""

    def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``cleaned_data`` with rich values made primitive.

        The owner is replaced by its id, dates by ISO strings and decimal
        fields by strings -- presumably so the dict can be stored on the
        submission as JSON; confirm against ``EditSubmission.changes``.
        """
        data = cleaned_data.copy()
        if data.get("owner"):
            data["owner"] = data["owner"].id
        for field in ("opening_date", "closing_date"):
            if data.get(field):
                data[field] = data[field].isoformat()
        for field in ("latitude", "longitude", "size_acres", "average_rating"):
            # Zero must be converted too; only absent values are skipped.
            if self._has_value(data.get(field)):
                data[field] = str(data[field])
        return data

    def normalize_coordinates(self, form: ParkForm) -> None:
        """Truncate latitude/longitude in ``form.cleaned_data`` to six decimals."""
        six_places = Decimal("0.000001")
        for field in ("latitude", "longitude"):
            value = form.cleaned_data.get(field)
            if not self._has_value(value):
                continue
            form.cleaned_data[field] = Decimal(str(value)).quantize(
                six_places, rounding=ROUND_DOWN
            )

    def form_valid(self, form: ParkForm) -> HttpResponse:
        """Record the submission and, for privileged roles, create the park.

        Returns a redirect to the new park for privileged users, a redirect
        back to this form for everyone else, or the re-rendered form when
        saving the park raised.
        """
        self.normalize_coordinates(form)
        changes = self.prepare_changes_data(form.cleaned_data)
        park_type = ContentType.objects.get_for_model(Park)
        submission = EditSubmission.objects.create(
            user=self.request.user,
            content_type=park_type,
            submission_type="CREATE",
            changes=changes,
            reason=self.request.POST.get("reason", ""),
            source=self.request.POST.get("source", ""),
        )

        if getattr(self.request.user, "role", None) in ALLOWED_ROLES:
            try:
                self.object = form.save()
                submission.object_id = self.object.id
                submission.status = "APPROVED"
                submission.handled_by = self.request.user
                submission.save()

                latitude = form.cleaned_data.get("latitude")
                longitude = form.cleaned_data.get("longitude")
                # Presence check rather than truthiness so a park at 0
                # latitude or longitude still gets its Location row.
                if self._has_value(latitude) and self._has_value(longitude):
                    Location.objects.create(
                        content_type=park_type,
                        object_id=self.object.id,
                        name=self.object.name,
                        location_type="park",
                        latitude=latitude,
                        longitude=longitude,
                        street_address=form.cleaned_data.get(
                            "street_address", ""),
                        city=form.cleaned_data.get("city", ""),
                        state=form.cleaned_data.get("state", ""),
                        country=form.cleaned_data.get("country", ""),
                        postal_code=form.cleaned_data.get("postal_code", ""),
                    )

                uploaded_count = 0
                for photo_file in self.request.FILES.getlist("photos"):
                    # One bad file should not abort the remaining uploads.
                    try:
                        Photo.objects.create(
                            image=photo_file,
                            uploaded_by=self.request.user,
                            content_type=park_type,
                            object_id=self.object.id,
                        )
                        uploaded_count += 1
                    except Exception as e:
                        messages.error(
                            self.request,
                            f"Error uploading photo {photo_file.name}: {str(e)}",
                        )

                messages.success(
                    self.request,
                    f"Successfully created {self.object.name}. "
                    f"Added {uploaded_count} photo(s).",
                )
                return HttpResponseRedirect(self.get_success_url())
            except Exception as e:
                messages.error(
                    self.request,
                    f"Error creating park: {str(e)}. Please check your input and try again.",
                )
                return self.form_invalid(form)

        messages.success(
            self.request,
            "Your park submission has been sent for review. "
            "You will be notified when it is approved."
        )
        # The submission succeeded, so do not re-render the bound form as if
        # it were invalid (a refresh or second click would submit it again).
        # No park exists yet, so the detail URL cannot be used; redirect back
        # to this form instead.
        # NOTE(review): a park list page may be a nicer target if one exists.
        return HttpResponseRedirect(self.request.path)

    def form_invalid(self, form: ParkForm) -> HttpResponse:
        """Flash the generic and per-field validation errors, then re-render."""
        messages.error(
            self.request,
            REQUIRED_FIELDS_ERROR
        )
        for field, errors in form.errors.items():
            for error in errors:
                messages.error(self.request, f"{field}: {error}")
        return super().form_invalid(form)

    def get_success_url(self) -> str:
        """Return the detail URL of the park that was just created."""
        return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
class ParkUpdateView(LoginRequiredMixin, UpdateView):
    """Edit a park through the edit-submission workflow.

    Every valid form produces an ``EditSubmission`` of type ``EDIT``. When
    the user's ``role`` is in ``ALLOWED_ROLES`` the park, its ``Location``
    and any uploaded photos are saved immediately and the submission is
    marked ``APPROVED``; otherwise only the submission is stored.
    """

    model = Park
    form_class = ParkForm
    template_name = "parks/park_form.html"

    @staticmethod
    def _has_value(value: Any) -> bool:
        """Return True unless ``value`` is None or an empty string.

        A plain truthiness test would also reject 0, which is a valid
        latitude/longitude (equator / prime meridian).
        """
        return value is not None and value != ""

    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
        """Add ``is_edit`` so the shared form template can tell edit from create."""
        context = super().get_context_data(**kwargs)
        context["is_edit"] = True
        return context

    def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``cleaned_data`` with rich values made primitive.

        The owner is replaced by its id, dates by ISO strings and decimal
        fields by strings -- presumably so the dict can be stored on the
        submission as JSON; confirm against ``EditSubmission.changes``.
        """
        data = cleaned_data.copy()
        if data.get("owner"):
            data["owner"] = data["owner"].id
        for field in ("opening_date", "closing_date"):
            if data.get(field):
                data[field] = data[field].isoformat()
        for field in ("latitude", "longitude", "size_acres", "average_rating"):
            # Zero must be converted too; only absent values are skipped.
            if self._has_value(data.get(field)):
                data[field] = str(data[field])
        return data

    def normalize_coordinates(self, form: ParkForm) -> None:
        """Truncate latitude/longitude in ``form.cleaned_data`` to six decimals."""
        six_places = Decimal("0.000001")
        for field in ("latitude", "longitude"):
            value = form.cleaned_data.get(field)
            if not self._has_value(value):
                continue
            form.cleaned_data[field] = Decimal(str(value)).quantize(
                six_places, rounding=ROUND_DOWN
            )

    def form_valid(self, form: ParkForm) -> HttpResponse:
        """Record the submission and, for privileged roles, apply the edit.

        Returns a redirect to the park detail page, or the re-rendered form
        when saving the edit raised.
        """
        self.normalize_coordinates(form)
        changes = self.prepare_changes_data(form.cleaned_data)
        park_type = ContentType.objects.get_for_model(Park)
        submission = EditSubmission.objects.create(
            user=self.request.user,
            content_type=park_type,
            object_id=self.object.id,
            submission_type="EDIT",
            changes=changes,
            reason=self.request.POST.get("reason", ""),
            source=self.request.POST.get("source", ""),
        )

        if getattr(self.request.user, "role", None) in ALLOWED_ROLES:
            try:
                self.object = form.save()
                submission.status = "APPROVED"
                submission.handled_by = self.request.user
                submission.save()

                location_data = {
                    "name": self.object.name,
                    "location_type": "park",
                    "latitude": form.cleaned_data.get("latitude"),
                    "longitude": form.cleaned_data.get("longitude"),
                    "street_address": form.cleaned_data.get("street_address", ""),
                    "city": form.cleaned_data.get("city", ""),
                    "state": form.cleaned_data.get("state", ""),
                    "country": form.cleaned_data.get("country", ""),
                    "postal_code": form.cleaned_data.get("postal_code", ""),
                }
                # One query instead of exists() followed by first().
                location = self.object.location.first()
                if location is not None:
                    for key, value in location_data.items():
                        setattr(location, key, value)
                    location.save()
                else:
                    Location.objects.create(
                        content_type=park_type,
                        object_id=self.object.id,
                        **location_data,
                    )

                uploaded_count = 0
                for photo_file in self.request.FILES.getlist("photos"):
                    # One bad file should not abort the remaining uploads.
                    try:
                        Photo.objects.create(
                            image=photo_file,
                            uploaded_by=self.request.user,
                            content_type=park_type,
                            object_id=self.object.id,
                        )
                        uploaded_count += 1
                    except Exception as e:
                        messages.error(
                            self.request,
                            f"Error uploading photo {photo_file.name}: {str(e)}",
                        )

                messages.success(
                    self.request,
                    f"Successfully updated {self.object.name}. "
                    f"Added {uploaded_count} new photo(s).",
                )
                return HttpResponseRedirect(self.get_success_url())
            except Exception as e:
                messages.error(
                    self.request,
                    f"Error updating park: {str(e)}. Please check your input and try again.",
                )
                return self.form_invalid(form)

        messages.success(
            self.request,
            f"Your changes to {self.object.name} have been sent for review. "
            "You will be notified when they are approved.",
        )
        return HttpResponseRedirect(
            reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
        )

    def form_invalid(self, form: ParkForm) -> HttpResponse:
        """Flash the generic and per-field validation errors, then re-render."""
        messages.error(
            self.request,
            REQUIRED_FIELDS_ERROR
        )
        for field, errors in form.errors.items():
            for error in errors:
                messages.error(self.request, f"{field}: {error}")
        return super().form_invalid(form)

    def get_success_url(self) -> str:
        """Return the detail URL of the edited park."""
        return reverse(PARK_DETAIL_URL, kwargs={"slug": self.object.slug})
class ParkDetailView(
    SlugRedirectMixin,
    EditSubmissionMixin,
    PhotoSubmissionMixin,
    HistoryMixin,
    DetailView
):
    """Detail page for a single park, looked up by slug."""

    model = Park
    template_name = "parks/park_detail.html"
    context_object_name = "park"

    def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
        """Return the park for the slug in the URL.

        Raises:
            ObjectDoesNotExist: when the URL carries no slug.
        """
        if queryset is None:
            # Built for parity with DetailView; the lookup below goes through
            # Park.get_by_slug and does not read this queryset.
            queryset = self.get_queryset()
        slug = self.kwargs.get(self.slug_url_kwarg)
        if slug is None:
            raise ObjectDoesNotExist("No slug provided")
        # get_by_slug returns a pair; only the park itself is needed here.
        park, _unused = Park.get_by_slug(slug)
        return park

    def get_queryset(self) -> QuerySet[Park]:
        """Return the base queryset with the park's relations prefetched."""
        related = (
            "rides",
            "rides__manufacturer",
            "photos",
            "areas",
            "location",
        )
        prefetched = super().get_queryset().prefetch_related(*related)
        return cast(QuerySet[Park], prefetched)

    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
        """Add the park's areas, its ordered rides and the ``has_reviewed`` flag."""
        context = super().get_context_data(**kwargs)
        park = cast(Park, self.object)
        context["areas"] = park.areas.all()
        context["rides"] = park.rides.all().order_by("-status", "name")

        # Anonymous visitors can never have reviewed the park.
        has_reviewed = False
        if self.request.user.is_authenticated:
            has_reviewed = Review.objects.filter(
                user=self.request.user,
                content_type=ContentType.objects.get_for_model(Park),
                object_id=park.id,
            ).exists()
        context["has_reviewed"] = has_reviewed
        return context

    def get_redirect_url_pattern(self) -> str:
        """Return the URL name used when redirecting to this park."""
        return PARK_DETAIL_URL
class ParkAreaDetailView(
    SlugRedirectMixin,
    EditSubmissionMixin,
    PhotoSubmissionMixin,
    HistoryMixin,
    DetailView
):
    """Detail page for one area of a park, addressed by park and area slug."""

    model = ParkArea
    template_name = "parks/area_detail.html"
    context_object_name = "area"
    slug_url_kwarg = "area_slug"

    def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea:
        """Return the area named in the URL, checked against the park slug.

        Raises:
            ObjectDoesNotExist: when either slug is absent from the URL, or
                the area found does not belong to the park in the URL.
        """
        if queryset is None:
            # Built for parity with DetailView; the lookup below goes through
            # ParkArea.get_by_slug and does not read this queryset.
            queryset = self.get_queryset()
        area_slug = self.kwargs.get("area_slug")
        park_slug = self.kwargs.get("park_slug")
        if area_slug is None or park_slug is None:
            raise ObjectDoesNotExist("Missing slug")
        # get_by_slug returns a pair; only the area itself is needed here.
        area, _unused = ParkArea.get_by_slug(area_slug)
        if area.park.slug == park_slug:
            return area
        raise ObjectDoesNotExist("Park slug doesn't match")

    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
        """Return the inherited context unchanged."""
        return super().get_context_data(**kwargs)

    def get_redirect_url_pattern(self) -> str:
        """Return the URL name used for redirects.

        NOTE(review): this is the park detail URL name, which this module
        reverses with a ``slug`` kwarg, while get_redirect_url_kwargs below
        supplies ``park_slug`` and ``area_slug``. The two look mismatched;
        confirm whether an area-detail URL name was intended.
        """
        return PARK_DETAIL_URL

    def get_redirect_url_kwargs(self) -> dict[str, str]:
        """Return the park and area slugs identifying the current area."""
        area = cast(ParkArea, self.object)
        return {"park_slug": area.park.slug, "area_slug": area.slug}