mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 06:51:08 -05:00
549 lines
21 KiB
Python
549 lines
21 KiB
Python
from decimal import Decimal, ROUND_DOWN, InvalidOperation
|
|
from typing import Any, Optional, cast, Type
|
|
from django.views.generic import DetailView, ListView, CreateView, UpdateView
|
|
from django.shortcuts import get_object_or_404, render
|
|
from django.core.serializers.json import DjangoJSONEncoder
|
|
from django.urls import reverse
|
|
from django.db.models import Q, Avg, Count, QuerySet, Model
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib import messages
|
|
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest
|
|
import requests
|
|
from .models import Park, ParkArea
|
|
from .forms import ParkForm
|
|
from .location_utils import normalize_coordinate, normalize_osm_result
|
|
from core.views import SlugRedirectMixin
|
|
from moderation.mixins import EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin
|
|
from moderation.models import EditSubmission
|
|
from media.models import Photo
|
|
from location.models import Location
|
|
from reviews.models import Review
|
|
from analytics.models import PageView
|
|
|
|
|
|
def park_actions(request: HttpRequest, slug: str) -> HttpResponse:
|
|
"""Return the park actions partial template"""
|
|
park = get_object_or_404(Park, slug=slug)
|
|
return render(request, "parks/partials/park_actions.html", {"park": park})
|
|
|
|
|
|
def get_park_areas(request: HttpRequest) -> HttpResponse:
|
|
"""Return park areas as options for a select element"""
|
|
park_id = request.GET.get('park')
|
|
if not park_id:
|
|
return HttpResponse('<option value="">Select a park first</option>')
|
|
|
|
try:
|
|
park = Park.objects.get(id=park_id)
|
|
areas = park.areas.all()
|
|
options = ['<option value="">No specific area</option>']
|
|
options.extend([
|
|
f'<option value="{area.id}">{area.name}</option>'
|
|
for area in areas
|
|
])
|
|
return HttpResponse('\n'.join(options))
|
|
except Park.DoesNotExist:
|
|
return HttpResponse('<option value="">Invalid park selected</option>')
|
|
|
|
|
|
def search_parks(request: HttpRequest) -> HttpResponse:
|
|
"""Search parks and return results for HTMX"""
|
|
query = request.GET.get('q', '').strip()
|
|
|
|
# If no query, show first 10 parks
|
|
if not query:
|
|
parks = Park.objects.all().order_by('name')[:10]
|
|
else:
|
|
parks = Park.objects.filter(name__icontains=query).order_by('name')[:10]
|
|
|
|
return render(request, "parks/partials/park_search_results.html", {"parks": parks})
|
|
|
|
|
|
def location_search(request: HttpRequest) -> JsonResponse:
|
|
"""Search for locations using OpenStreetMap Nominatim API"""
|
|
query = request.GET.get("q", "")
|
|
if not query:
|
|
return JsonResponse({"results": []})
|
|
|
|
# Call Nominatim API
|
|
response = requests.get(
|
|
"https://nominatim.openstreetmap.org/search",
|
|
params={
|
|
"q": query,
|
|
"format": "json",
|
|
"addressdetails": 1,
|
|
"namedetails": 1,
|
|
"accept-language": "en",
|
|
"limit": 10,
|
|
},
|
|
headers={"User-Agent": "ThrillWiki/1.0"},
|
|
timeout=60)
|
|
|
|
if response.status_code == 200:
|
|
results = response.json()
|
|
normalized_results = [normalize_osm_result(result) for result in results]
|
|
valid_results = [
|
|
r
|
|
for r in normalized_results
|
|
if r["lat"] is not None and r["lon"] is not None
|
|
]
|
|
return JsonResponse({"results": valid_results})
|
|
|
|
return JsonResponse({"results": []})
|
|
|
|
|
|
def reverse_geocode(request: HttpRequest) -> JsonResponse:
|
|
"""Reverse geocode coordinates using OpenStreetMap Nominatim API"""
|
|
try:
|
|
lat = Decimal(request.GET.get("lat", ""))
|
|
lon = Decimal(request.GET.get("lon", ""))
|
|
except (TypeError, ValueError, InvalidOperation):
|
|
return JsonResponse({"error": "Invalid coordinates"}, status=400)
|
|
|
|
if not lat or not lon:
|
|
return JsonResponse({"error": "Missing coordinates"}, status=400)
|
|
|
|
lat = lat.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
|
|
lon = lon.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
|
|
|
|
if lat < -90 or lat > 90:
|
|
return JsonResponse(
|
|
{"error": "Latitude must be between -90 and 90"}, status=400
|
|
)
|
|
if lon < -180 or lon > 180:
|
|
return JsonResponse(
|
|
{"error": "Longitude must be between -180 and 180"}, status=400
|
|
)
|
|
|
|
response = requests.get(
|
|
"https://nominatim.openstreetmap.org/reverse",
|
|
params={
|
|
"lat": str(lat),
|
|
"lon": str(lon),
|
|
"format": "json",
|
|
"addressdetails": 1,
|
|
"namedetails": 1,
|
|
"accept-language": "en",
|
|
},
|
|
headers={"User-Agent": "ThrillWiki/1.0"},
|
|
timeout=60)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
normalized_result = normalize_osm_result(result)
|
|
if normalized_result["lat"] is None or normalized_result["lon"] is None:
|
|
return JsonResponse({"error": "Invalid coordinates"}, status=400)
|
|
return JsonResponse(normalized_result)
|
|
|
|
return JsonResponse({"error": "Geocoding failed"}, status=500)
|
|
|
|
|
|
def add_park_button(request: HttpRequest) -> HttpResponse:
|
|
"""Return the add park button partial template"""
|
|
return render(request, "parks/partials/add_park_button.html")
|
|
|
|
|
|
class ParkListView(ListView):
|
|
model = Park
|
|
template_name = "parks/park_list.html"
|
|
context_object_name = "parks"
|
|
|
|
def get_queryset(self) -> QuerySet[Park]:
|
|
queryset = Park.objects.select_related("owner").prefetch_related(
|
|
"photos", "location"
|
|
)
|
|
|
|
search = self.request.GET.get("search", "").strip()
|
|
country = self.request.GET.get("country", "").strip()
|
|
region = self.request.GET.get("region", "").strip()
|
|
city = self.request.GET.get("city", "").strip()
|
|
statuses = self.request.GET.getlist("status")
|
|
|
|
if search:
|
|
queryset = queryset.filter(
|
|
Q(name__icontains=search)
|
|
| Q(location__city__icontains=search)
|
|
| Q(location__state__icontains=search)
|
|
| Q(location__country__icontains=search)
|
|
)
|
|
|
|
if country:
|
|
queryset = queryset.filter(location__country__icontains=country)
|
|
|
|
if region:
|
|
queryset = queryset.filter(location__state__icontains=region)
|
|
|
|
if city:
|
|
queryset = queryset.filter(location__city__icontains=city)
|
|
|
|
if statuses:
|
|
queryset = queryset.filter(status__in=statuses)
|
|
|
|
queryset = queryset.annotate(
|
|
total_rides=Count("rides"),
|
|
total_coasters=Count("rides", filter=Q(rides__category="RC")),
|
|
)
|
|
|
|
return queryset.distinct()
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
context["current_filters"] = {
|
|
"search": self.request.GET.get("search", ""),
|
|
"country": self.request.GET.get("country", ""),
|
|
"region": self.request.GET.get("region", ""),
|
|
"city": self.request.GET.get("city", ""),
|
|
"statuses": self.request.GET.getlist("status"),
|
|
}
|
|
return context
|
|
|
|
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
if hasattr(request, "htmx") and getattr(request, "htmx", False):
|
|
self.template_name = "parks/partials/park_list.html"
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
|
|
class ParkDetailView(
|
|
SlugRedirectMixin,
|
|
EditSubmissionMixin,
|
|
PhotoSubmissionMixin,
|
|
HistoryMixin,
|
|
DetailView,
|
|
):
|
|
model = Park
|
|
template_name = "parks/park_detail.html"
|
|
context_object_name = "park"
|
|
|
|
def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
|
|
if queryset is None:
|
|
queryset = self.get_queryset()
|
|
slug = self.kwargs.get(self.slug_url_kwarg)
|
|
if slug is None:
|
|
raise ObjectDoesNotExist("No slug provided")
|
|
park, _ = Park.get_by_slug(slug)
|
|
return park
|
|
|
|
def get_queryset(self) -> QuerySet[Park]:
|
|
return cast(
|
|
QuerySet[Park],
|
|
super()
|
|
.get_queryset()
|
|
.prefetch_related(
|
|
"rides", "rides__manufacturer", "photos", "areas", "location"
|
|
),
|
|
)
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
park = cast(Park, self.object)
|
|
context["areas"] = park.areas.all()
|
|
context["rides"] = park.rides.all().order_by("-status", "name")
|
|
|
|
if self.request.user.is_authenticated:
|
|
context["has_reviewed"] = Review.objects.filter(
|
|
user=self.request.user,
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=park.id,
|
|
).exists()
|
|
else:
|
|
context["has_reviewed"] = False
|
|
|
|
return context
|
|
|
|
def get_redirect_url_pattern(self) -> str:
|
|
return "parks:park_detail"
|
|
|
|
|
|
class ParkCreateView(LoginRequiredMixin, CreateView):
|
|
model = Park
|
|
form_class = ParkForm
|
|
template_name = "parks/park_form.html"
|
|
|
|
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
|
|
data = cleaned_data.copy()
|
|
if data.get("owner"):
|
|
data["owner"] = data["owner"].id
|
|
if data.get("opening_date"):
|
|
data["opening_date"] = data["opening_date"].isoformat()
|
|
if data.get("closing_date"):
|
|
data["closing_date"] = data["closing_date"].isoformat()
|
|
decimal_fields = ["latitude", "longitude", "size_acres", "average_rating"]
|
|
for field in decimal_fields:
|
|
if data.get(field):
|
|
data[field] = str(data[field])
|
|
return data
|
|
|
|
def normalize_coordinates(self, form: ParkForm) -> None:
|
|
if form.cleaned_data.get("latitude"):
|
|
lat = Decimal(str(form.cleaned_data["latitude"]))
|
|
form.cleaned_data["latitude"] = lat.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
if form.cleaned_data.get("longitude"):
|
|
lon = Decimal(str(form.cleaned_data["longitude"]))
|
|
form.cleaned_data["longitude"] = lon.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
|
|
def form_valid(self, form: ParkForm) -> HttpResponse:
|
|
self.normalize_coordinates(form)
|
|
changes = self.prepare_changes_data(form.cleaned_data)
|
|
|
|
submission = EditSubmission.objects.create(
|
|
user=self.request.user,
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
submission_type="CREATE",
|
|
changes=changes,
|
|
reason=self.request.POST.get("reason", ""),
|
|
source=self.request.POST.get("source", ""),
|
|
)
|
|
|
|
if hasattr(self.request.user, "role") and getattr(
|
|
self.request.user, "role", None
|
|
) in ["MODERATOR", "ADMIN", "SUPERUSER"]:
|
|
try:
|
|
self.object = form.save()
|
|
submission.object_id = self.object.id
|
|
submission.status = "APPROVED"
|
|
submission.handled_by = self.request.user
|
|
submission.save()
|
|
|
|
if form.cleaned_data.get("latitude") and form.cleaned_data.get(
|
|
"longitude"
|
|
):
|
|
Location.objects.create(
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=self.object.id,
|
|
name=self.object.name,
|
|
location_type="park",
|
|
latitude=form.cleaned_data["latitude"],
|
|
longitude=form.cleaned_data["longitude"],
|
|
street_address=form.cleaned_data.get("street_address", ""),
|
|
city=form.cleaned_data.get("city", ""),
|
|
state=form.cleaned_data.get("state", ""),
|
|
country=form.cleaned_data.get("country", ""),
|
|
postal_code=form.cleaned_data.get("postal_code", ""),
|
|
)
|
|
|
|
photos = self.request.FILES.getlist("photos")
|
|
for photo_file in photos:
|
|
try:
|
|
Photo.objects.create(
|
|
image=photo_file,
|
|
uploaded_by=self.request.user,
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=self.object.id,
|
|
)
|
|
except Exception as e:
|
|
messages.error(
|
|
self.request,
|
|
f"Error uploading photo {photo_file.name}: {str(e)}",
|
|
)
|
|
|
|
messages.success(
|
|
self.request,
|
|
f"Successfully created {self.object.name}. "
|
|
f"Added {len(photos)} photo(s).",
|
|
)
|
|
return HttpResponseRedirect(self.get_success_url())
|
|
except Exception as e:
|
|
messages.error(
|
|
self.request,
|
|
f"Error creating park: {str(e)}. Please check your input and try again.",
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
messages.success(
|
|
self.request,
|
|
"Your park submission has been sent for review. "
|
|
"You will be notified when it is approved.",
|
|
)
|
|
return HttpResponseRedirect(reverse("parks:park_list"))
|
|
|
|
def form_invalid(self, form: ParkForm) -> HttpResponse:
|
|
messages.error(
|
|
self.request,
|
|
"Please correct the errors below. Required fields are marked with an asterisk (*).",
|
|
)
|
|
for field, errors in form.errors.items():
|
|
for error in errors:
|
|
messages.error(self.request, f"{field}: {error}")
|
|
return super().form_invalid(form)
|
|
|
|
def get_success_url(self) -> str:
|
|
return reverse("parks:park_detail", kwargs={"slug": self.object.slug})
|
|
|
|
|
|
class ParkUpdateView(LoginRequiredMixin, UpdateView):
|
|
model = Park
|
|
form_class = ParkForm
|
|
template_name = "parks/park_form.html"
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
context["is_edit"] = True
|
|
return context
|
|
|
|
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
|
|
data = cleaned_data.copy()
|
|
if data.get("owner"):
|
|
data["owner"] = data["owner"].id
|
|
if data.get("opening_date"):
|
|
data["opening_date"] = data["opening_date"].isoformat()
|
|
if data.get("closing_date"):
|
|
data["closing_date"] = data["closing_date"].isoformat()
|
|
decimal_fields = ["latitude", "longitude", "size_acres", "average_rating"]
|
|
for field in decimal_fields:
|
|
if data.get(field):
|
|
data[field] = str(data[field])
|
|
return data
|
|
|
|
def normalize_coordinates(self, form: ParkForm) -> None:
|
|
if form.cleaned_data.get("latitude"):
|
|
lat = Decimal(str(form.cleaned_data["latitude"]))
|
|
form.cleaned_data["latitude"] = lat.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
if form.cleaned_data.get("longitude"):
|
|
lon = Decimal(str(form.cleaned_data["longitude"]))
|
|
form.cleaned_data["longitude"] = lon.quantize(
|
|
Decimal("0.000001"), rounding=ROUND_DOWN
|
|
)
|
|
|
|
def form_valid(self, form: ParkForm) -> HttpResponse:
|
|
self.normalize_coordinates(form)
|
|
changes = self.prepare_changes_data(form.cleaned_data)
|
|
|
|
submission = EditSubmission.objects.create(
|
|
user=self.request.user,
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=self.object.id,
|
|
submission_type="EDIT",
|
|
changes=changes,
|
|
reason=self.request.POST.get("reason", ""),
|
|
source=self.request.POST.get("source", ""),
|
|
)
|
|
|
|
if hasattr(self.request.user, "role") and getattr(
|
|
self.request.user, "role", None
|
|
) in ["MODERATOR", "ADMIN", "SUPERUSER"]:
|
|
try:
|
|
self.object = form.save()
|
|
submission.status = "APPROVED"
|
|
submission.handled_by = self.request.user
|
|
submission.save()
|
|
|
|
location_data = {
|
|
"name": self.object.name,
|
|
"location_type": "park",
|
|
"latitude": form.cleaned_data.get("latitude"),
|
|
"longitude": form.cleaned_data.get("longitude"),
|
|
"street_address": form.cleaned_data.get("street_address", ""),
|
|
"city": form.cleaned_data.get("city", ""),
|
|
"state": form.cleaned_data.get("state", ""),
|
|
"country": form.cleaned_data.get("country", ""),
|
|
"postal_code": form.cleaned_data.get("postal_code", ""),
|
|
}
|
|
|
|
if self.object.location.exists():
|
|
location = self.object.location.first()
|
|
for key, value in location_data.items():
|
|
setattr(location, key, value)
|
|
location.save()
|
|
else:
|
|
Location.objects.create(
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=self.object.id,
|
|
**location_data,
|
|
)
|
|
|
|
photos = self.request.FILES.getlist("photos")
|
|
uploaded_count = 0
|
|
for photo_file in photos:
|
|
try:
|
|
Photo.objects.create(
|
|
image=photo_file,
|
|
uploaded_by=self.request.user,
|
|
content_type=ContentType.objects.get_for_model(Park),
|
|
object_id=self.object.id,
|
|
)
|
|
uploaded_count += 1
|
|
except Exception as e:
|
|
messages.error(
|
|
self.request,
|
|
f"Error uploading photo {photo_file.name}: {str(e)}",
|
|
)
|
|
|
|
messages.success(
|
|
self.request,
|
|
f"Successfully updated {self.object.name}. "
|
|
f"Added {uploaded_count} new photo(s).",
|
|
)
|
|
return HttpResponseRedirect(self.get_success_url())
|
|
except Exception as e:
|
|
messages.error(
|
|
self.request,
|
|
f"Error updating park: {str(e)}. Please check your input and try again.",
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
messages.success(
|
|
self.request,
|
|
f"Your changes to {self.object.name} have been sent for review. "
|
|
"You will be notified when they are approved.",
|
|
)
|
|
return HttpResponseRedirect(
|
|
reverse("parks:park_detail", kwargs={"slug": self.object.slug})
|
|
)
|
|
|
|
def form_invalid(self, form: ParkForm) -> HttpResponse:
|
|
messages.error(
|
|
self.request,
|
|
"Please correct the errors below. Required fields are marked with an asterisk (*).",
|
|
)
|
|
for field, errors in form.errors.items():
|
|
for error in errors:
|
|
messages.error(self.request, f"{field}: {error}")
|
|
return super().form_invalid(form)
|
|
|
|
def get_success_url(self) -> str:
|
|
return reverse("parks:park_detail", kwargs={"slug": self.object.slug})
|
|
|
|
|
|
class ParkAreaDetailView(
|
|
SlugRedirectMixin,
|
|
EditSubmissionMixin,
|
|
PhotoSubmissionMixin,
|
|
HistoryMixin,
|
|
DetailView,
|
|
):
|
|
model = ParkArea
|
|
template_name = "parks/area_detail.html"
|
|
context_object_name = "area"
|
|
slug_url_kwarg = "area_slug"
|
|
|
|
def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea:
|
|
if queryset is None:
|
|
queryset = self.get_queryset()
|
|
park_slug = self.kwargs.get("park_slug")
|
|
area_slug = self.kwargs.get("area_slug")
|
|
if park_slug is None or area_slug is None:
|
|
raise ObjectDoesNotExist("Missing slug")
|
|
area, _ = ParkArea.get_by_slug(area_slug)
|
|
if area.park.slug != park_slug:
|
|
raise ObjectDoesNotExist("Park slug doesn't match")
|
|
return area
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
return context
|
|
|
|
def get_redirect_url_pattern(self) -> str:
|
|
return "parks:park_detail"
|
|
|
|
def get_redirect_url_kwargs(self) -> dict[str, str]:
|
|
area = cast(ParkArea, self.object)
|
|
return {"park_slug": area.park.slug, "area_slug": area.slug}
|