Implement ride count fields with real-time annotations; update filters and templates for consistency and accuracy

This commit is contained in:
pacnpal
2025-02-13 16:44:30 -05:00
parent 9d6f6dab2c
commit c19aaf2f4b
10 changed files with 988 additions and 367 deletions

View File

@@ -1,36 +1,55 @@
from decimal import Decimal, ROUND_DOWN, InvalidOperation
from typing import Any, Optional, cast, Type
from decimal import Decimal, ROUND_DOWN
from typing import Any, Optional, cast, Literal
from django.views.generic import DetailView, ListView, CreateView, UpdateView
from django.shortcuts import get_object_or_404, render
from django.core.serializers.json import DjangoJSONEncoder
from django.urls import reverse
from django.db.models import Q, Avg, Count, QuerySet, Model
from search.mixins import HTMXFilterableMixin
from .filters import ParkFilter
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q, Count, QuerySet
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.contenttypes.models import ContentType
from django.contrib import messages
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest
import requests
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest, JsonResponse
from .models import Park, ParkArea
from .forms import ParkForm
from .location_utils import normalize_coordinate, normalize_osm_result
from .filters import ParkFilter
from core.views import SlugRedirectMixin
from moderation.mixins import EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin
from moderation.models import EditSubmission
from media.models import Photo
from location.models import Location
from reviews.models import Review
from analytics.models import PageView
from search.mixins import HTMXFilterableMixin
ViewMode = Literal["grid", "list"]
def get_view_mode(request: HttpRequest) -> ViewMode:
"""Get the current view mode from request, defaulting to grid"""
view_mode = request.GET.get('view_mode', 'grid')
return cast(ViewMode, 'list' if view_mode == 'list' else 'grid')
def get_base_park_queryset() -> QuerySet[Park]:
"""Get base queryset with all needed annotations and prefetches"""
return (
Park.objects.select_related('owner')
.prefetch_related('location', 'photos', 'rides')
.annotate(
current_ride_count=Count('rides', distinct=True),
current_coaster_count=Count('rides', filter=Q(rides__category="RC"), distinct=True)
)
.order_by('name')
)
def add_park_button(request: HttpRequest) -> HttpResponse:
"""Return the add park button partial template"""
return render(request, "parks/partials/add_park_button.html")
def park_actions(request: HttpRequest, slug: str) -> HttpResponse:
"""Return the park actions partial template"""
park = get_object_or_404(Park, slug=slug)
return render(request, "parks/partials/park_actions.html", {"park": park})
def get_park_areas(request: HttpRequest) -> HttpResponse:
"""Return park areas as options for a select element"""
park_id = request.GET.get('park')
@@ -49,53 +68,12 @@ def get_park_areas(request: HttpRequest) -> HttpResponse:
except Park.DoesNotExist:
return HttpResponse('<option value="">Invalid park selected</option>')
def search_parks(request: HttpRequest) -> HttpResponse:
"""Search parks and return results for quick searches with auto-suggestions"""
try:
search_query = request.GET.get('search', '').strip()
if not search_query:
return HttpResponse('') # Keep empty string for clearing search results
queryset = (
Park.objects.select_related('owner')
.prefetch_related('location', 'photos')
.annotate(
ride_count=Count('rides'),
coaster_count=Count('rides', filter=Q(rides__category="RC"))
)
.order_by('name')
)
# Use our existing filter but with search-specific configuration
park_filter = ParkFilter({
'search': search_query
}, queryset=queryset)
parks = park_filter.qs[:8] # Limit to 8 suggestions
response = render(request, "parks/park_list.html", {
"parks": parks
})
response['HX-Trigger'] = 'searchComplete'
return response
except Exception as e:
response = render(request, "parks/park_list.html", {
"parks": [],
"error": f"Error performing search: {str(e)}"
})
response['HX-Trigger'] = 'searchError'
return response
def location_search(request: HttpRequest) -> JsonResponse:
"""Search for locations using OpenStreetMap Nominatim API"""
query = request.GET.get("q", "")
if not query:
return JsonResponse({"results": []})
# Call Nominatim API
response = requests.get(
"https://nominatim.openstreetmap.org/search",
params={
@@ -107,21 +85,20 @@ def location_search(request: HttpRequest) -> JsonResponse:
"limit": 10,
},
headers={"User-Agent": "ThrillWiki/1.0"},
timeout=60)
timeout=60
)
if response.status_code == 200:
results = response.json()
normalized_results = [normalize_osm_result(result) for result in results]
valid_results = [
r
for r in normalized_results
r for r in normalized_results
if r["lat"] is not None and r["lon"] is not None
]
return JsonResponse({"results": valid_results})
return JsonResponse({"results": []})
def reverse_geocode(request: HttpRequest) -> JsonResponse:
"""Reverse geocode coordinates using OpenStreetMap Nominatim API"""
try:
@@ -137,13 +114,9 @@ def reverse_geocode(request: HttpRequest) -> JsonResponse:
lon = lon.quantize(Decimal("0.000001"), rounding=ROUND_DOWN)
if lat < -90 or lat > 90:
return JsonResponse(
{"error": "Latitude must be between -90 and 90"}, status=400
)
return JsonResponse({"error": "Latitude must be between -90 and 90"}, status=400)
if lon < -180 or lon > 180:
return JsonResponse(
{"error": "Longitude must be between -180 and 180"}, status=400
)
return JsonResponse({"error": "Longitude must be between -180 and 180"}, status=400)
response = requests.get(
"https://nominatim.openstreetmap.org/reverse",
@@ -156,7 +129,8 @@ def reverse_geocode(request: HttpRequest) -> JsonResponse:
"accept-language": "en",
},
headers={"User-Agent": "ThrillWiki/1.0"},
timeout=60)
timeout=60
)
if response.status_code == 200:
result = response.json()
@@ -168,111 +142,109 @@ def reverse_geocode(request: HttpRequest) -> JsonResponse:
return JsonResponse({"error": "Geocoding failed"}, status=500)
def add_park_button(request: HttpRequest) -> HttpResponse:
"""Return the add park button partial template"""
return render(request, "parks/partials/add_park_button.html")
class ParkListView(HTMXFilterableMixin, ListView):
model = Park
template_name = "parks/park_list.html"
context_object_name = "parks"
filter_class = ParkFilter
paginate_by = 20
def get_template_names(self) -> list[str]:
"""Override to use same template for HTMX and regular requests"""
"""Return park_list_item.html for HTMX requests"""
if self.request.htmx:
return ["parks/partials/park_list_item.html"]
return [self.template_name]
def get_view_mode(self) -> ViewMode:
"""Get the current view mode (grid or list)"""
return get_view_mode(self.request)
def get_queryset(self) -> QuerySet[Park]:
"""Get base queryset with annotations and apply filters"""
try:
return (
super()
.get_queryset()
.select_related("owner")
.prefetch_related(
"photos",
"location",
"rides",
"rides__manufacturer",
"areas"
)
.annotate(
total_rides=Count("rides"),
total_coasters=Count("rides", filter=Q(rides__category="RC")),
)
.order_by("name") # Ensure consistent ordering for pagination
)
queryset = get_base_park_queryset()
except Exception as e:
messages.error(self.request, f"Error loading parks: {str(e)}")
return Park.objects.none()
queryset = self.model.objects.none()
# Always initialize filterset, even if queryset failed
self.filterset = self.get_filter_class()(self.request.GET, queryset=queryset)
return self.filterset.qs
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Add view_mode and other context data"""
try:
# Initialize filterset even if queryset fails
if not hasattr(self, 'filterset'):
self.filterset = self.get_filter_class()(
self.request.GET,
queryset=self.model.objects.none()
)
context = super().get_context_data(**kwargs)
context['results_template'] = "parks/partials/park_list_item.html"
context.update({
'view_mode': self.get_view_mode(),
'is_search': bool(self.request.GET.get('search')),
'search_query': self.request.GET.get('search', '')
})
return context
except Exception as e:
messages.error(self.request, f"Error applying filters: {str(e)}")
# Ensure filterset exists in error case
if not hasattr(self, 'filterset'):
self.filterset = self.get_filter_class()(
self.request.GET,
queryset=self.model.objects.none()
)
return {
"filter": self.filterset,
"error": "Unable to apply filters. Please try adjusting your criteria.",
"results_template": "parks/partials/park_list_item.html"
'filter': self.filterset,
'error': "Unable to apply filters. Please try adjusting your criteria.",
'view_mode': self.get_view_mode(),
'is_search': bool(self.request.GET.get('search')),
'search_query': self.request.GET.get('search', '')
}
class ParkDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView,
):
model = Park
template_name = "parks/park_detail.html"
context_object_name = "park"
def search_parks(request: HttpRequest) -> HttpResponse:
"""Search parks and return results using park_list_item.html"""
try:
search_query = request.GET.get('search', '').strip()
if not search_query:
return HttpResponse('')
def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
if queryset is None:
queryset = self.get_queryset()
slug = self.kwargs.get(self.slug_url_kwarg)
if slug is None:
raise ObjectDoesNotExist("No slug provided")
park, _ = Park.get_by_slug(slug)
return park
park_filter = ParkFilter({
'search': search_query
}, queryset=get_base_park_queryset())
def get_queryset(self) -> QuerySet[Park]:
return cast(
QuerySet[Park],
super()
.get_queryset()
.prefetch_related(
"rides", "rides__manufacturer", "photos", "areas", "location"
),
parks = park_filter.qs
if request.GET.get('quick_search'):
parks = parks[:8] # Limit quick search results
response = render(
request,
"parks/partials/park_list_item.html",
{
"parks": parks,
"view_mode": get_view_mode(request),
"search_query": search_query,
"is_search": True
}
)
response['HX-Trigger'] = 'searchComplete'
return response
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
park = cast(Park, self.object)
context["areas"] = park.areas.all()
context["rides"] = park.rides.all().order_by("-status", "name")
if self.request.user.is_authenticated:
context["has_reviewed"] = Review.objects.filter(
user=self.request.user,
content_type=ContentType.objects.get_for_model(Park),
object_id=park.id,
).exists()
else:
context["has_reviewed"] = False
return context
def get_redirect_url_pattern(self) -> str:
return "parks:park_detail"
except Exception as e:
response = render(
request,
"parks/partials/park_list_item.html",
{
"parks": [],
"error": f"Error performing search: {str(e)}",
"is_search": True
}
)
response['HX-Trigger'] = 'searchError'
return response
class ParkCreateView(LoginRequiredMixin, CreateView):
model = Park
@@ -346,6 +318,7 @@ class ParkCreateView(LoginRequiredMixin, CreateView):
)
photos = self.request.FILES.getlist("photos")
uploaded_count = 0
for photo_file in photos:
try:
Photo.objects.create(
@@ -354,6 +327,7 @@ class ParkCreateView(LoginRequiredMixin, CreateView):
content_type=ContentType.objects.get_for_model(Park),
object_id=self.object.id,
)
uploaded_count += 1
except Exception as e:
messages.error(
self.request,
@@ -363,7 +337,7 @@ class ParkCreateView(LoginRequiredMixin, CreateView):
messages.success(
self.request,
f"Successfully created {self.object.name}. "
f"Added {len(photos)} photo(s).",
f"Added {uploaded_count} photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
except Exception as e:
@@ -530,6 +504,327 @@ class ParkUpdateView(LoginRequiredMixin, UpdateView):
return reverse("parks:park_detail", kwargs={"slug": self.object.slug})
class ParkDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView
):
model = Park
template_name = "parks/park_detail.html"
context_object_name = "park"
def get_object(self, queryset: Optional[QuerySet[Park]] = None) -> Park:
if queryset is None:
queryset = self.get_queryset()
slug = self.kwargs.get(self.slug_url_kwarg)
if slug is None:
raise ObjectDoesNotExist("No slug provided")
park, _ = Park.get_by_slug(slug)
return park
def get_queryset(self) -> QuerySet[Park]:
return cast(
QuerySet[Park],
super()
.get_queryset()
.prefetch_related(
"rides",
"rides__manufacturer",
"photos",
"areas",
"location"
),
)
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
park = cast(Park, self.object)
context["areas"] = park.areas.all()
context["rides"] = park.rides.all().order_by("-status", "name")
if self.request.user.is_authenticated:
context["has_reviewed"] = Review.objects.filter(
user=self.request.user,
content_type=ContentType.objects.get_for_model(Park),
object_id=park.id,
).exists()
else:
context["has_reviewed"] = False
return context
def get_redirect_url_pattern(self) -> str:
return "parks:park_detail"
class ParkAreaDetailView(
SlugRedirectMixin,
EditSubmissionMixin,
PhotoSubmissionMixin,
HistoryMixin,
DetailView
):
model = ParkArea
template_name = "parks/area_detail.html"
context_object_name = "area"
slug_url_kwarg = "area_slug"
def get_object(self, queryset: Optional[QuerySet[ParkArea]] = None) -> ParkArea:
if queryset is None:
queryset = self.get_queryset()
park_slug = self.kwargs.get("park_slug")
area_slug = self.kwargs.get("area_slug")
if park_slug is None or area_slug is None:
raise ObjectDoesNotExist("Missing slug")
area, _ = ParkArea.get_by_slug(area_slug)
if area.park.slug != park_slug:
raise ObjectDoesNotExist("Park slug doesn't match")
return area
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
return context
def get_redirect_url_pattern(self) -> str:
return "parks:park_detail"
def get_redirect_url_kwargs(self) -> dict[str, str]:
area = cast(ParkArea, self.object)
return {"park_slug": area.park.slug, "area_slug": area.slug}
def form_valid(self, form: ParkForm) -> HttpResponse:
self.normalize_coordinates(form)
changes = self.prepare_changes_data(form.cleaned_data)
submission = EditSubmission.objects.create(
user=self.request.user,
content_type=ContentType.objects.get_for_model(Park),
submission_type="CREATE",
changes=changes,
reason=self.request.POST.get("reason", ""),
source=self.request.POST.get("source", ""),
)
if hasattr(self.request.user, "role") and getattr(
self.request.user, "role", None
) in ["MODERATOR", "ADMIN", "SUPERUSER"]:
try:
self.object = form.save()
submission.object_id = self.object.id
submission.status = "APPROVED"
submission.handled_by = self.request.user
submission.save()
if form.cleaned_data.get("latitude") and form.cleaned_data.get(
"longitude"
):
Location.objects.create(
content_type=ContentType.objects.get_for_model(Park),
object_id=self.object.id,
name=self.object.name,
location_type="park",
latitude=form.cleaned_data["latitude"],
longitude=form.cleaned_data["longitude"],
street_address=form.cleaned_data.get(
"street_address", ""),
city=form.cleaned_data.get("city", ""),
state=form.cleaned_data.get("state", ""),
country=form.cleaned_data.get("country", ""),
postal_code=form.cleaned_data.get("postal_code", ""),
)
photos = self.request.FILES.getlist("photos")
uploaded_count = 0
for photo_file in photos:
try:
Photo.objects.create(
image=photo_file,
uploaded_by=self.request.user,
content_type=ContentType.objects.get_for_model(
Park),
object_id=self.object.id,
)
uploaded_count += 1
except Exception as e:
messages.error(
self.request,
f"Error uploading photo {photo_file.name}: {str(e)}",
)
messages.success(
self.request,
f"Successfully created {self.object.name}. "
f"Added {uploaded_count} photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
except Exception as e:
messages.error(
self.request,
f"Error creating park: {str(e)}. Please check your input and try again.",
)
return self.form_invalid(form)
messages.success(
self.request,
"Your park submission has been sent for review. "
"You will be notified when it is approved.",
)
return HttpResponseRedirect(reverse("parks:park_list"))
def form_invalid(self, form: ParkForm) -> HttpResponse:
messages.error(
self.request,
"Please correct the errors below. Required fields are marked with an asterisk (*).",
)
for field, errors in form.errors.items():
for error in errors:
messages.error(self.request, f"{field}: {error}")
return super().form_invalid(form)
def get_success_url(self) -> str:
return reverse("parks:park_detail", kwargs={"slug": self.object.slug})
class ParkUpdateView(LoginRequiredMixin, UpdateView):
model = Park
form_class = ParkForm
template_name = "parks/park_form.html"
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
context = super().get_context_data(**kwargs)
context["is_edit"] = True
return context
def prepare_changes_data(self, cleaned_data: dict[str, Any]) -> dict[str, Any]:
data = cleaned_data.copy()
if data.get("owner"):
data["owner"] = data["owner"].id
if data.get("opening_date"):
data["opening_date"] = data["opening_date"].isoformat()
if data.get("closing_date"):
data["closing_date"] = data["closing_date"].isoformat()
decimal_fields = ["latitude", "longitude",
"size_acres", "average_rating"]
for field in decimal_fields:
if data.get(field):
data[field] = str(data[field])
return data
def normalize_coordinates(self, form: ParkForm) -> None:
if form.cleaned_data.get("latitude"):
lat = Decimal(str(form.cleaned_data["latitude"]))
form.cleaned_data["latitude"] = lat.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
if form.cleaned_data.get("longitude"):
lon = Decimal(str(form.cleaned_data["longitude"]))
form.cleaned_data["longitude"] = lon.quantize(
Decimal("0.000001"), rounding=ROUND_DOWN
)
def form_valid(self, form: ParkForm) -> HttpResponse:
self.normalize_coordinates(form)
changes = self.prepare_changes_data(form.cleaned_data)
submission = EditSubmission.objects.create(
user=self.request.user,
content_type=ContentType.objects.get_for_model(Park),
object_id=self.object.id,
submission_type="EDIT",
changes=changes,
reason=self.request.POST.get("reason", ""),
source=self.request.POST.get("source", ""),
)
if hasattr(self.request.user, "role") and getattr(
self.request.user, "role", None
) in ["MODERATOR", "ADMIN", "SUPERUSER"]:
try:
self.object = form.save()
submission.status = "APPROVED"
submission.handled_by = self.request.user
submission.save()
location_data = {
"name": self.object.name,
"location_type": "park",
"latitude": form.cleaned_data.get("latitude"),
"longitude": form.cleaned_data.get("longitude"),
"street_address": form.cleaned_data.get("street_address", ""),
"city": form.cleaned_data.get("city", ""),
"state": form.cleaned_data.get("state", ""),
"country": form.cleaned_data.get("country", ""),
"postal_code": form.cleaned_data.get("postal_code", ""),
}
if self.object.location.exists():
location = self.object.location.first()
for key, value in location_data.items():
setattr(location, key, value)
location.save()
else:
Location.objects.create(
content_type=ContentType.objects.get_for_model(Park),
object_id=self.object.id,
**location_data,
)
photos = self.request.FILES.getlist("photos")
uploaded_count = 0
for photo_file in photos:
try:
Photo.objects.create(
image=photo_file,
uploaded_by=self.request.user,
content_type=ContentType.objects.get_for_model(
Park),
object_id=self.object.id,
)
uploaded_count += 1
except Exception as e:
messages.error(
self.request,
f"Error uploading photo {photo_file.name}: {str(e)}",
)
messages.success(
self.request,
f"Successfully updated {self.object.name}. "
f"Added {uploaded_count} new photo(s).",
)
return HttpResponseRedirect(self.get_success_url())
except Exception as e:
messages.error(
self.request,
f"Error updating park: {str(e)}. Please check your input and try again.",
)
return self.form_invalid(form)
messages.success(
self.request,
f"Your changes to {self.object.name} have been sent for review. "
"You will be notified when they are approved.",
)
return HttpResponseRedirect(
reverse("parks:park_detail", kwargs={"slug": self.object.slug})
)
def form_invalid(self, form: ParkForm) -> HttpResponse:
messages.error(
self.request,
"Please correct the errors below. Required fields are marked with an asterisk (*).",
)
for field, errors in form.errors.items():
for error in errors:
messages.error(self.request, f"{field}: {error}")
return super().form_invalid(form)
def get_success_url(self) -> str:
return reverse("parks:park_detail", kwargs={"slug": self.object.slug})
class ParkAreaDetailView(
SlugRedirectMixin,
EditSubmissionMixin,