mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 16:11:08 -05:00
367 lines
14 KiB
Python
367 lines
14 KiB
Python
from typing import Any, Optional, Tuple, Type, cast, Union, Dict, Callable
|
|
from django.views.generic import DetailView, ListView, CreateView, UpdateView
|
|
from django.shortcuts import get_object_or_404
|
|
from django.urls import reverse
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib import messages
|
|
from django.http import HttpResponseRedirect, Http404, JsonResponse, HttpResponse
|
|
from django.db.models import Count, Sum, Q, QuerySet, Model
|
|
from django.contrib.auth import get_user_model
|
|
from .models import Company, Manufacturer
|
|
from .forms import CompanyForm, ManufacturerForm
|
|
from rides.models import Ride
|
|
from parks.models import Park
|
|
from location.models import Location
|
|
from core.views import SlugRedirectMixin
|
|
from moderation.mixins import EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin
|
|
from moderation.models import EditSubmission
|
|
|
|
User = get_user_model()
|
|
|
|
ModelType = Union[Type[Company], Type[Manufacturer]]
|
|
|
|
def get_company_parks(company: Company) -> QuerySet[Park]:
|
|
"""Get parks owned by a company with related data."""
|
|
return Park.objects.filter(
|
|
owner=company
|
|
).select_related('owner')
|
|
|
|
def get_company_ride_count(parks: QuerySet[Park]) -> int:
|
|
"""Get total number of rides across all parks."""
|
|
return Ride.objects.filter(park__in=parks).count()
|
|
|
|
def get_manufacturer_rides(manufacturer: Manufacturer) -> QuerySet[Ride]:
|
|
"""Get rides made by a manufacturer with related data."""
|
|
return Ride.objects.filter(
|
|
manufacturer=manufacturer
|
|
).select_related('park', 'coaster_stats')
|
|
|
|
def get_manufacturer_stats(rides: QuerySet[Ride]) -> Dict[str, int]:
|
|
"""Get statistics for manufacturer rides."""
|
|
return {
|
|
'coaster_count': rides.filter(category='ROLLER_COASTER').count(),
|
|
'parks_count': rides.values('park').distinct().count()
|
|
}
|
|
|
|
def handle_submission_post(
|
|
request: Any,
|
|
handle_photo_submission: Callable[[Any], HttpResponse],
|
|
super_post: Callable[..., HttpResponse],
|
|
*args: Any,
|
|
**kwargs: Any
|
|
) -> HttpResponse:
|
|
"""Handle POST requests for photos and edits."""
|
|
if request.FILES:
|
|
# Handle photo submission
|
|
return handle_photo_submission(request)
|
|
# Handle edit submission
|
|
return super_post(request, *args, **kwargs)
|
|
|
|
# List Views
|
|
class CompanyListView(ListView):
|
|
model: Type[Company] = Company
|
|
template_name = "companies/company_list.html"
|
|
context_object_name = "companies"
|
|
paginate_by = 12
|
|
|
|
def get_queryset(self) -> QuerySet[Company]:
|
|
queryset = self.model.objects.all()
|
|
|
|
if country := self.request.GET.get("country"):
|
|
# Get companies that have locations in the specified country
|
|
company_ids = Location.objects.filter(
|
|
content_type=ContentType.objects.get_for_model(Company),
|
|
country__iexact=country,
|
|
).values_list("object_id", flat=True)
|
|
queryset = queryset.filter(pk__in=company_ids)
|
|
|
|
if search := self.request.GET.get("search"):
|
|
queryset = queryset.filter(name__icontains=search)
|
|
|
|
return queryset.order_by("name")
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
# Add filter values to context
|
|
context["country"] = self.request.GET.get("country", "")
|
|
context["search"] = self.request.GET.get("search", "")
|
|
return context
|
|
|
|
|
|
class ManufacturerListView(ListView):
|
|
model: Type[Manufacturer] = Manufacturer
|
|
template_name = "companies/manufacturer_list.html"
|
|
context_object_name = "manufacturers"
|
|
paginate_by = 12
|
|
|
|
def get_queryset(self) -> QuerySet[Manufacturer]:
|
|
queryset = self.model.objects.all()
|
|
|
|
if country := self.request.GET.get("country"):
|
|
# Get manufacturers that have locations in the specified country
|
|
manufacturer_ids = Location.objects.filter(
|
|
content_type=ContentType.objects.get_for_model(Manufacturer),
|
|
country__iexact=country,
|
|
).values_list("object_id", flat=True)
|
|
queryset = queryset.filter(pk__in=manufacturer_ids)
|
|
|
|
if search := self.request.GET.get("search"):
|
|
queryset = queryset.filter(name__icontains=search)
|
|
|
|
return queryset.order_by("name")
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
# Add stats for filtering
|
|
context["total_manufacturers"] = self.model.objects.count()
|
|
context["total_rides"] = Ride.objects.filter(manufacturer__isnull=False).count()
|
|
context["total_roller_coasters"] = Ride.objects.filter(
|
|
manufacturer__isnull=False, category="ROLLER_COASTER"
|
|
).count()
|
|
# Add filter values to context
|
|
context["country"] = self.request.GET.get("country", "")
|
|
context["search"] = self.request.GET.get("search", "")
|
|
return context
|
|
|
|
|
|
# Detail Views
|
|
class CompanyDetailView(SlugRedirectMixin, EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, DetailView):
|
|
model: Type[Company] = Company
|
|
template_name = 'companies/company_detail.html'
|
|
context_object_name = 'company'
|
|
|
|
def get_object(self, queryset: Optional[QuerySet[Company]] = None) -> Company:
|
|
if queryset is None:
|
|
queryset = self.get_queryset()
|
|
slug = self.kwargs.get(self.slug_url_kwarg)
|
|
try:
|
|
# Try to get by current or historical slug
|
|
model = cast(Type[Company], self.model)
|
|
obj, _ = model.get_by_slug(slug)
|
|
return obj
|
|
except model.DoesNotExist as e:
|
|
raise Http404(f"No {model._meta.verbose_name} found matching the query") from e
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
company = cast(Company, self.object)
|
|
|
|
parks = get_company_parks(company)
|
|
context['parks'] = parks
|
|
context['total_rides'] = get_company_ride_count(parks)
|
|
return context
|
|
|
|
def get_redirect_url_pattern(self) -> str:
|
|
return 'companies:company_detail'
|
|
|
|
def post(self, request: Any, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
"""Handle POST requests for photos and edits."""
|
|
return handle_submission_post(
|
|
request,
|
|
self.handle_photo_submission,
|
|
super().post,
|
|
*args,
|
|
**kwargs
|
|
)
|
|
|
|
class ManufacturerDetailView(SlugRedirectMixin, EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, DetailView):
|
|
model: Type[Manufacturer] = Manufacturer
|
|
template_name = 'companies/manufacturer_detail.html'
|
|
context_object_name = 'manufacturer'
|
|
|
|
def get_object(self, queryset: Optional[QuerySet[Manufacturer]] = None) -> Manufacturer:
|
|
if queryset is None:
|
|
queryset = self.get_queryset()
|
|
slug = self.kwargs.get(self.slug_url_kwarg)
|
|
try:
|
|
# Try to get by current or historical slug
|
|
model = cast(Type[Manufacturer], self.model)
|
|
obj, _ = model.get_by_slug(slug)
|
|
return obj
|
|
except model.DoesNotExist as e:
|
|
raise Http404(f"No {model._meta.verbose_name} found matching the query") from e
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
manufacturer = cast(Manufacturer, self.object)
|
|
|
|
rides = get_manufacturer_rides(manufacturer)
|
|
context['rides'] = rides
|
|
context.update(get_manufacturer_stats(rides))
|
|
return context
|
|
|
|
def get_redirect_url_pattern(self) -> str:
|
|
return 'companies:manufacturer_detail'
|
|
|
|
def post(self, request: Any, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
"""Handle POST requests for photos and edits."""
|
|
return handle_submission_post(
|
|
request,
|
|
self.handle_photo_submission,
|
|
super().post,
|
|
*args,
|
|
**kwargs
|
|
)
|
|
|
|
|
|
def _handle_submission(
|
|
request: Any, form: Any, model: ModelType, success_url: str = ""
|
|
) -> HttpResponseRedirect:
|
|
"""Helper method to handle form submissions"""
|
|
cleaned_data = form.cleaned_data.copy()
|
|
submission = EditSubmission.objects.create(
|
|
user=request.user,
|
|
content_type=ContentType.objects.get_for_model(model),
|
|
submission_type="CREATE",
|
|
status="NEW",
|
|
changes=cleaned_data,
|
|
reason=request.POST.get("reason", ""),
|
|
source=request.POST.get("source", ""),
|
|
)
|
|
|
|
# Get user role safely
|
|
user_role = getattr(request.user, "role", None)
|
|
|
|
# If user is moderator or above, auto-approve
|
|
if user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]:
|
|
obj = form.save()
|
|
submission.object_id = obj.pk
|
|
submission.status = "APPROVED"
|
|
submission.handled_by = request.user
|
|
submission.save()
|
|
|
|
# Generate success URL if not provided
|
|
if not success_url:
|
|
success_url = reverse(
|
|
f"companies:{model.__name__.lower()}_detail", kwargs={"slug": obj.slug}
|
|
)
|
|
messages.success(request, f'Successfully created {getattr(obj, "name", "")}')
|
|
return HttpResponseRedirect(success_url)
|
|
|
|
messages.success(request, "Your submission has been sent for review")
|
|
return HttpResponseRedirect(reverse(f"companies:{model.__name__.lower()}_list"))
|
|
|
|
|
|
# Create Views
|
|
class CompanyCreateView(LoginRequiredMixin, CreateView):
|
|
model: Type[Company] = Company
|
|
form_class = CompanyForm
|
|
template_name = "companies/company_form.html"
|
|
object: Optional[Company]
|
|
|
|
def form_valid(self, form: CompanyForm) -> HttpResponseRedirect:
|
|
return _handle_submission(self.request, form, self.model, "")
|
|
|
|
def get_success_url(self) -> str:
|
|
if self.object is None:
|
|
return reverse("companies:company_list")
|
|
return reverse("companies:company_detail", kwargs={"slug": self.object.slug})
|
|
|
|
|
|
class ManufacturerCreateView(LoginRequiredMixin, CreateView):
|
|
model: Type[Manufacturer] = Manufacturer
|
|
form_class = ManufacturerForm
|
|
template_name = "companies/manufacturer_form.html"
|
|
object: Optional[Manufacturer]
|
|
|
|
def form_valid(self, form: ManufacturerForm) -> HttpResponseRedirect:
|
|
return _handle_submission(self.request, form, self.model, "")
|
|
|
|
def get_success_url(self) -> str:
|
|
if self.object is None:
|
|
return reverse("companies:manufacturer_list")
|
|
return reverse(
|
|
"companies:manufacturer_detail", kwargs={"slug": self.object.slug}
|
|
)
|
|
|
|
|
|
def _handle_update(
|
|
request: Any, form: Any, obj: Union[Company, Manufacturer], model: ModelType
|
|
) -> HttpResponseRedirect:
|
|
"""Helper method to handle update submissions"""
|
|
cleaned_data = form.cleaned_data.copy()
|
|
submission = EditSubmission.objects.create(
|
|
user=request.user,
|
|
content_type=ContentType.objects.get_for_model(model),
|
|
object_id=obj.pk,
|
|
submission_type="EDIT",
|
|
changes=cleaned_data,
|
|
reason=request.POST.get("reason", ""),
|
|
source=request.POST.get("source", ""),
|
|
)
|
|
|
|
# Get user role safely
|
|
user_role = getattr(request.user, "role", None)
|
|
|
|
# If user is moderator or above, auto-approve
|
|
if user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]:
|
|
obj = form.save()
|
|
submission.status = "APPROVED"
|
|
submission.handled_by = request.user
|
|
submission.save()
|
|
messages.success(request, f'Successfully updated {getattr(obj, "name", "")}')
|
|
return HttpResponseRedirect(
|
|
reverse(
|
|
f"companies:{model.__name__.lower()}_detail",
|
|
kwargs={"slug": getattr(obj, "slug", "")},
|
|
)
|
|
)
|
|
|
|
messages.success(
|
|
request, f'Your changes to {getattr(obj, "name", "")} have been sent for review'
|
|
)
|
|
return HttpResponseRedirect(
|
|
reverse(
|
|
f"companies:{model.__name__.lower()}_detail",
|
|
kwargs={"slug": getattr(obj, "slug", "")},
|
|
)
|
|
)
|
|
|
|
|
|
# Update Views
|
|
class CompanyUpdateView(LoginRequiredMixin, UpdateView):
|
|
model: Type[Company] = Company
|
|
form_class = CompanyForm
|
|
template_name = "companies/company_form.html"
|
|
object: Optional[Company]
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
context["is_edit"] = True
|
|
return context
|
|
|
|
def form_valid(self, form: CompanyForm) -> HttpResponseRedirect:
|
|
if self.object is None:
|
|
return HttpResponseRedirect(reverse("companies:company_list"))
|
|
return _handle_update(self.request, form, self.object, self.model)
|
|
|
|
def get_success_url(self) -> str:
|
|
if self.object is None:
|
|
return reverse("companies:company_list")
|
|
return reverse("companies:company_detail", kwargs={"slug": self.object.slug})
|
|
|
|
|
|
class ManufacturerUpdateView(LoginRequiredMixin, UpdateView):
|
|
model: Type[Manufacturer] = Manufacturer
|
|
form_class = ManufacturerForm
|
|
template_name = "companies/manufacturer_form.html"
|
|
object: Optional[Manufacturer]
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
context["is_edit"] = True
|
|
return context
|
|
|
|
def form_valid(self, form: ManufacturerForm) -> HttpResponseRedirect:
|
|
if self.object is None:
|
|
return HttpResponseRedirect(reverse("companies:manufacturer_list"))
|
|
return _handle_update(self.request, form, self.object, self.model)
|
|
|
|
def get_success_url(self) -> str:
|
|
if self.object is None:
|
|
return reverse("companies:manufacturer_list")
|
|
return reverse(
|
|
"companies:manufacturer_detail", kwargs={"slug": self.object.slug}
|
|
)
|