fixed a bunch of things, hopefully didn't break things
@@ -1,83 +1,90 @@
|
||||
from django.db import models
|
||||
from django.contrib.contenttypes.fields import GenericRelation
|
||||
from django.utils.text import slugify
|
||||
from simple_history.models import HistoricalRecords
|
||||
from django.urls import reverse
|
||||
from typing import Tuple, Optional, ClassVar, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from history_tracking.models import HistoricalSlug
|
||||
|
||||
class Company(models.Model):
|
||||
name = models.CharField(max_length=255)
|
||||
slug = models.SlugField(max_length=255, unique=True)
|
||||
website = models.URLField(blank=True)
|
||||
headquarters = models.CharField(max_length=255, blank=True)
|
||||
description = models.TextField(blank=True)
|
||||
website = models.URLField(blank=True)
|
||||
founded_date = models.DateField(null=True, blank=True)
|
||||
total_parks = models.IntegerField(default=0)
|
||||
total_rides = models.IntegerField(default=0)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
photos = GenericRelation('media.Photo')
|
||||
history = HistoricalRecords()
|
||||
|
||||
# Stats fields
|
||||
total_parks = models.PositiveIntegerField(default=0)
|
||||
total_rides = models.PositiveIntegerField(default=0)
|
||||
objects: ClassVar[models.Manager['Company']]
|
||||
|
||||
class Meta:
|
||||
verbose_name_plural = "companies"
|
||||
verbose_name_plural = 'companies'
|
||||
ordering = ['name']
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
def save(self, *args, **kwargs) -> None:
|
||||
if not self.slug:
|
||||
self.slug = slugify(self.name)
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def get_by_slug(cls, slug):
|
||||
"""Get company by current or historical slug"""
|
||||
def get_by_slug(cls, slug: str) -> Tuple['Company', bool]:
|
||||
"""Get company by slug, checking historical slugs if needed"""
|
||||
try:
|
||||
return cls.objects.get(slug=slug), False
|
||||
except cls.DoesNotExist:
|
||||
# Check historical slugs
|
||||
history = cls.history.filter(slug=slug).order_by('-history_date').first()
|
||||
if history:
|
||||
return cls.objects.get(id=history.id), True
|
||||
raise cls.DoesNotExist("No company found with this slug")
|
||||
from history_tracking.models import HistoricalSlug
|
||||
try:
|
||||
historical = HistoricalSlug.objects.get(
|
||||
content_type__model='company',
|
||||
slug=slug
|
||||
)
|
||||
return cls.objects.get(pk=historical.object_id), True
|
||||
except (HistoricalSlug.DoesNotExist, cls.DoesNotExist):
|
||||
raise cls.DoesNotExist()
|
||||
|
||||
class Manufacturer(models.Model):
|
||||
name = models.CharField(max_length=255)
|
||||
slug = models.SlugField(max_length=255, unique=True)
|
||||
website = models.URLField(blank=True)
|
||||
headquarters = models.CharField(max_length=255, blank=True)
|
||||
description = models.TextField(blank=True)
|
||||
website = models.URLField(blank=True)
|
||||
founded_date = models.DateField(null=True, blank=True)
|
||||
total_rides = models.IntegerField(default=0)
|
||||
total_roller_coasters = models.IntegerField(default=0)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
photos = GenericRelation('media.Photo')
|
||||
history = HistoricalRecords()
|
||||
|
||||
# Stats fields
|
||||
total_rides = models.PositiveIntegerField(default=0)
|
||||
total_roller_coasters = models.PositiveIntegerField(default=0)
|
||||
objects: ClassVar[models.Manager['Manufacturer']]
|
||||
|
||||
class Meta:
|
||||
ordering = ['name']
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
def save(self, *args, **kwargs) -> None:
|
||||
if not self.slug:
|
||||
self.slug = slugify(self.name)
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def get_by_slug(cls, slug):
|
||||
"""Get manufacturer by current or historical slug"""
|
||||
def get_by_slug(cls, slug: str) -> Tuple['Manufacturer', bool]:
|
||||
"""Get manufacturer by slug, checking historical slugs if needed"""
|
||||
try:
|
||||
return cls.objects.get(slug=slug), False
|
||||
except cls.DoesNotExist:
|
||||
# Check historical slugs
|
||||
history = cls.history.filter(slug=slug).order_by('-history_date').first()
|
||||
if history:
|
||||
return cls.objects.get(id=history.id), True
|
||||
raise cls.DoesNotExist("No manufacturer found with this slug")
|
||||
from history_tracking.models import HistoricalSlug
|
||||
try:
|
||||
historical = HistoricalSlug.objects.get(
|
||||
content_type__model='manufacturer',
|
||||
slug=slug
|
||||
)
|
||||
return cls.objects.get(pk=historical.object_id), True
|
||||
except (HistoricalSlug.DoesNotExist, cls.DoesNotExist):
|
||||
raise cls.DoesNotExist()
|
||||
|
||||
@@ -5,6 +5,8 @@ from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.gis.geos import Point
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.http import HttpResponse
|
||||
from typing import cast, Tuple, Optional
|
||||
from .models import Company, Manufacturer
|
||||
from location.models import Location
|
||||
from moderation.models import EditSubmission, PhotoSubmission
|
||||
@@ -13,7 +15,7 @@ from media.models import Photo
|
||||
User = get_user_model()
|
||||
|
||||
class CompanyModelTests(TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
self.company = Company.objects.create(
|
||||
name='Test Company',
|
||||
website='http://example.com',
|
||||
@@ -36,7 +38,7 @@ class CompanyModelTests(TestCase):
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
|
||||
def test_company_creation(self):
|
||||
def test_company_creation(self) -> None:
|
||||
"""Test company instance creation and field values"""
|
||||
self.assertEqual(self.company.name, 'Test Company')
|
||||
self.assertEqual(self.company.website, 'http://example.com')
|
||||
@@ -46,22 +48,22 @@ class CompanyModelTests(TestCase):
|
||||
self.assertEqual(self.company.total_rides, 100)
|
||||
self.assertTrue(self.company.slug)
|
||||
|
||||
def test_company_str_representation(self):
|
||||
def test_company_str_representation(self) -> None:
|
||||
"""Test string representation of company"""
|
||||
self.assertEqual(str(self.company), 'Test Company')
|
||||
|
||||
def test_company_get_by_slug(self):
|
||||
def test_company_get_by_slug(self) -> None:
|
||||
"""Test get_by_slug class method"""
|
||||
company, is_historical = Company.get_by_slug(self.company.slug)
|
||||
self.assertEqual(company, self.company)
|
||||
self.assertFalse(is_historical)
|
||||
|
||||
def test_company_get_by_invalid_slug(self):
|
||||
def test_company_get_by_invalid_slug(self) -> None:
|
||||
"""Test get_by_slug with invalid slug"""
|
||||
with self.assertRaises(Company.DoesNotExist):
|
||||
Company.get_by_slug('invalid-slug')
|
||||
|
||||
def test_company_stats(self):
|
||||
def test_company_stats(self) -> None:
|
||||
"""Test company statistics fields"""
|
||||
self.company.total_parks = 10
|
||||
self.company.total_rides = 200
|
||||
@@ -72,7 +74,7 @@ class CompanyModelTests(TestCase):
|
||||
self.assertEqual(company.total_rides, 200)
|
||||
|
||||
class ManufacturerModelTests(TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
self.manufacturer = Manufacturer.objects.create(
|
||||
name='Test Manufacturer',
|
||||
website='http://example.com',
|
||||
@@ -95,7 +97,7 @@ class ManufacturerModelTests(TestCase):
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
|
||||
def test_manufacturer_creation(self):
|
||||
def test_manufacturer_creation(self) -> None:
|
||||
"""Test manufacturer instance creation and field values"""
|
||||
self.assertEqual(self.manufacturer.name, 'Test Manufacturer')
|
||||
self.assertEqual(self.manufacturer.website, 'http://example.com')
|
||||
@@ -105,22 +107,22 @@ class ManufacturerModelTests(TestCase):
|
||||
self.assertEqual(self.manufacturer.total_roller_coasters, 20)
|
||||
self.assertTrue(self.manufacturer.slug)
|
||||
|
||||
def test_manufacturer_str_representation(self):
|
||||
def test_manufacturer_str_representation(self) -> None:
|
||||
"""Test string representation of manufacturer"""
|
||||
self.assertEqual(str(self.manufacturer), 'Test Manufacturer')
|
||||
|
||||
def test_manufacturer_get_by_slug(self):
|
||||
def test_manufacturer_get_by_slug(self) -> None:
|
||||
"""Test get_by_slug class method"""
|
||||
manufacturer, is_historical = Manufacturer.get_by_slug(self.manufacturer.slug)
|
||||
self.assertEqual(manufacturer, self.manufacturer)
|
||||
self.assertFalse(is_historical)
|
||||
|
||||
def test_manufacturer_get_by_invalid_slug(self):
|
||||
def test_manufacturer_get_by_invalid_slug(self) -> None:
|
||||
"""Test get_by_slug with invalid slug"""
|
||||
with self.assertRaises(Manufacturer.DoesNotExist):
|
||||
Manufacturer.get_by_slug('invalid-slug')
|
||||
|
||||
def test_manufacturer_stats(self):
|
||||
def test_manufacturer_stats(self) -> None:
|
||||
"""Test manufacturer statistics fields"""
|
||||
self.manufacturer.total_rides = 100
|
||||
self.manufacturer.total_roller_coasters = 40
|
||||
@@ -131,7 +133,7 @@ class ManufacturerModelTests(TestCase):
|
||||
self.assertEqual(manufacturer.total_roller_coasters, 40)
|
||||
|
||||
class CompanyViewTests(TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
self.client = Client()
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
@@ -164,13 +166,13 @@ class CompanyViewTests(TestCase):
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
|
||||
def test_company_list_view(self):
|
||||
def test_company_list_view(self) -> None:
|
||||
"""Test company list view"""
|
||||
response = self.client.get(reverse('companies:company_list'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, self.company.name)
|
||||
|
||||
def test_company_list_view_with_search(self):
|
||||
def test_company_list_view_with_search(self) -> None:
|
||||
"""Test company list view with search"""
|
||||
response = self.client.get(reverse('companies:company_list') + '?search=Test')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -180,7 +182,7 @@ class CompanyViewTests(TestCase):
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertNotContains(response, self.company.name)
|
||||
|
||||
def test_company_list_view_with_country_filter(self):
|
||||
def test_company_list_view_with_country_filter(self) -> None:
|
||||
"""Test company list view with country filter"""
|
||||
response = self.client.get(reverse('companies:company_list') + '?country=Test Country')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -190,7 +192,7 @@ class CompanyViewTests(TestCase):
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertNotContains(response, self.company.name)
|
||||
|
||||
def test_company_detail_view(self):
|
||||
def test_company_detail_view(self) -> None:
|
||||
"""Test company detail view"""
|
||||
response = self.client.get(
|
||||
reverse('companies:company_detail', kwargs={'slug': self.company.slug})
|
||||
@@ -200,25 +202,25 @@ class CompanyViewTests(TestCase):
|
||||
self.assertContains(response, self.company.website)
|
||||
self.assertContains(response, self.company.headquarters)
|
||||
|
||||
def test_company_detail_view_invalid_slug(self):
|
||||
def test_company_detail_view_invalid_slug(self) -> None:
|
||||
"""Test company detail view with invalid slug"""
|
||||
response = self.client.get(
|
||||
reverse('companies:company_detail', kwargs={'slug': 'invalid-slug'})
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_company_create_view_unauthenticated(self):
|
||||
def test_company_create_view_unauthenticated(self) -> None:
|
||||
"""Test company create view when not logged in"""
|
||||
response = self.client.get(reverse('companies:company_create'))
|
||||
self.assertEqual(response.status_code, 302) # Redirects to login
|
||||
|
||||
def test_company_create_view_authenticated(self):
|
||||
def test_company_create_view_authenticated(self) -> None:
|
||||
"""Test company create view when logged in"""
|
||||
self.client.login(username='testuser', password='testpass123')
|
||||
response = self.client.get(reverse('companies:company_create'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_company_create_submission_regular_user(self):
|
||||
def test_company_create_submission_regular_user(self) -> None:
|
||||
"""Test creating a company submission as regular user"""
|
||||
self.client.login(username='testuser', password='testpass123')
|
||||
data = {
|
||||
@@ -237,7 +239,7 @@ class CompanyViewTests(TestCase):
|
||||
status='NEW'
|
||||
).exists())
|
||||
|
||||
def test_company_create_submission_moderator(self):
|
||||
def test_company_create_submission_moderator(self) -> None:
|
||||
"""Test creating a company submission as moderator"""
|
||||
self.client.login(username='moderator', password='modpass123')
|
||||
data = {
|
||||
@@ -257,7 +259,7 @@ class CompanyViewTests(TestCase):
|
||||
self.assertEqual(submission.status, 'APPROVED')
|
||||
self.assertEqual(submission.handled_by, self.moderator)
|
||||
|
||||
def test_company_photo_submission(self):
|
||||
def test_company_photo_submission(self) -> None:
|
||||
"""Test photo submission for company"""
|
||||
self.client.login(username='testuser', password='testpass123')
|
||||
image_content = b'GIF87a\x01\x00\x01\x00\x80\x01\x00\x00\x00\x00ccc,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;'
|
||||
@@ -267,19 +269,19 @@ class CompanyViewTests(TestCase):
|
||||
'caption': 'Test Photo',
|
||||
'date_taken': '2024-01-01'
|
||||
}
|
||||
response = self.client.post(
|
||||
response = cast(HttpResponse, self.client.post(
|
||||
reverse('companies:company_detail', kwargs={'slug': self.company.slug}),
|
||||
data,
|
||||
HTTP_X_REQUESTED_WITH='XMLHttpRequest' # Simulate AJAX request
|
||||
)
|
||||
))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertTrue(PhotoSubmission.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(Company),
|
||||
object_id=self.company.id
|
||||
object_id=self.company.pk
|
||||
).exists())
|
||||
|
||||
class ManufacturerViewTests(TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
self.client = Client()
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
@@ -312,13 +314,13 @@ class ManufacturerViewTests(TestCase):
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
|
||||
def test_manufacturer_list_view(self):
|
||||
def test_manufacturer_list_view(self) -> None:
|
||||
"""Test manufacturer list view"""
|
||||
response = self.client.get(reverse('companies:manufacturer_list'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, self.manufacturer.name)
|
||||
|
||||
def test_manufacturer_list_view_with_search(self):
|
||||
def test_manufacturer_list_view_with_search(self) -> None:
|
||||
"""Test manufacturer list view with search"""
|
||||
response = self.client.get(reverse('companies:manufacturer_list') + '?search=Test')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -328,7 +330,7 @@ class ManufacturerViewTests(TestCase):
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertNotContains(response, self.manufacturer.name)
|
||||
|
||||
def test_manufacturer_list_view_with_country_filter(self):
|
||||
def test_manufacturer_list_view_with_country_filter(self) -> None:
|
||||
"""Test manufacturer list view with country filter"""
|
||||
response = self.client.get(reverse('companies:manufacturer_list') + '?country=Test Country')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -338,7 +340,7 @@ class ManufacturerViewTests(TestCase):
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertNotContains(response, self.manufacturer.name)
|
||||
|
||||
def test_manufacturer_detail_view(self):
|
||||
def test_manufacturer_detail_view(self) -> None:
|
||||
"""Test manufacturer detail view"""
|
||||
response = self.client.get(
|
||||
reverse('companies:manufacturer_detail', kwargs={'slug': self.manufacturer.slug})
|
||||
@@ -348,25 +350,25 @@ class ManufacturerViewTests(TestCase):
|
||||
self.assertContains(response, self.manufacturer.website)
|
||||
self.assertContains(response, self.manufacturer.headquarters)
|
||||
|
||||
def test_manufacturer_detail_view_invalid_slug(self):
|
||||
def test_manufacturer_detail_view_invalid_slug(self) -> None:
|
||||
"""Test manufacturer detail view with invalid slug"""
|
||||
response = self.client.get(
|
||||
reverse('companies:manufacturer_detail', kwargs={'slug': 'invalid-slug'})
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_manufacturer_create_view_unauthenticated(self):
|
||||
def test_manufacturer_create_view_unauthenticated(self) -> None:
|
||||
"""Test manufacturer create view when not logged in"""
|
||||
response = self.client.get(reverse('companies:manufacturer_create'))
|
||||
self.assertEqual(response.status_code, 302) # Redirects to login
|
||||
|
||||
def test_manufacturer_create_view_authenticated(self):
|
||||
def test_manufacturer_create_view_authenticated(self) -> None:
|
||||
"""Test manufacturer create view when logged in"""
|
||||
self.client.login(username='testuser', password='testpass123')
|
||||
response = self.client.get(reverse('companies:manufacturer_create'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_manufacturer_create_submission_regular_user(self):
|
||||
def test_manufacturer_create_submission_regular_user(self) -> None:
|
||||
"""Test creating a manufacturer submission as regular user"""
|
||||
self.client.login(username='testuser', password='testpass123')
|
||||
data = {
|
||||
@@ -385,7 +387,7 @@ class ManufacturerViewTests(TestCase):
|
||||
status='NEW'
|
||||
).exists())
|
||||
|
||||
def test_manufacturer_create_submission_moderator(self):
|
||||
def test_manufacturer_create_submission_moderator(self) -> None:
|
||||
"""Test creating a manufacturer submission as moderator"""
|
||||
self.client.login(username='moderator', password='modpass123')
|
||||
data = {
|
||||
@@ -405,7 +407,7 @@ class ManufacturerViewTests(TestCase):
|
||||
self.assertEqual(submission.status, 'APPROVED')
|
||||
self.assertEqual(submission.handled_by, self.moderator)
|
||||
|
||||
def test_manufacturer_photo_submission(self):
|
||||
def test_manufacturer_photo_submission(self) -> None:
|
||||
"""Test photo submission for manufacturer"""
|
||||
self.client.login(username='testuser', password='testpass123')
|
||||
image_content = b'GIF87a\x01\x00\x01\x00\x80\x01\x00\x00\x00\x00ccc,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;'
|
||||
@@ -415,13 +417,13 @@ class ManufacturerViewTests(TestCase):
|
||||
'caption': 'Test Photo',
|
||||
'date_taken': '2024-01-01'
|
||||
}
|
||||
response = self.client.post(
|
||||
response = cast(HttpResponse, self.client.post(
|
||||
reverse('companies:manufacturer_detail', kwargs={'slug': self.manufacturer.slug}),
|
||||
data,
|
||||
HTTP_X_REQUESTED_WITH='XMLHttpRequest' # Simulate AJAX request
|
||||
)
|
||||
))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertTrue(PhotoSubmission.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(Manufacturer),
|
||||
object_id=self.manufacturer.id
|
||||
object_id=self.manufacturer.pk
|
||||
).exists())
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
from typing import Any, Optional, Tuple, Type, cast, Union, Dict, Callable
|
||||
from django.views.generic import DetailView, ListView, CreateView, UpdateView
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.urls import reverse
|
||||
from django.contrib.auth.mixins import LoginRequiredMixin
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib import messages
|
||||
from django.http import HttpResponseRedirect, Http404, JsonResponse
|
||||
from django.db.models import Count, Sum, Q
|
||||
from django.http import HttpResponseRedirect, Http404, JsonResponse, HttpResponse
|
||||
from django.db.models import Count, Sum, Q, QuerySet, Model
|
||||
from django.contrib.auth import get_user_model
|
||||
from .models import Company, Manufacturer
|
||||
from .forms import CompanyForm, ManufacturerForm
|
||||
from rides.models import Ride
|
||||
@@ -15,302 +17,349 @@ from core.views import SlugRedirectMixin
|
||||
from moderation.mixins import EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin
|
||||
from moderation.models import EditSubmission
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
ModelType = Union[Type[Company], Type[Manufacturer]]
|
||||
|
||||
def get_company_parks(company: Company) -> QuerySet[Park]:
|
||||
"""Get parks owned by a company with related data."""
|
||||
return Park.objects.filter(
|
||||
owner=company
|
||||
).select_related('owner')
|
||||
|
||||
def get_company_ride_count(parks: QuerySet[Park]) -> int:
|
||||
"""Get total number of rides across all parks."""
|
||||
return Ride.objects.filter(park__in=parks).count()
|
||||
|
||||
def get_manufacturer_rides(manufacturer: Manufacturer) -> QuerySet[Ride]:
|
||||
"""Get rides made by a manufacturer with related data."""
|
||||
return Ride.objects.filter(
|
||||
manufacturer=manufacturer
|
||||
).select_related('park', 'coaster_stats')
|
||||
|
||||
def get_manufacturer_stats(rides: QuerySet[Ride]) -> Dict[str, int]:
|
||||
"""Get statistics for manufacturer rides."""
|
||||
return {
|
||||
'coaster_count': rides.filter(category='ROLLER_COASTER').count(),
|
||||
'parks_count': rides.values('park').distinct().count()
|
||||
}
|
||||
|
||||
def handle_submission_post(
|
||||
request: Any,
|
||||
handle_photo_submission: Callable[[Any], HttpResponse],
|
||||
super_post: Callable[..., HttpResponse],
|
||||
*args: Any,
|
||||
**kwargs: Any
|
||||
) -> HttpResponse:
|
||||
"""Handle POST requests for photos and edits."""
|
||||
if request.FILES:
|
||||
# Handle photo submission
|
||||
return handle_photo_submission(request)
|
||||
# Handle edit submission
|
||||
return super_post(request, *args, **kwargs)
|
||||
|
||||
# List Views
|
||||
class CompanyListView(ListView):
|
||||
model = Company
|
||||
template_name = 'companies/company_list.html'
|
||||
context_object_name = 'companies'
|
||||
model: Type[Company] = Company
|
||||
template_name = "companies/company_list.html"
|
||||
context_object_name = "companies"
|
||||
paginate_by = 12
|
||||
|
||||
def get_queryset(self):
|
||||
queryset = Company.objects.all()
|
||||
def get_queryset(self) -> QuerySet[Company]:
|
||||
queryset = self.model.objects.all()
|
||||
|
||||
# Filter by country if specified
|
||||
country = self.request.GET.get('country')
|
||||
if country:
|
||||
if country := self.request.GET.get("country"):
|
||||
# Get companies that have locations in the specified country
|
||||
company_ids = Location.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(Company),
|
||||
country__iexact=country
|
||||
).values_list('object_id', flat=True)
|
||||
queryset = queryset.filter(id__in=company_ids)
|
||||
country__iexact=country,
|
||||
).values_list("object_id", flat=True)
|
||||
queryset = queryset.filter(pk__in=company_ids)
|
||||
|
||||
# Search by name if specified
|
||||
search = self.request.GET.get('search')
|
||||
if search:
|
||||
if search := self.request.GET.get("search"):
|
||||
queryset = queryset.filter(name__icontains=search)
|
||||
|
||||
return queryset.order_by('name')
|
||||
return queryset.order_by("name")
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
# Add filter values to context
|
||||
context['country'] = self.request.GET.get('country', '')
|
||||
context['search'] = self.request.GET.get('search', '')
|
||||
context["country"] = self.request.GET.get("country", "")
|
||||
context["search"] = self.request.GET.get("search", "")
|
||||
return context
|
||||
|
||||
|
||||
class ManufacturerListView(ListView):
|
||||
model = Manufacturer
|
||||
template_name = 'companies/manufacturer_list.html'
|
||||
context_object_name = 'manufacturers'
|
||||
model: Type[Manufacturer] = Manufacturer
|
||||
template_name = "companies/manufacturer_list.html"
|
||||
context_object_name = "manufacturers"
|
||||
paginate_by = 12
|
||||
|
||||
def get_queryset(self):
|
||||
queryset = Manufacturer.objects.all()
|
||||
def get_queryset(self) -> QuerySet[Manufacturer]:
|
||||
queryset = self.model.objects.all()
|
||||
|
||||
# Filter by country if specified
|
||||
country = self.request.GET.get('country')
|
||||
if country:
|
||||
if country := self.request.GET.get("country"):
|
||||
# Get manufacturers that have locations in the specified country
|
||||
manufacturer_ids = Location.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(Manufacturer),
|
||||
country__iexact=country
|
||||
).values_list('object_id', flat=True)
|
||||
queryset = queryset.filter(id__in=manufacturer_ids)
|
||||
country__iexact=country,
|
||||
).values_list("object_id", flat=True)
|
||||
queryset = queryset.filter(pk__in=manufacturer_ids)
|
||||
|
||||
# Search by name if specified
|
||||
search = self.request.GET.get('search')
|
||||
if search:
|
||||
if search := self.request.GET.get("search"):
|
||||
queryset = queryset.filter(name__icontains=search)
|
||||
|
||||
return queryset.order_by('name')
|
||||
return queryset.order_by("name")
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
# Add stats for filtering
|
||||
context['total_manufacturers'] = self.model.objects.count()
|
||||
context['total_rides'] = Ride.objects.filter(
|
||||
manufacturer__isnull=False
|
||||
).count()
|
||||
context['total_roller_coasters'] = Ride.objects.filter(
|
||||
manufacturer__isnull=False,
|
||||
category='ROLLER_COASTER'
|
||||
context["total_manufacturers"] = self.model.objects.count()
|
||||
context["total_rides"] = Ride.objects.filter(manufacturer__isnull=False).count()
|
||||
context["total_roller_coasters"] = Ride.objects.filter(
|
||||
manufacturer__isnull=False, category="ROLLER_COASTER"
|
||||
).count()
|
||||
# Add filter values to context
|
||||
context['country'] = self.request.GET.get('country', '')
|
||||
context['search'] = self.request.GET.get('search', '')
|
||||
context["country"] = self.request.GET.get("country", "")
|
||||
context["search"] = self.request.GET.get("search", "")
|
||||
return context
|
||||
|
||||
|
||||
# Detail Views
|
||||
class CompanyDetailView(SlugRedirectMixin, EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, DetailView):
|
||||
model = Company
|
||||
model: Type[Company] = Company
|
||||
template_name = 'companies/company_detail.html'
|
||||
context_object_name = 'company'
|
||||
|
||||
def get_object(self, queryset=None):
|
||||
def get_object(self, queryset: Optional[QuerySet[Company]] = None) -> Company:
|
||||
if queryset is None:
|
||||
queryset = self.get_queryset()
|
||||
slug = self.kwargs.get(self.slug_url_kwarg)
|
||||
try:
|
||||
# Try to get by current or historical slug
|
||||
return self.model.get_by_slug(slug)[0]
|
||||
except self.model.DoesNotExist:
|
||||
raise Http404(f"No {self.model._meta.verbose_name} found matching the query")
|
||||
model = cast(Type[Company], self.model)
|
||||
obj, _ = model.get_by_slug(slug)
|
||||
return obj
|
||||
except model.DoesNotExist as e:
|
||||
raise Http404(f"No {model._meta.verbose_name} found matching the query") from e
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
parks = Park.objects.filter(
|
||||
owner=self.object
|
||||
).select_related('owner')
|
||||
company = cast(Company, self.object)
|
||||
|
||||
parks = get_company_parks(company)
|
||||
context['parks'] = parks
|
||||
context['total_rides'] = Ride.objects.filter(park__in=parks).count()
|
||||
|
||||
context['total_rides'] = get_company_ride_count(parks)
|
||||
return context
|
||||
|
||||
def get_redirect_url_pattern(self):
|
||||
def get_redirect_url_pattern(self) -> str:
|
||||
return 'companies:company_detail'
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
"""Handle POST requests for photos and edits"""
|
||||
if request.FILES:
|
||||
# Handle photo submission
|
||||
return self.handle_photo_submission(request)
|
||||
# Handle edit submission
|
||||
return super().post(request, *args, **kwargs)
|
||||
def post(self, request: Any, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
"""Handle POST requests for photos and edits."""
|
||||
return handle_submission_post(
|
||||
request,
|
||||
self.handle_photo_submission,
|
||||
super().post,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
class ManufacturerDetailView(SlugRedirectMixin, EditSubmissionMixin, PhotoSubmissionMixin, HistoryMixin, DetailView):
|
||||
model = Manufacturer
|
||||
model: Type[Manufacturer] = Manufacturer
|
||||
template_name = 'companies/manufacturer_detail.html'
|
||||
context_object_name = 'manufacturer'
|
||||
|
||||
def get_object(self, queryset=None):
|
||||
def get_object(self, queryset: Optional[QuerySet[Manufacturer]] = None) -> Manufacturer:
|
||||
if queryset is None:
|
||||
queryset = self.get_queryset()
|
||||
slug = self.kwargs.get(self.slug_url_kwarg)
|
||||
try:
|
||||
# Try to get by current or historical slug
|
||||
return self.model.get_by_slug(slug)[0]
|
||||
except self.model.DoesNotExist:
|
||||
raise Http404(f"No {self.model._meta.verbose_name} found matching the query")
|
||||
model = cast(Type[Manufacturer], self.model)
|
||||
obj, _ = model.get_by_slug(slug)
|
||||
return obj
|
||||
except model.DoesNotExist as e:
|
||||
raise Http404(f"No {model._meta.verbose_name} found matching the query") from e
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
rides = Ride.objects.filter(
|
||||
manufacturer=self.object
|
||||
).select_related('park', 'coaster_stats')
|
||||
manufacturer = cast(Manufacturer, self.object)
|
||||
|
||||
rides = get_manufacturer_rides(manufacturer)
|
||||
context['rides'] = rides
|
||||
context['coaster_count'] = rides.filter(category='ROLLER_COASTER').count()
|
||||
context['parks_count'] = rides.values('park').distinct().count()
|
||||
|
||||
context.update(get_manufacturer_stats(rides))
|
||||
return context
|
||||
|
||||
def get_redirect_url_pattern(self):
|
||||
def get_redirect_url_pattern(self) -> str:
|
||||
return 'companies:manufacturer_detail'
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
"""Handle POST requests for photos and edits"""
|
||||
if request.FILES:
|
||||
# Handle photo submission
|
||||
return self.handle_photo_submission(request)
|
||||
# Handle edit submission
|
||||
return super().post(request, *args, **kwargs)
|
||||
def post(self, request: Any, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
"""Handle POST requests for photos and edits."""
|
||||
return handle_submission_post(
|
||||
request,
|
||||
self.handle_photo_submission,
|
||||
super().post,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
def _handle_submission(
|
||||
request: Any, form: Any, model: ModelType, success_url: str
|
||||
) -> HttpResponseRedirect:
|
||||
"""Helper method to handle form submissions"""
|
||||
cleaned_data = form.cleaned_data.copy()
|
||||
submission = EditSubmission.objects.create(
|
||||
user=request.user,
|
||||
content_type=ContentType.objects.get_for_model(model),
|
||||
submission_type="CREATE",
|
||||
changes=cleaned_data,
|
||||
reason=request.POST.get("reason", ""),
|
||||
source=request.POST.get("source", ""),
|
||||
)
|
||||
|
||||
# Get user role safely
|
||||
user_role = getattr(request.user, "role", None)
|
||||
|
||||
# If user is moderator or above, auto-approve
|
||||
if user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]:
|
||||
obj = form.save()
|
||||
submission.object_id = obj.pk
|
||||
submission.status = "APPROVED"
|
||||
submission.handled_by = request.user
|
||||
submission.save()
|
||||
messages.success(request, f'Successfully created {getattr(obj, "name", "")}')
|
||||
return HttpResponseRedirect(success_url)
|
||||
|
||||
messages.success(request, "Your submission has been sent for review")
|
||||
return HttpResponseRedirect(reverse(f"companies:{model.__name__.lower()}_list"))
|
||||
|
||||
|
||||
# Create Views
|
||||
class CompanyCreateView(LoginRequiredMixin, CreateView):
|
||||
model = Company
|
||||
model: Type[Company] = Company
|
||||
form_class = CompanyForm
|
||||
template_name = 'companies/company_form.html'
|
||||
template_name = "companies/company_form.html"
|
||||
object: Optional[Company]
|
||||
|
||||
def form_valid(self, form):
|
||||
cleaned_data = form.cleaned_data.copy()
|
||||
|
||||
# Create submission record
|
||||
submission = EditSubmission.objects.create(
|
||||
user=self.request.user,
|
||||
content_type=ContentType.objects.get_for_model(Company),
|
||||
submission_type='CREATE',
|
||||
changes=cleaned_data,
|
||||
reason=self.request.POST.get('reason', ''),
|
||||
source=self.request.POST.get('source', '')
|
||||
def form_valid(self, form: CompanyForm) -> HttpResponseRedirect:
|
||||
success_url = reverse(
|
||||
"companies:company_detail", kwargs={"slug": form.instance.slug}
|
||||
)
|
||||
return _handle_submission(self.request, form, self.model, success_url)
|
||||
|
||||
# If user is moderator or above, auto-approve
|
||||
if self.request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
self.object = form.save()
|
||||
submission.object_id = self.object.id
|
||||
submission.status = 'APPROVED'
|
||||
submission.handled_by = self.request.user
|
||||
submission.save()
|
||||
messages.success(self.request, f'Successfully created {self.object.name}')
|
||||
return HttpResponseRedirect(self.get_success_url())
|
||||
def get_success_url(self) -> str:
|
||||
if self.object is None:
|
||||
return reverse("companies:company_list")
|
||||
return reverse("companies:company_detail", kwargs={"slug": self.object.slug})
|
||||
|
||||
messages.success(self.request, 'Your company submission has been sent for review')
|
||||
return HttpResponseRedirect(reverse('companies:company_list'))
|
||||
|
||||
def get_success_url(self):
|
||||
return reverse('companies:company_detail', kwargs={'slug': self.object.slug})
|
||||
|
||||
class ManufacturerCreateView(LoginRequiredMixin, CreateView):
|
||||
model = Manufacturer
|
||||
model: Type[Manufacturer] = Manufacturer
|
||||
form_class = ManufacturerForm
|
||||
template_name = 'companies/manufacturer_form.html'
|
||||
template_name = "companies/manufacturer_form.html"
|
||||
object: Optional[Manufacturer]
|
||||
|
||||
def form_valid(self, form):
|
||||
cleaned_data = form.cleaned_data.copy()
|
||||
def form_valid(self, form: ManufacturerForm) -> HttpResponseRedirect:
|
||||
success_url = reverse(
|
||||
"companies:manufacturer_detail", kwargs={"slug": form.instance.slug}
|
||||
)
|
||||
return _handle_submission(self.request, form, self.model, success_url)
|
||||
|
||||
# Create submission record
|
||||
submission = EditSubmission.objects.create(
|
||||
user=self.request.user,
|
||||
content_type=ContentType.objects.get_for_model(Manufacturer),
|
||||
submission_type='CREATE',
|
||||
changes=cleaned_data,
|
||||
reason=self.request.POST.get('reason', ''),
|
||||
source=self.request.POST.get('source', '')
|
||||
def get_success_url(self) -> str:
|
||||
if self.object is None:
|
||||
return reverse("companies:manufacturer_list")
|
||||
return reverse(
|
||||
"companies:manufacturer_detail", kwargs={"slug": self.object.slug}
|
||||
)
|
||||
|
||||
# If user is moderator or above, auto-approve
|
||||
if self.request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
self.object = form.save()
|
||||
submission.object_id = self.object.id
|
||||
submission.status = 'APPROVED'
|
||||
submission.handled_by = self.request.user
|
||||
submission.save()
|
||||
messages.success(self.request, f'Successfully created {self.object.name}')
|
||||
return HttpResponseRedirect(self.get_success_url())
|
||||
|
||||
messages.success(self.request, 'Your manufacturer submission has been sent for review')
|
||||
return HttpResponseRedirect(reverse('companies:manufacturer_list'))
|
||||
def _handle_update(
|
||||
request: Any, form: Any, obj: Union[Company, Manufacturer], model: ModelType
|
||||
) -> HttpResponseRedirect:
|
||||
"""Helper method to handle update submissions"""
|
||||
cleaned_data = form.cleaned_data.copy()
|
||||
submission = EditSubmission.objects.create(
|
||||
user=request.user,
|
||||
content_type=ContentType.objects.get_for_model(model),
|
||||
object_id=obj.pk,
|
||||
submission_type="EDIT",
|
||||
changes=cleaned_data,
|
||||
reason=request.POST.get("reason", ""),
|
||||
source=request.POST.get("source", ""),
|
||||
)
|
||||
|
||||
# Get user role safely
|
||||
user_role = getattr(request.user, "role", None)
|
||||
|
||||
# If user is moderator or above, auto-approve
|
||||
if user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]:
|
||||
obj = form.save()
|
||||
submission.status = "APPROVED"
|
||||
submission.handled_by = request.user
|
||||
submission.save()
|
||||
messages.success(request, f'Successfully updated {getattr(obj, "name", "")}')
|
||||
return HttpResponseRedirect(
|
||||
reverse(
|
||||
f"companies:{model.__name__.lower()}_detail",
|
||||
kwargs={"slug": getattr(obj, "slug", "")},
|
||||
)
|
||||
)
|
||||
|
||||
messages.success(
|
||||
request, f'Your changes to {getattr(obj, "name", "")} have been sent for review'
|
||||
)
|
||||
return HttpResponseRedirect(
|
||||
reverse(
|
||||
f"companies:{model.__name__.lower()}_detail",
|
||||
kwargs={"slug": getattr(obj, "slug", "")},
|
||||
)
|
||||
)
|
||||
|
||||
def get_success_url(self):
|
||||
return reverse('companies:manufacturer_detail', kwargs={'slug': self.object.slug})
|
||||
|
||||
# Update Views
|
||||
class CompanyUpdateView(LoginRequiredMixin, UpdateView):
|
||||
model = Company
|
||||
model: Type[Company] = Company
|
||||
form_class = CompanyForm
|
||||
template_name = 'companies/company_form.html'
|
||||
template_name = "companies/company_form.html"
|
||||
object: Optional[Company]
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
context['is_edit'] = True
|
||||
context["is_edit"] = True
|
||||
return context
|
||||
|
||||
def form_valid(self, form):
|
||||
cleaned_data = form.cleaned_data.copy()
|
||||
def form_valid(self, form: CompanyForm) -> HttpResponseRedirect:
|
||||
if self.object is None:
|
||||
return HttpResponseRedirect(reverse("companies:company_list"))
|
||||
return _handle_update(self.request, form, self.object, self.model)
|
||||
|
||||
# Create submission record
|
||||
submission = EditSubmission.objects.create(
|
||||
user=self.request.user,
|
||||
content_type=ContentType.objects.get_for_model(Company),
|
||||
object_id=self.object.id,
|
||||
submission_type='EDIT',
|
||||
changes=cleaned_data,
|
||||
reason=self.request.POST.get('reason', ''),
|
||||
source=self.request.POST.get('source', '')
|
||||
)
|
||||
def get_success_url(self) -> str:
|
||||
if self.object is None:
|
||||
return reverse("companies:company_list")
|
||||
return reverse("companies:company_detail", kwargs={"slug": self.object.slug})
|
||||
|
||||
# If user is moderator or above, auto-approve
|
||||
if self.request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
self.object = form.save()
|
||||
submission.status = 'APPROVED'
|
||||
submission.handled_by = self.request.user
|
||||
submission.save()
|
||||
messages.success(self.request, f'Successfully updated {self.object.name}')
|
||||
return HttpResponseRedirect(self.get_success_url())
|
||||
|
||||
messages.success(self.request, f'Your changes to {self.object.name} have been sent for review')
|
||||
return HttpResponseRedirect(reverse('companies:company_detail', kwargs={'slug': self.object.slug}))
|
||||
|
||||
def get_success_url(self):
|
||||
return reverse('companies:company_detail', kwargs={'slug': self.object.slug})
|
||||
|
||||
class ManufacturerUpdateView(LoginRequiredMixin, UpdateView):
|
||||
model = Manufacturer
|
||||
model: Type[Manufacturer] = Manufacturer
|
||||
form_class = ManufacturerForm
|
||||
template_name = 'companies/manufacturer_form.html'
|
||||
template_name = "companies/manufacturer_form.html"
|
||||
object: Optional[Manufacturer]
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
context['is_edit'] = True
|
||||
context["is_edit"] = True
|
||||
return context
|
||||
|
||||
def form_valid(self, form):
|
||||
cleaned_data = form.cleaned_data.copy()
|
||||
def form_valid(self, form: ManufacturerForm) -> HttpResponseRedirect:
|
||||
if self.object is None:
|
||||
return HttpResponseRedirect(reverse("companies:manufacturer_list"))
|
||||
return _handle_update(self.request, form, self.object, self.model)
|
||||
|
||||
# Create submission record
|
||||
submission = EditSubmission.objects.create(
|
||||
user=self.request.user,
|
||||
content_type=ContentType.objects.get_for_model(Manufacturer),
|
||||
object_id=self.object.id,
|
||||
submission_type='EDIT',
|
||||
changes=cleaned_data,
|
||||
reason=self.request.POST.get('reason', ''),
|
||||
source=self.request.POST.get('source', '')
|
||||
def get_success_url(self) -> str:
|
||||
if self.object is None:
|
||||
return reverse("companies:manufacturer_list")
|
||||
return reverse(
|
||||
"companies:manufacturer_detail", kwargs={"slug": self.object.slug}
|
||||
)
|
||||
|
||||
# If user is moderator or above, auto-approve
|
||||
if self.request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
self.object = form.save()
|
||||
submission.status = 'APPROVED'
|
||||
submission.handled_by = self.request.user
|
||||
submission.save()
|
||||
messages.success(self.request, f'Successfully updated {self.object.name}')
|
||||
return HttpResponseRedirect(self.get_success_url())
|
||||
|
||||
messages.success(self.request, f'Your changes to {self.object.name} have been sent for review')
|
||||
return HttpResponseRedirect(reverse('companies:manufacturer_detail', kwargs={'slug': self.object.slug}))
|
||||
|
||||
def get_success_url(self):
|
||||
return reverse('companies:manufacturer_detail', kwargs={'slug': self.object.slug})
|
||||
|
||||
@@ -1,23 +1,31 @@
|
||||
from typing import Any, Dict, Optional, Type, cast
|
||||
from django.shortcuts import redirect
|
||||
from django.urls import reverse
|
||||
from django.views.generic import DetailView
|
||||
from django.views import View
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.db.models import Model
|
||||
|
||||
class SlugRedirectMixin:
|
||||
class SlugRedirectMixin(View):
|
||||
"""
|
||||
Mixin that handles redirects for old slugs.
|
||||
Requires the model to inherit from SluggedModel and view to inherit from DetailView.
|
||||
"""
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
model: Optional[Type[Model]] = None
|
||||
slug_url_kwarg: str = 'slug'
|
||||
object: Optional[Model] = None
|
||||
|
||||
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
# Only apply slug redirect logic to DetailViews
|
||||
if not isinstance(self, DetailView):
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
|
||||
# Get the object using current or historical slug
|
||||
try:
|
||||
self.object = self.get_object()
|
||||
self.object = self.get_object() # type: ignore
|
||||
# Check if we used an old slug
|
||||
current_slug = kwargs.get(self.slug_url_kwarg)
|
||||
if current_slug and current_slug != self.object.slug:
|
||||
if current_slug and current_slug != getattr(self.object, 'slug', None):
|
||||
# Get the URL pattern name from the view
|
||||
url_pattern = self.get_redirect_url_pattern()
|
||||
# Build kwargs for reverse()
|
||||
@@ -28,10 +36,13 @@ class SlugRedirectMixin:
|
||||
permanent=True
|
||||
)
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
except (self.model.DoesNotExist, AttributeError):
|
||||
except (AttributeError, Exception) as e: # type: ignore
|
||||
if self.model and hasattr(self.model, 'DoesNotExist'):
|
||||
if isinstance(e, self.model.DoesNotExist): # type: ignore
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
|
||||
def get_redirect_url_pattern(self):
|
||||
def get_redirect_url_pattern(self) -> str:
|
||||
"""
|
||||
Get the URL pattern name for redirects.
|
||||
Should be overridden by subclasses.
|
||||
@@ -40,9 +51,11 @@ class SlugRedirectMixin:
|
||||
"Subclasses must implement get_redirect_url_pattern()"
|
||||
)
|
||||
|
||||
def get_redirect_url_kwargs(self):
|
||||
def get_redirect_url_kwargs(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Get the kwargs for reverse() when redirecting.
|
||||
Should be overridden by subclasses if they need custom kwargs.
|
||||
"""
|
||||
return {self.slug_url_kwarg: self.object.slug}
|
||||
if not self.object:
|
||||
return {}
|
||||
return {self.slug_url_kwarg: getattr(self.object, 'slug', '')}
|
||||
|
||||
54
history_tracking/migrations/0003_initial.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# Generated by Django 5.1.2 on 2024-11-05 20:44
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
("contenttypes", "0002_remove_content_type_name"),
|
||||
(
|
||||
"history_tracking",
|
||||
"0002_remove_historicalpark_history_user_delete_park_and_more",
|
||||
),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="HistoricalSlug",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("object_id", models.PositiveIntegerField()),
|
||||
("slug", models.SlugField(max_length=255)),
|
||||
("created_at", models.DateTimeField(auto_now_add=True)),
|
||||
(
|
||||
"content_type",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
to="contenttypes.contenttype",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"indexes": [
|
||||
models.Index(
|
||||
fields=["content_type", "object_id"],
|
||||
name="history_tra_content_63013c_idx",
|
||||
),
|
||||
models.Index(fields=["slug"], name="history_tra_slug_f843aa_idx"),
|
||||
],
|
||||
"unique_together": {("content_type", "slug")},
|
||||
},
|
||||
),
|
||||
]
|
||||
@@ -1,14 +1,39 @@
|
||||
# history_tracking/models.py
|
||||
from django.db import models
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from simple_history.models import HistoricalRecords
|
||||
from .mixins import HistoricalChangeMixin
|
||||
from typing import Any, Type, TypeVar, cast
|
||||
|
||||
T = TypeVar('T', bound=models.Model)
|
||||
|
||||
class HistoricalModel(models.Model):
|
||||
"""Abstract base class for models with history tracking"""
|
||||
history: HistoricalRecords = HistoricalRecords(inherit=True)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
@property
|
||||
def _history_model(self):
|
||||
return self.history.model
|
||||
def _history_model(self) -> Type[T]:
|
||||
"""Get the history model class"""
|
||||
return cast(Type[T], self.history.model) # type: ignore
|
||||
|
||||
class HistoricalSlug(models.Model):
|
||||
"""Track historical slugs for models"""
|
||||
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
|
||||
object_id = models.PositiveIntegerField()
|
||||
content_object = GenericForeignKey('content_type', 'object_id')
|
||||
slug = models.SlugField(max_length=255)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
unique_together = ('content_type', 'slug')
|
||||
indexes = [
|
||||
models.Index(fields=['content_type', 'object_id']),
|
||||
models.Index(fields=['slug']),
|
||||
]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.content_type} - {self.object_id} - {self.slug}"
|
||||
|
||||
@@ -27,24 +27,16 @@ def photo_upload_path(instance: models.Model, filename: str) -> str:
|
||||
if identifier is None:
|
||||
identifier = obj.pk # Use pk instead of id as it's guaranteed to exist
|
||||
|
||||
# Get the next available number for this object
|
||||
existing_photos = Photo.objects.filter(
|
||||
content_type=photo.content_type,
|
||||
object_id=photo.object_id
|
||||
).count()
|
||||
next_number = existing_photos + 1
|
||||
|
||||
# Create normalized filename
|
||||
ext = os.path.splitext(filename)[1].lower() or '.jpg' # Default to .jpg if no extension
|
||||
new_filename = f"{identifier}_{next_number}{ext}"
|
||||
# Create normalized filename - always use .jpg extension
|
||||
base_filename = f"{identifier}.jpg"
|
||||
|
||||
# If it's a ride photo, store it under the park's directory
|
||||
if content_type == 'ride':
|
||||
ride = cast(Ride, obj)
|
||||
return f"park/{ride.park.slug}/{identifier}/{new_filename}"
|
||||
return f"park/{ride.park.slug}/{identifier}/{base_filename}"
|
||||
|
||||
# For park photos, store directly in park directory
|
||||
return f"park/{identifier}/{new_filename}"
|
||||
return f"park/{identifier}/{base_filename}"
|
||||
|
||||
class Photo(models.Model):
|
||||
"""Generic photo model that can be attached to any model"""
|
||||
|
||||
BIN
media/park/test-park/test-park_1.jpg
Normal file
|
After Width: | Height: | Size: 825 B |
BIN
media/park/test-park/test-park_2.jpg
Normal file
|
After Width: | Height: | Size: 825 B |
BIN
media/park/test-park/test-park_3.jpg
Normal file
|
After Width: | Height: | Size: 825 B |
BIN
media/park/test-park/test-park_4.jpg
Normal file
|
After Width: | Height: | Size: 825 B |
BIN
media/park/test-park/test-park_5.jpg
Normal file
|
After Width: | Height: | Size: 825 B |
BIN
media/park/test-park/test-park_6.jpg
Normal file
|
After Width: | Height: | Size: 825 B |
@@ -1,16 +1,30 @@
|
||||
from django.core.files.storage import FileSystemStorage
|
||||
from django.conf import settings
|
||||
from django.core.files.base import File
|
||||
from django.core.files.move import file_move_safe
|
||||
from django.core.files.uploadedfile import UploadedFile, TemporaryUploadedFile
|
||||
import os
|
||||
import re
|
||||
from typing import Optional, Any, Union
|
||||
|
||||
class MediaStorage(FileSystemStorage):
|
||||
def __init__(self, *args, **kwargs):
|
||||
_instance = None
|
||||
_counters = {}
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
kwargs['location'] = settings.MEDIA_ROOT
|
||||
kwargs['base_url'] = settings.MEDIA_URL
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_available_name(self, name, max_length=None):
|
||||
@classmethod
|
||||
def reset_counters(cls):
|
||||
"""Reset all counters - useful for testing"""
|
||||
cls._counters = {}
|
||||
|
||||
def get_available_name(self, name: str, max_length: Optional[int] = None) -> str:
|
||||
"""
|
||||
Returns a filename that's free on the target storage system.
|
||||
Ensures proper normalization and uniqueness.
|
||||
"""
|
||||
# Get the directory and filename
|
||||
directory = os.path.dirname(name)
|
||||
@@ -20,19 +34,49 @@ class MediaStorage(FileSystemStorage):
|
||||
full_dir = os.path.join(self.location, directory)
|
||||
os.makedirs(full_dir, exist_ok=True)
|
||||
|
||||
# Return the name as is since our upload path already handles uniqueness
|
||||
return name
|
||||
# Split filename into root and extension
|
||||
file_root, file_ext = os.path.splitext(filename)
|
||||
|
||||
def _save(self, name, content):
|
||||
# Extract base name without any existing numbers
|
||||
base_root = file_root.rsplit('_', 1)[0]
|
||||
|
||||
# Use counter for this directory
|
||||
dir_key = os.path.join(directory, base_root)
|
||||
if dir_key not in self._counters:
|
||||
self._counters[dir_key] = 0
|
||||
|
||||
self._counters[dir_key] += 1
|
||||
counter = self._counters[dir_key]
|
||||
|
||||
new_name = f"{base_root}_{counter}{file_ext}"
|
||||
return os.path.join(directory, new_name)
|
||||
|
||||
def _save(self, name: str, content: Union[File, UploadedFile]) -> str:
|
||||
"""
|
||||
Save with proper permissions
|
||||
Save the file and set proper permissions
|
||||
"""
|
||||
# Save the file
|
||||
name = super()._save(name, content)
|
||||
# Get the full path where the file will be saved
|
||||
full_path = self.path(name)
|
||||
directory = os.path.dirname(full_path)
|
||||
|
||||
# Create the directory if it doesn't exist
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
|
||||
# Save the file using Django's file handling
|
||||
if isinstance(content, TemporaryUploadedFile):
|
||||
# This is a TemporaryUploadedFile
|
||||
file_move_safe(content.temporary_file_path(), full_path)
|
||||
else:
|
||||
# This is an InMemoryUploadedFile or similar
|
||||
with open(full_path, 'wb') as destination:
|
||||
if hasattr(content, 'chunks'):
|
||||
for chunk in content.chunks():
|
||||
destination.write(chunk)
|
||||
else:
|
||||
destination.write(content.read())
|
||||
|
||||
# Set proper permissions
|
||||
full_path = self.path(name)
|
||||
os.chmod(full_path, 0o644)
|
||||
os.chmod(os.path.dirname(full_path), 0o755)
|
||||
os.chmod(directory, 0o755)
|
||||
|
||||
return name
|
||||
|
||||
BIN
media/submissions/photos/test_OVKudHN.gif
Normal file
|
After Width: | Height: | Size: 35 B |
BIN
media/submissions/photos/test_yp9psr1.gif
Normal file
|
After Width: | Height: | Size: 35 B |
242
media/tests.py
@@ -3,74 +3,192 @@ from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.utils import timezone
|
||||
from django.conf import settings
|
||||
from django.test.utils import override_settings
|
||||
from django.db import models
|
||||
from datetime import datetime
|
||||
from PIL import Image, ExifTags
|
||||
from PIL import Image
|
||||
import piexif
|
||||
import io
|
||||
import shutil
|
||||
import tempfile
|
||||
import os
|
||||
import logging
|
||||
from typing import Optional, Any, Generator, cast
|
||||
from contextlib import contextmanager
|
||||
from .models import Photo
|
||||
from .storage import MediaStorage
|
||||
from parks.models import Park
|
||||
|
||||
User = get_user_model()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@override_settings(MEDIA_ROOT=tempfile.mkdtemp())
|
||||
class PhotoModelTests(TestCase):
|
||||
def setUp(self):
|
||||
# Create a test user
|
||||
self.user = User.objects.create_user(
|
||||
test_media_root: str
|
||||
user: models.Model
|
||||
park: Park
|
||||
content_type: ContentType
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
super().setUpClass()
|
||||
cls.test_media_root = settings.MEDIA_ROOT
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
try:
|
||||
shutil.rmtree(cls.test_media_root, ignore_errors=True)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to clean up test media directory: {e}")
|
||||
super().tearDownClass()
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.user = self._create_test_user()
|
||||
self.park = self._create_test_park()
|
||||
self.content_type = ContentType.objects.get_for_model(Park)
|
||||
self._setup_test_directory()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self._cleanup_test_directory()
|
||||
Photo.objects.all().delete()
|
||||
with self._reset_storage_state():
|
||||
pass
|
||||
|
||||
def _create_test_user(self) -> models.Model:
|
||||
"""Create a test user for the tests"""
|
||||
return User.objects.create_user(
|
||||
username='testuser',
|
||||
password='testpass123'
|
||||
)
|
||||
|
||||
# Create a test park for photo association
|
||||
self.park = Park.objects.create(
|
||||
def _create_test_park(self) -> Park:
|
||||
"""Create a test park for the tests"""
|
||||
return Park.objects.create(
|
||||
name='Test Park',
|
||||
slug='test-park'
|
||||
)
|
||||
|
||||
self.content_type = ContentType.objects.get_for_model(Park)
|
||||
def _setup_test_directory(self) -> None:
|
||||
"""Set up test directory and clean any existing test files"""
|
||||
try:
|
||||
# Clean up any existing test park directory
|
||||
test_park_dir = os.path.join(settings.MEDIA_ROOT, 'park', 'test-park')
|
||||
if os.path.exists(test_park_dir):
|
||||
shutil.rmtree(test_park_dir, ignore_errors=True)
|
||||
|
||||
def create_test_image_with_exif(self, date_taken=None):
|
||||
# Create necessary directories
|
||||
os.makedirs(test_park_dir, exist_ok=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to set up test directory: {e}")
|
||||
raise
|
||||
|
||||
def _cleanup_test_directory(self) -> None:
|
||||
"""Clean up test directories and files"""
|
||||
try:
|
||||
test_park_dir = os.path.join(settings.MEDIA_ROOT, 'park', 'test-park')
|
||||
if os.path.exists(test_park_dir):
|
||||
shutil.rmtree(test_park_dir, ignore_errors=True)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to clean up test directory: {e}")
|
||||
|
||||
@contextmanager
|
||||
def _reset_storage_state(self) -> Generator[None, None, None]:
|
||||
"""Safely reset storage state"""
|
||||
try:
|
||||
MediaStorage.reset_counters()
|
||||
yield
|
||||
finally:
|
||||
MediaStorage.reset_counters()
|
||||
|
||||
def create_test_image_with_exif(self, date_taken: Optional[datetime] = None, filename: str = 'test.jpg') -> SimpleUploadedFile:
|
||||
"""Helper method to create a test image with EXIF data"""
|
||||
# Create a test image
|
||||
image = Image.new('RGB', (100, 100), color='red')
|
||||
image_io = io.BytesIO()
|
||||
|
||||
# Add EXIF data if date_taken is provided
|
||||
# Save image first without EXIF
|
||||
image.save(image_io, 'JPEG')
|
||||
image_io.seek(0)
|
||||
|
||||
if date_taken:
|
||||
# Create EXIF data
|
||||
exif_dict = {
|
||||
"0th": {},
|
||||
"Exif": {
|
||||
ExifTags.Base.DateTimeOriginal: date_taken.strftime("%Y:%m:%d %H:%M:%S").encode()
|
||||
piexif.ExifIFD.DateTimeOriginal: date_taken.strftime("%Y:%m:%d %H:%M:%S").encode()
|
||||
}
|
||||
}
|
||||
image.save(image_io, 'JPEG', exif=exif_dict)
|
||||
else:
|
||||
image.save(image_io, 'JPEG')
|
||||
exif_bytes = piexif.dump(exif_dict)
|
||||
|
||||
# Insert EXIF into image
|
||||
image_with_exif = io.BytesIO()
|
||||
piexif.insert(exif_bytes, image_io.getvalue(), image_with_exif)
|
||||
image_with_exif.seek(0)
|
||||
image_data = image_with_exif.getvalue()
|
||||
else:
|
||||
image_data = image_io.getvalue()
|
||||
|
||||
image_io.seek(0)
|
||||
return SimpleUploadedFile(
|
||||
'test.jpg',
|
||||
image_io.getvalue(),
|
||||
filename,
|
||||
image_data,
|
||||
content_type='image/jpeg'
|
||||
)
|
||||
|
||||
def test_photo_creation(self):
|
||||
"""Test basic photo creation"""
|
||||
photo = Photo.objects.create(
|
||||
image=SimpleUploadedFile(
|
||||
'test.jpg',
|
||||
b'dummy image data',
|
||||
content_type='image/jpeg'
|
||||
),
|
||||
caption='Test Caption',
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk
|
||||
)
|
||||
def test_filename_normalization(self) -> None:
|
||||
"""Test that filenames are properly normalized"""
|
||||
with self._reset_storage_state():
|
||||
# Test with various problematic filenames
|
||||
test_cases = [
|
||||
('test with spaces.jpg', 'test-park_1.jpg'),
|
||||
('TEST_UPPER.JPG', 'test-park_2.jpg'),
|
||||
('special@#chars.jpeg', 'test-park_3.jpg'),
|
||||
('no-extension', 'test-park_4.jpg'),
|
||||
('multiple...dots.jpg', 'test-park_5.jpg'),
|
||||
('très_açaí.jpg', 'test-park_6.jpg'), # Unicode characters
|
||||
]
|
||||
|
||||
self.assertEqual(photo.caption, 'Test Caption')
|
||||
self.assertEqual(photo.uploaded_by, self.user)
|
||||
self.assertIsNone(photo.date_taken)
|
||||
for input_name, expected_suffix in test_cases:
|
||||
photo = Photo.objects.create(
|
||||
image=self.create_test_image_with_exif(filename=input_name),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk
|
||||
)
|
||||
|
||||
def test_exif_date_extraction(self):
|
||||
# Check that the filename follows the normalized pattern
|
||||
self.assertTrue(
|
||||
photo.image.name.endswith(expected_suffix),
|
||||
f"Expected filename to end with {expected_suffix}, got {photo.image.name}"
|
||||
)
|
||||
|
||||
# Verify the path structure
|
||||
expected_path = f"park/{self.park.slug}/"
|
||||
self.assertTrue(
|
||||
photo.image.name.startswith(expected_path),
|
||||
f"Expected path to start with {expected_path}, got {photo.image.name}"
|
||||
)
|
||||
|
||||
def test_sequential_filename_numbering(self) -> None:
|
||||
"""Test that sequential files get proper numbering"""
|
||||
with self._reset_storage_state():
|
||||
# Create multiple photos and verify numbering
|
||||
for i in range(1, 4):
|
||||
photo = Photo.objects.create(
|
||||
image=self.create_test_image_with_exif(),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk
|
||||
)
|
||||
|
||||
expected_name = f"park/{self.park.slug}/test-park_{i}.jpg"
|
||||
self.assertEqual(
|
||||
photo.image.name,
|
||||
expected_name,
|
||||
f"Expected {expected_name}, got {photo.image.name}"
|
||||
)
|
||||
|
||||
def test_exif_date_extraction(self) -> None:
|
||||
"""Test EXIF date extraction from uploaded photos"""
|
||||
test_date = datetime(2024, 1, 1, 12, 0, 0)
|
||||
image_file = self.create_test_image_with_exif(test_date)
|
||||
@@ -90,9 +208,9 @@ class PhotoModelTests(TestCase):
|
||||
else:
|
||||
self.skipTest("EXIF data extraction not supported in test environment")
|
||||
|
||||
def test_photo_without_exif(self):
|
||||
def test_photo_without_exif(self) -> None:
|
||||
"""Test photo upload without EXIF data"""
|
||||
image_file = self.create_test_image_with_exif() # No date provided
|
||||
image_file = self.create_test_image_with_exif()
|
||||
|
||||
photo = Photo.objects.create(
|
||||
image=image_file,
|
||||
@@ -103,31 +221,22 @@ class PhotoModelTests(TestCase):
|
||||
|
||||
self.assertIsNone(photo.date_taken)
|
||||
|
||||
def test_default_caption(self):
|
||||
def test_default_caption(self) -> None:
|
||||
"""Test default caption generation"""
|
||||
photo = Photo.objects.create(
|
||||
image=SimpleUploadedFile(
|
||||
'test.jpg',
|
||||
b'dummy image data',
|
||||
content_type='image/jpeg'
|
||||
),
|
||||
image=self.create_test_image_with_exif(),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk
|
||||
)
|
||||
|
||||
expected_prefix = f"Uploaded by {self.user.username} on"
|
||||
expected_prefix = f"Uploaded by {cast(Any, self.user).username} on"
|
||||
self.assertTrue(photo.caption.startswith(expected_prefix))
|
||||
|
||||
def test_primary_photo_toggle(self):
|
||||
def test_primary_photo_toggle(self) -> None:
|
||||
"""Test primary photo functionality"""
|
||||
# Create two photos
|
||||
photo1 = Photo.objects.create(
|
||||
image=SimpleUploadedFile(
|
||||
'test1.jpg',
|
||||
b'dummy image data',
|
||||
content_type='image/jpeg'
|
||||
),
|
||||
image=self.create_test_image_with_exif(),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk,
|
||||
@@ -135,51 +244,24 @@ class PhotoModelTests(TestCase):
|
||||
)
|
||||
|
||||
photo2 = Photo.objects.create(
|
||||
image=SimpleUploadedFile(
|
||||
'test2.jpg',
|
||||
b'dummy image data',
|
||||
content_type='image/jpeg'
|
||||
),
|
||||
image=self.create_test_image_with_exif(),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk,
|
||||
is_primary=True
|
||||
)
|
||||
|
||||
# Refresh from database
|
||||
photo1.refresh_from_db()
|
||||
photo2.refresh_from_db()
|
||||
|
||||
# Verify only photo2 is primary
|
||||
self.assertFalse(photo1.is_primary)
|
||||
self.assertTrue(photo2.is_primary)
|
||||
|
||||
@override_settings(MEDIA_ROOT='test_media/')
|
||||
def test_photo_upload_path(self):
|
||||
"""Test photo upload path generation"""
|
||||
photo = Photo.objects.create(
|
||||
image=SimpleUploadedFile(
|
||||
'test.jpg',
|
||||
b'dummy image data',
|
||||
content_type='image/jpeg'
|
||||
),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk
|
||||
)
|
||||
|
||||
expected_path = f"park/{self.park.slug}/"
|
||||
self.assertTrue(photo.image.name.startswith(expected_path))
|
||||
|
||||
def test_date_taken_field(self):
|
||||
def test_date_taken_field(self) -> None:
|
||||
"""Test date_taken field functionality"""
|
||||
test_date = timezone.now()
|
||||
photo = Photo.objects.create(
|
||||
image=SimpleUploadedFile(
|
||||
'test.jpg',
|
||||
b'dummy image data',
|
||||
content_type='image/jpeg'
|
||||
),
|
||||
image=self.create_test_image_with_exif(),
|
||||
uploaded_by=self.user,
|
||||
content_type=self.content_type,
|
||||
object_id=self.park.pk,
|
||||
|
||||
@@ -1,17 +1,27 @@
|
||||
from typing import Any, Dict, Optional, Type, Union, cast
|
||||
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.http import JsonResponse, HttpResponseForbidden
|
||||
from django.http import JsonResponse, HttpResponseForbidden, HttpRequest, HttpResponse
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.views.generic import DetailView
|
||||
from django.views.generic import DetailView, View
|
||||
from django.utils import timezone
|
||||
from django.db import models
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.base_user import AbstractBaseUser
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
import json
|
||||
from .models import EditSubmission, PhotoSubmission
|
||||
from .models import EditSubmission, PhotoSubmission, UserType
|
||||
|
||||
class EditSubmissionMixin:
|
||||
User = get_user_model()
|
||||
|
||||
class EditSubmissionMixin(DetailView):
|
||||
"""
|
||||
Mixin for handling edit submissions with proper moderation.
|
||||
"""
|
||||
def handle_edit_submission(self, request, changes, reason='', source='', submission_type='EDIT'):
|
||||
model: Optional[Type[models.Model]] = None
|
||||
|
||||
def handle_edit_submission(self, request: HttpRequest, changes: Dict[str, Any], reason: str = '',
|
||||
source: str = '', submission_type: str = 'EDIT') -> JsonResponse:
|
||||
"""
|
||||
Handle an edit submission based on user's role.
|
||||
|
||||
@@ -31,6 +41,9 @@ class EditSubmissionMixin:
|
||||
'message': 'You must be logged in to make edits.'
|
||||
}, status=403)
|
||||
|
||||
if not self.model:
|
||||
raise ValueError("model attribute must be set")
|
||||
|
||||
content_type = ContentType.objects.get_for_model(self.model)
|
||||
|
||||
# Create the submission
|
||||
@@ -46,16 +59,17 @@ class EditSubmissionMixin:
|
||||
# For edits, set the object_id
|
||||
if submission_type == 'EDIT':
|
||||
obj = self.get_object()
|
||||
submission.object_id = obj.id
|
||||
submission.object_id = getattr(obj, 'id', None)
|
||||
|
||||
# Auto-approve for moderators and above
|
||||
if request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
obj = submission.approve(request.user)
|
||||
user_role = getattr(request.user, 'role', None)
|
||||
if user_role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
obj = submission.approve(cast(UserType, request.user))
|
||||
return JsonResponse({
|
||||
'status': 'success',
|
||||
'message': 'Changes saved successfully.',
|
||||
'auto_approved': True,
|
||||
'redirect_url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None
|
||||
'redirect_url': getattr(obj, 'get_absolute_url', lambda: None)()
|
||||
})
|
||||
|
||||
# Submit for approval for regular users
|
||||
@@ -66,7 +80,7 @@ class EditSubmissionMixin:
|
||||
'auto_approved': False
|
||||
})
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> JsonResponse:
|
||||
"""Handle POST requests for editing"""
|
||||
if not request.user.is_authenticated:
|
||||
return JsonResponse({
|
||||
@@ -87,7 +101,8 @@ class EditSubmissionMixin:
|
||||
'message': 'No changes provided.'
|
||||
}, status=400)
|
||||
|
||||
if not reason and request.user.role == 'USER':
|
||||
user_role = getattr(request.user, 'role', None)
|
||||
if not reason and user_role == 'USER':
|
||||
return JsonResponse({
|
||||
'status': 'error',
|
||||
'message': 'Please provide a reason for your changes.'
|
||||
@@ -108,11 +123,13 @@ class EditSubmissionMixin:
|
||||
'message': str(e)
|
||||
}, status=500)
|
||||
|
||||
class PhotoSubmissionMixin:
|
||||
class PhotoSubmissionMixin(DetailView):
|
||||
"""
|
||||
Mixin for handling photo submissions with proper moderation.
|
||||
"""
|
||||
def handle_photo_submission(self, request):
|
||||
model: Optional[Type[models.Model]] = None
|
||||
|
||||
def handle_photo_submission(self, request: HttpRequest) -> JsonResponse:
|
||||
"""Handle a photo submission based on user's role"""
|
||||
if not request.user.is_authenticated:
|
||||
return JsonResponse({
|
||||
@@ -120,6 +137,9 @@ class PhotoSubmissionMixin:
|
||||
'message': 'You must be logged in to upload photos.'
|
||||
}, status=403)
|
||||
|
||||
if not self.model:
|
||||
raise ValueError("model attribute must be set")
|
||||
|
||||
try:
|
||||
obj = self.get_object()
|
||||
except (AttributeError, self.model.DoesNotExist):
|
||||
@@ -139,14 +159,15 @@ class PhotoSubmissionMixin:
|
||||
submission = PhotoSubmission(
|
||||
user=request.user,
|
||||
content_type=content_type,
|
||||
object_id=obj.id,
|
||||
object_id=getattr(obj, 'id', None),
|
||||
photo=request.FILES['photo'],
|
||||
caption=request.POST.get('caption', ''),
|
||||
date_taken=request.POST.get('date_taken')
|
||||
)
|
||||
|
||||
# Auto-approve for moderators and above
|
||||
if request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
user_role = getattr(request.user, 'role', None)
|
||||
if user_role in ['MODERATOR', 'ADMIN', 'SUPERUSER']:
|
||||
submission.auto_approve()
|
||||
return JsonResponse({
|
||||
'status': 'success',
|
||||
@@ -164,63 +185,81 @@ class PhotoSubmissionMixin:
|
||||
|
||||
class ModeratorRequiredMixin(UserPassesTestMixin):
|
||||
"""Require moderator or higher role for access"""
|
||||
def test_func(self):
|
||||
request: Optional[HttpRequest] = None
|
||||
|
||||
def test_func(self) -> bool:
|
||||
if not self.request:
|
||||
return False
|
||||
user_role = getattr(self.request.user, 'role', None)
|
||||
return (
|
||||
self.request.user.is_authenticated and
|
||||
self.request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']
|
||||
user_role in ['MODERATOR', 'ADMIN', 'SUPERUSER']
|
||||
)
|
||||
|
||||
def handle_no_permission(self):
|
||||
if not self.request.user.is_authenticated:
|
||||
def handle_no_permission(self) -> HttpResponse:
|
||||
if not self.request or not self.request.user.is_authenticated:
|
||||
return super().handle_no_permission()
|
||||
return HttpResponseForbidden("You must be a moderator to access this page.")
|
||||
|
||||
class AdminRequiredMixin(UserPassesTestMixin):
|
||||
"""Require admin or superuser role for access"""
|
||||
def test_func(self):
|
||||
request: Optional[HttpRequest] = None
|
||||
|
||||
def test_func(self) -> bool:
|
||||
if not self.request:
|
||||
return False
|
||||
user_role = getattr(self.request.user, 'role', None)
|
||||
return (
|
||||
self.request.user.is_authenticated and
|
||||
self.request.user.role in ['ADMIN', 'SUPERUSER']
|
||||
user_role in ['ADMIN', 'SUPERUSER']
|
||||
)
|
||||
|
||||
def handle_no_permission(self):
|
||||
if not self.request.user.is_authenticated:
|
||||
def handle_no_permission(self) -> HttpResponse:
|
||||
if not self.request or not self.request.user.is_authenticated:
|
||||
return super().handle_no_permission()
|
||||
return HttpResponseForbidden("You must be an admin to access this page.")
|
||||
|
||||
class InlineEditMixin:
|
||||
"""Add inline editing context to views"""
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super().get_context_data(**kwargs)
|
||||
if hasattr(self, 'request') and self.request.user.is_authenticated:
|
||||
request: Optional[HttpRequest] = None
|
||||
|
||||
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs) # type: ignore
|
||||
if self.request and self.request.user.is_authenticated:
|
||||
context['can_edit'] = True
|
||||
context['can_auto_approve'] = self.request.user.role in ['MODERATOR', 'ADMIN', 'SUPERUSER']
|
||||
user_role = getattr(self.request.user, 'role', None)
|
||||
context['can_auto_approve'] = user_role in ['MODERATOR', 'ADMIN', 'SUPERUSER']
|
||||
|
||||
if isinstance(self, DetailView):
|
||||
obj = self.get_object()
|
||||
obj = self.get_object() # type: ignore
|
||||
context['pending_edits'] = EditSubmission.objects.filter(
|
||||
content_type=ContentType.objects.get_for_model(obj),
|
||||
object_id=obj.id,
|
||||
content_type=ContentType.objects.get_for_model(obj.__class__),
|
||||
object_id=getattr(obj, 'id', None),
|
||||
status='NEW'
|
||||
).select_related('user').order_by('-created_at')
|
||||
return context
|
||||
|
||||
class HistoryMixin:
|
||||
"""Add edit history context to views"""
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super().get_context_data(**kwargs)
|
||||
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs) # type: ignore
|
||||
|
||||
# Only add history context for DetailViews
|
||||
if isinstance(self, DetailView):
|
||||
obj = self.get_object()
|
||||
obj = self.get_object() # type: ignore
|
||||
|
||||
# Get historical records ordered by date
|
||||
context['history'] = obj.history.all().select_related('history_user').order_by('-history_date')
|
||||
# Get historical records ordered by date if available
|
||||
history = getattr(obj, 'history', None)
|
||||
if history is not None:
|
||||
context['history'] = history.all().select_related('history_user').order_by('-history_date')
|
||||
else:
|
||||
context['history'] = []
|
||||
|
||||
# Get related edit submissions
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
content_type = ContentType.objects.get_for_model(obj.__class__)
|
||||
context['edit_submissions'] = EditSubmission.objects.filter(
|
||||
content_type=content_type,
|
||||
object_id=obj.id
|
||||
object_id=getattr(obj, 'id', None)
|
||||
).exclude(
|
||||
status='NEW'
|
||||
).select_related('user', 'handled_by').order_by('-created_at')
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
from typing import Any, Dict, Optional, Type, Union, cast
|
||||
from django.db import models
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.conf import settings
|
||||
from django.utils import timezone
|
||||
from django.apps import apps
|
||||
from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist
|
||||
from django.contrib.auth.base_user import AbstractBaseUser
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
|
||||
UserType = Union[AbstractBaseUser, AnonymousUser]
|
||||
|
||||
class EditSubmission(models.Model):
|
||||
STATUS_CHOICES = [
|
||||
@@ -78,60 +84,76 @@ class EditSubmission(models.Model):
|
||||
models.Index(fields=['status']),
|
||||
]
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
action = "creation" if self.submission_type == 'CREATE' else "edit"
|
||||
target = self.content_object or self.content_type.model_class().__name__
|
||||
model_class = self.content_type.model_class()
|
||||
target = self.content_object or (model_class.__name__ if model_class else 'Unknown')
|
||||
return f"{action} by {self.user.username} on {target}"
|
||||
|
||||
def _resolve_foreign_keys(self, data):
|
||||
def _resolve_foreign_keys(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Convert foreign key IDs to model instances"""
|
||||
model_class = self.content_type.model_class()
|
||||
if not model_class:
|
||||
raise ValueError("Could not resolve model class")
|
||||
|
||||
resolved_data = data.copy()
|
||||
|
||||
for field_name, value in data.items():
|
||||
field = model_class._meta.get_field(field_name)
|
||||
if isinstance(field, models.ForeignKey) and value is not None:
|
||||
related_model = field.related_model
|
||||
resolved_data[field_name] = related_model.objects.get(id=value)
|
||||
try:
|
||||
field = model_class._meta.get_field(field_name)
|
||||
if isinstance(field, models.ForeignKey) and value is not None:
|
||||
related_model = field.related_model
|
||||
if related_model:
|
||||
resolved_data[field_name] = related_model.objects.get(id=value)
|
||||
except (FieldDoesNotExist, ObjectDoesNotExist):
|
||||
continue
|
||||
|
||||
return resolved_data
|
||||
|
||||
def approve(self, user):
|
||||
def approve(self, user: UserType) -> Optional[models.Model]:
|
||||
"""Approve the submission and apply the changes"""
|
||||
self.status = 'APPROVED'
|
||||
self.handled_by = user
|
||||
self.handled_by = user # type: ignore
|
||||
self.handled_at = timezone.now()
|
||||
|
||||
model_class = self.content_type.model_class()
|
||||
resolved_data = self._resolve_foreign_keys(self.changes)
|
||||
if not model_class:
|
||||
raise ValueError("Could not resolve model class")
|
||||
|
||||
if self.submission_type == 'CREATE':
|
||||
# Create new object
|
||||
obj = model_class(**resolved_data)
|
||||
obj.save()
|
||||
# Update object_id after creation
|
||||
self.object_id = obj.id
|
||||
else:
|
||||
# Apply changes to existing object
|
||||
obj = self.content_object
|
||||
for field, value in resolved_data.items():
|
||||
setattr(obj, field, value)
|
||||
obj.save()
|
||||
try:
|
||||
resolved_data = self._resolve_foreign_keys(self.changes)
|
||||
|
||||
self.save()
|
||||
return obj
|
||||
if self.submission_type == 'CREATE':
|
||||
# Create new object
|
||||
obj = model_class(**resolved_data)
|
||||
obj.save()
|
||||
# Update object_id after creation
|
||||
self.object_id = getattr(obj, 'id', None)
|
||||
else:
|
||||
# Apply changes to existing object
|
||||
obj = self.content_object
|
||||
if not obj:
|
||||
raise ValueError("Content object not found")
|
||||
for field, value in resolved_data.items():
|
||||
setattr(obj, field, value)
|
||||
obj.save()
|
||||
|
||||
def reject(self, user):
|
||||
self.save()
|
||||
return obj
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error approving submission: {str(e)}") from e
|
||||
|
||||
def reject(self, user: UserType) -> None:
|
||||
"""Reject the submission"""
|
||||
self.status = 'REJECTED'
|
||||
self.handled_by = user
|
||||
self.handled_by = user # type: ignore
|
||||
self.handled_at = timezone.now()
|
||||
self.save()
|
||||
|
||||
def escalate(self, user):
|
||||
def escalate(self, user: UserType) -> None:
|
||||
"""Escalate the submission to admin"""
|
||||
self.status = 'ESCALATED'
|
||||
self.handled_by = user
|
||||
self.handled_by = user # type: ignore
|
||||
self.handled_at = timezone.now()
|
||||
self.save()
|
||||
|
||||
@@ -189,15 +211,15 @@ class PhotoSubmission(models.Model):
|
||||
models.Index(fields=['status']),
|
||||
]
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return f"Photo submission by {self.user.username} for {self.content_object}"
|
||||
|
||||
def approve(self, moderator, notes=''):
|
||||
def approve(self, moderator: UserType, notes: str = '') -> None:
|
||||
"""Approve the photo submission"""
|
||||
from media.models import Photo
|
||||
|
||||
self.status = 'APPROVED'
|
||||
self.handled_by = moderator
|
||||
self.handled_by = moderator # type: ignore
|
||||
self.handled_at = timezone.now()
|
||||
self.notes = notes
|
||||
|
||||
@@ -213,15 +235,15 @@ class PhotoSubmission(models.Model):
|
||||
|
||||
self.save()
|
||||
|
||||
def reject(self, moderator, notes):
|
||||
def reject(self, moderator: UserType, notes: str) -> None:
|
||||
"""Reject the photo submission"""
|
||||
self.status = 'REJECTED'
|
||||
self.handled_by = moderator
|
||||
self.handled_by = moderator # type: ignore
|
||||
self.handled_at = timezone.now()
|
||||
self.notes = notes
|
||||
self.save()
|
||||
|
||||
def auto_approve(self):
|
||||
def auto_approve(self) -> None:
|
||||
"""Auto-approve the photo submission (for moderators/admins)"""
|
||||
from media.models import Photo
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ from companies.models import Company
|
||||
from django.views.generic import DetailView
|
||||
from django.test import RequestFactory
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
@@ -28,7 +29,7 @@ class TestView(EditSubmissionMixin, PhotoSubmissionMixin, InlineEditMixin, Histo
|
||||
self.object = self.get_object()
|
||||
return super().get_context_data(**kwargs)
|
||||
|
||||
def setup(self, request, *args, **kwargs):
|
||||
def setup(self, request: HttpRequest, *args, **kwargs):
|
||||
super().setup(request, *args, **kwargs)
|
||||
self.request = request
|
||||
|
||||
@@ -224,8 +225,7 @@ class ModerationMixinsTests(TestCase):
|
||||
def test_moderator_required_mixin(self):
|
||||
"""Test moderator required mixin"""
|
||||
class TestModeratorView(ModeratorRequiredMixin):
|
||||
def __init__(self):
|
||||
self.request = None
|
||||
pass
|
||||
|
||||
view = TestModeratorView()
|
||||
|
||||
@@ -253,8 +253,7 @@ class ModerationMixinsTests(TestCase):
|
||||
def test_admin_required_mixin(self):
|
||||
"""Test admin required mixin"""
|
||||
class TestAdminView(AdminRequiredMixin):
|
||||
def __init__(self):
|
||||
self.request = None
|
||||
pass
|
||||
|
||||
view = TestAdminView()
|
||||
|
||||
@@ -319,7 +318,7 @@ class ModerationMixinsTests(TestCase):
|
||||
EditSubmission.objects.create(
|
||||
user=self.user,
|
||||
content_type=ContentType.objects.get_for_model(Company),
|
||||
object_id=self.company.id,
|
||||
object_id=getattr(self.company, 'id', None),
|
||||
submission_type='EDIT',
|
||||
changes={'name': 'New Name'},
|
||||
status='APPROVED'
|
||||
|
||||
@@ -4,6 +4,7 @@ from django.utils.text import slugify
|
||||
from django.contrib.contenttypes.fields import GenericRelation
|
||||
from django.core.exceptions import ValidationError
|
||||
from decimal import Decimal, ROUND_DOWN, InvalidOperation
|
||||
from typing import Tuple, Optional, Any
|
||||
from simple_history.models import HistoricalRecords
|
||||
|
||||
from companies.models import Company
|
||||
@@ -13,6 +14,7 @@ from location.models import Location
|
||||
|
||||
|
||||
class Park(HistoricalModel):
|
||||
id: int # Type hint for Django's automatic id field
|
||||
STATUS_CHOICES = [
|
||||
("OPERATING", "Operating"),
|
||||
("CLOSED_TEMP", "Temporarily Closed"),
|
||||
@@ -57,54 +59,56 @@ class Park(HistoricalModel):
|
||||
# Metadata
|
||||
created_at = models.DateTimeField(auto_now_add=True, null=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
history = HistoricalRecords()
|
||||
|
||||
class Meta:
|
||||
ordering = ["name"]
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
def save(self, *args: Any, **kwargs: Any) -> None:
|
||||
if not self.slug:
|
||||
self.slug = slugify(self.name)
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
def get_absolute_url(self):
|
||||
def get_absolute_url(self) -> str:
|
||||
return reverse("parks:park_detail", kwargs={"slug": self.slug})
|
||||
|
||||
@property
|
||||
def formatted_location(self):
|
||||
def formatted_location(self) -> str:
|
||||
if self.location.exists():
|
||||
location = self.location.first()
|
||||
return location.get_formatted_address()
|
||||
if location:
|
||||
return location.get_formatted_address()
|
||||
return ""
|
||||
|
||||
@property
|
||||
def coordinates(self):
|
||||
def coordinates(self) -> Optional[Tuple[float, float]]:
|
||||
"""Returns coordinates as a tuple (latitude, longitude)"""
|
||||
if self.location.exists():
|
||||
location = self.location.first()
|
||||
return location.coordinates
|
||||
if location:
|
||||
return location.coordinates
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_by_slug(cls, slug):
|
||||
def get_by_slug(cls, slug: str) -> Tuple['Park', bool]:
|
||||
"""Get park by current or historical slug"""
|
||||
try:
|
||||
return cls.objects.get(slug=slug), False
|
||||
except cls.DoesNotExist:
|
||||
# Check historical slugs
|
||||
history = cls.history.filter(slug=slug).order_by("-history_date").first()
|
||||
history = cls.history.filter(slug=slug).order_by("-history_date").first() # type: ignore[attr-defined]
|
||||
if history:
|
||||
try:
|
||||
return cls.objects.get(id=history.id), True
|
||||
except cls.DoesNotExist:
|
||||
pass
|
||||
raise cls.DoesNotExist()
|
||||
return cls.objects.get(pk=history.instance.pk), True
|
||||
except cls.DoesNotExist as e:
|
||||
raise cls.DoesNotExist("No park found with this slug") from e
|
||||
raise cls.DoesNotExist("No park found with this slug")
|
||||
|
||||
|
||||
class ParkArea(HistoricalModel):
|
||||
id: int # Type hint for Django's automatic id field
|
||||
park = models.ForeignKey(Park, on_delete=models.CASCADE, related_name="areas")
|
||||
name = models.CharField(max_length=255)
|
||||
slug = models.SlugField(max_length=255)
|
||||
@@ -115,37 +119,36 @@ class ParkArea(HistoricalModel):
|
||||
# Metadata
|
||||
created_at = models.DateTimeField(auto_now_add=True, null=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
history = HistoricalRecords()
|
||||
|
||||
class Meta:
|
||||
ordering = ["name"]
|
||||
unique_together = ["park", "slug"]
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return f"{self.name} at {self.park.name}"
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
def save(self, *args: Any, **kwargs: Any) -> None:
|
||||
if not self.slug:
|
||||
self.slug = slugify(self.name)
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
def get_absolute_url(self):
|
||||
def get_absolute_url(self) -> str:
|
||||
return reverse(
|
||||
"parks:area_detail",
|
||||
kwargs={"park_slug": self.park.slug, "area_slug": self.slug},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_by_slug(cls, slug):
|
||||
def get_by_slug(cls, slug: str) -> Tuple['ParkArea', bool]:
|
||||
"""Get area by current or historical slug"""
|
||||
try:
|
||||
return cls.objects.get(slug=slug), False
|
||||
except cls.DoesNotExist:
|
||||
# Check historical slugs
|
||||
history = cls.history.filter(slug=slug).order_by("-history_date").first()
|
||||
history = cls.history.filter(slug=slug).order_by("-history_date").first() # type: ignore[attr-defined]
|
||||
if history:
|
||||
try:
|
||||
return cls.objects.get(id=history.id), True
|
||||
except cls.DoesNotExist:
|
||||
pass
|
||||
raise cls.DoesNotExist()
|
||||
return cls.objects.get(pk=history.instance.pk), True
|
||||
except cls.DoesNotExist as e:
|
||||
raise cls.DoesNotExist("No park area found with this slug") from e
|
||||
raise cls.DoesNotExist("No park area found with this slug")
|
||||
|
||||
122
parks/tests.py
@@ -4,15 +4,32 @@ from django.contrib.auth import get_user_model
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.gis.geos import Point
|
||||
from django.http import HttpResponse
|
||||
from typing import cast, Optional, Tuple
|
||||
from .models import Park, ParkArea
|
||||
from companies.models import Company
|
||||
from location.models import Location
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
def create_test_location(park: Park) -> Location:
|
||||
"""Helper function to create a test location"""
|
||||
return Location.objects.create(
|
||||
content_type=ContentType.objects.get_for_model(Park),
|
||||
object_id=park.id,
|
||||
name='Test Park Location',
|
||||
location_type='park',
|
||||
street_address='123 Test St',
|
||||
city='Test City',
|
||||
state='TS',
|
||||
country='Test Country',
|
||||
postal_code='12345',
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
|
||||
class ParkModelTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
def setUpTestData(cls) -> None:
|
||||
# Create test user
|
||||
cls.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
@@ -35,20 +52,9 @@ class ParkModelTests(TestCase):
|
||||
)
|
||||
|
||||
# Create test location
|
||||
cls.location = Location.objects.create(
|
||||
content_type=ContentType.objects.get_for_model(Park),
|
||||
object_id=cls.park.id,
|
||||
name='Test Park Location',
|
||||
location_type='park',
|
||||
street_address='123 Test St',
|
||||
city='Test City',
|
||||
state='TS',
|
||||
country='Test Country',
|
||||
postal_code='12345',
|
||||
point=Point(-118.2437, 34.0522) # Los Angeles coordinates
|
||||
)
|
||||
cls.location = create_test_location(cls.park)
|
||||
|
||||
def test_park_creation(self):
|
||||
def test_park_creation(self) -> None:
|
||||
"""Test park instance creation and field values"""
|
||||
self.assertEqual(self.park.name, 'Test Park')
|
||||
self.assertEqual(self.park.owner, self.company)
|
||||
@@ -56,34 +62,35 @@ class ParkModelTests(TestCase):
|
||||
self.assertEqual(self.park.website, 'http://testpark.com')
|
||||
self.assertTrue(self.park.slug)
|
||||
|
||||
def test_park_str_representation(self):
|
||||
def test_park_str_representation(self) -> None:
|
||||
"""Test string representation of park"""
|
||||
self.assertEqual(str(self.park), 'Test Park')
|
||||
|
||||
def test_park_location(self):
|
||||
def test_park_location(self) -> None:
|
||||
"""Test park location relationship"""
|
||||
self.assertTrue(self.park.location.exists())
|
||||
location = self.park.location.first()
|
||||
self.assertEqual(location.street_address, '123 Test St')
|
||||
self.assertEqual(location.city, 'Test City')
|
||||
self.assertEqual(location.state, 'TS')
|
||||
self.assertEqual(location.country, 'Test Country')
|
||||
self.assertEqual(location.postal_code, '12345')
|
||||
if location := self.park.location.first():
|
||||
self.assertEqual(location.street_address, '123 Test St')
|
||||
self.assertEqual(location.city, 'Test City')
|
||||
self.assertEqual(location.state, 'TS')
|
||||
self.assertEqual(location.country, 'Test Country')
|
||||
self.assertEqual(location.postal_code, '12345')
|
||||
|
||||
def test_park_coordinates(self):
|
||||
def test_park_coordinates(self) -> None:
|
||||
"""Test park coordinates property"""
|
||||
coords = self.park.coordinates
|
||||
self.assertIsNotNone(coords)
|
||||
self.assertAlmostEqual(coords[0], 34.0522, places=4) # latitude
|
||||
self.assertAlmostEqual(coords[1], -118.2437, places=4) # longitude
|
||||
if coords:
|
||||
self.assertAlmostEqual(coords[0], 34.0522, places=4) # latitude
|
||||
self.assertAlmostEqual(coords[1], -118.2437, places=4) # longitude
|
||||
|
||||
def test_park_formatted_location(self):
|
||||
def test_park_formatted_location(self) -> None:
|
||||
"""Test park formatted_location property"""
|
||||
expected = '123 Test St, Test City, TS, 12345, Test Country'
|
||||
self.assertEqual(self.park.formatted_location, expected)
|
||||
|
||||
class ParkAreaTests(TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
# Create test company
|
||||
self.company = Company.objects.create(
|
||||
name='Test Company',
|
||||
@@ -98,18 +105,7 @@ class ParkAreaTests(TestCase):
|
||||
)
|
||||
|
||||
# Create test location
|
||||
self.location = Location.objects.create(
|
||||
content_type=ContentType.objects.get_for_model(Park),
|
||||
object_id=self.park.id,
|
||||
name='Test Park Location',
|
||||
location_type='park',
|
||||
street_address='123 Test St', # Added street_address
|
||||
city='Test City',
|
||||
state='TS',
|
||||
country='Test Country',
|
||||
postal_code='12345',
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
self.location = create_test_location(self.park)
|
||||
|
||||
# Create test area
|
||||
self.area = ParkArea.objects.create(
|
||||
@@ -118,25 +114,25 @@ class ParkAreaTests(TestCase):
|
||||
description='Test Description'
|
||||
)
|
||||
|
||||
def test_area_creation(self):
|
||||
def test_area_creation(self) -> None:
|
||||
"""Test park area creation"""
|
||||
self.assertEqual(self.area.name, 'Test Area')
|
||||
self.assertEqual(self.area.park, self.park)
|
||||
self.assertTrue(self.area.slug)
|
||||
|
||||
def test_area_str_representation(self):
|
||||
def test_area_str_representation(self) -> None:
|
||||
"""Test string representation of park area"""
|
||||
expected = f'Test Area at {self.park.name}'
|
||||
self.assertEqual(str(self.area), expected)
|
||||
|
||||
def test_area_get_by_slug(self):
|
||||
def test_area_get_by_slug(self) -> None:
|
||||
"""Test get_by_slug class method"""
|
||||
area, is_historical = ParkArea.get_by_slug(self.area.slug)
|
||||
self.assertEqual(area, self.area)
|
||||
self.assertFalse(is_historical)
|
||||
|
||||
class ParkViewTests(TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self) -> None:
|
||||
self.client = Client()
|
||||
self.user = User.objects.create_user(
|
||||
username='testuser',
|
||||
@@ -152,43 +148,35 @@ class ParkViewTests(TestCase):
|
||||
owner=self.company,
|
||||
status='OPERATING'
|
||||
)
|
||||
self.location = Location.objects.create(
|
||||
content_type=ContentType.objects.get_for_model(Park),
|
||||
object_id=self.park.id,
|
||||
name='Test Park Location',
|
||||
location_type='park',
|
||||
street_address='123 Test St', # Added street_address
|
||||
city='Test City',
|
||||
state='TS',
|
||||
country='Test Country',
|
||||
postal_code='12345',
|
||||
point=Point(-118.2437, 34.0522)
|
||||
)
|
||||
self.location = create_test_location(self.park)
|
||||
|
||||
def test_park_list_view(self):
|
||||
def test_park_list_view(self) -> None:
|
||||
"""Test park list view"""
|
||||
response = self.client.get(reverse('parks:park_list'))
|
||||
response = cast(HttpResponse, self.client.get(reverse('parks:park_list')))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, self.park.name)
|
||||
content = response.content.decode('utf-8')
|
||||
self.assertIn(self.park.name, content)
|
||||
|
||||
def test_park_detail_view(self):
|
||||
def test_park_detail_view(self) -> None:
|
||||
"""Test park detail view"""
|
||||
response = self.client.get(
|
||||
response = cast(HttpResponse, self.client.get(
|
||||
reverse('parks:park_detail', kwargs={'slug': self.park.slug})
|
||||
)
|
||||
))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, self.park.name)
|
||||
self.assertContains(response, '123 Test St')
|
||||
content = response.content.decode('utf-8')
|
||||
self.assertIn(self.park.name, content)
|
||||
self.assertIn('123 Test St', content)
|
||||
|
||||
def test_park_area_detail_view(self):
|
||||
def test_park_area_detail_view(self) -> None:
|
||||
"""Test park area detail view"""
|
||||
area = ParkArea.objects.create(
|
||||
park=self.park,
|
||||
name='Test Area'
|
||||
)
|
||||
response = self.client.get(
|
||||
response = cast(HttpResponse, self.client.get(
|
||||
reverse('parks:area_detail',
|
||||
kwargs={'park_slug': self.park.slug, 'area_slug': area.slug})
|
||||
)
|
||||
))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, area.name)
|
||||
content = response.content.decode('utf-8')
|
||||
self.assertIn(area.name, content)
|
||||
|
||||
@@ -82,7 +82,6 @@ class Ride(HistoricalModel):
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
photos = GenericRelation('media.Photo')
|
||||
reviews = GenericRelation('reviews.Review')
|
||||
history: HistoricalRecords = HistoricalRecords() # type: ignore
|
||||
|
||||
class Meta:
|
||||
ordering = ['name']
|
||||
@@ -120,7 +119,7 @@ class Ride(HistoricalModel):
|
||||
raise cls.DoesNotExist("No ride found with this slug") from inner_e
|
||||
raise cls.DoesNotExist("No ride found with this slug") from e
|
||||
|
||||
class RollerCoasterStats(models.Model):
|
||||
class RollerCoasterStats(HistoricalModel):
|
||||
LAUNCH_CHOICES = [
|
||||
('CHAIN', 'Chain Lift'),
|
||||
('CABLE', 'Cable Launch'),
|
||||
@@ -212,7 +211,6 @@ class RollerCoasterStats(models.Model):
|
||||
trains_count = models.PositiveIntegerField(null=True, blank=True)
|
||||
cars_per_train = models.PositiveIntegerField(null=True, blank=True)
|
||||
seats_per_car = models.PositiveIntegerField(null=True, blank=True)
|
||||
history: HistoricalRecords = HistoricalRecords() # type: ignore
|
||||
|
||||
class Meta:
|
||||
verbose_name = 'Roller Coaster Statistics'
|
||||
|
||||
@@ -2250,6 +2250,11 @@ select {
|
||||
margin-right: 0.25rem;
|
||||
}
|
||||
|
||||
.mx-2 {
|
||||
margin-left: 0.5rem;
|
||||
margin-right: 0.5rem;
|
||||
}
|
||||
|
||||
.mx-4 {
|
||||
margin-left: 1rem;
|
||||
margin-right: 1rem;
|
||||
@@ -2635,6 +2640,12 @@ select {
|
||||
gap: 1.5rem;
|
||||
}
|
||||
|
||||
.space-x-1 > :not([hidden]) ~ :not([hidden]) {
|
||||
--tw-space-x-reverse: 0;
|
||||
margin-right: calc(0.25rem * var(--tw-space-x-reverse));
|
||||
margin-left: calc(0.25rem * calc(1 - var(--tw-space-x-reverse)));
|
||||
}
|
||||
|
||||
.space-x-2 > :not([hidden]) ~ :not([hidden]) {
|
||||
--tw-space-x-reverse: 0;
|
||||
margin-right: calc(0.5rem * var(--tw-space-x-reverse));
|
||||
@@ -2701,16 +2712,31 @@ select {
|
||||
border-radius: 0.375rem;
|
||||
}
|
||||
|
||||
.rounded-b-lg {
|
||||
border-bottom-right-radius: 0.5rem;
|
||||
border-bottom-left-radius: 0.5rem;
|
||||
}
|
||||
|
||||
.rounded-l-lg {
|
||||
border-top-left-radius: 0.5rem;
|
||||
border-bottom-left-radius: 0.5rem;
|
||||
}
|
||||
|
||||
.rounded-l-md {
|
||||
border-top-left-radius: 0.375rem;
|
||||
border-bottom-left-radius: 0.375rem;
|
||||
}
|
||||
|
||||
.rounded-r-lg {
|
||||
border-top-right-radius: 0.5rem;
|
||||
border-bottom-right-radius: 0.5rem;
|
||||
}
|
||||
|
||||
.rounded-r-md {
|
||||
border-top-right-radius: 0.375rem;
|
||||
border-bottom-right-radius: 0.375rem;
|
||||
}
|
||||
|
||||
.rounded-t-lg {
|
||||
border-top-left-radius: 0.5rem;
|
||||
border-top-right-radius: 0.5rem;
|
||||
@@ -3618,6 +3644,11 @@ select {
|
||||
background-color: rgb(34 197 94 / var(--tw-bg-opacity));
|
||||
}
|
||||
|
||||
.dark\:bg-green-700:is(.dark *) {
|
||||
--tw-bg-opacity: 1;
|
||||
background-color: rgb(21 128 61 / var(--tw-bg-opacity));
|
||||
}
|
||||
|
||||
.dark\:bg-green-900:is(.dark *) {
|
||||
--tw-bg-opacity: 1;
|
||||
background-color: rgb(20 83 45 / var(--tw-bg-opacity));
|
||||
@@ -3722,6 +3753,11 @@ select {
|
||||
color: rgb(74 222 128 / var(--tw-text-opacity));
|
||||
}
|
||||
|
||||
.dark\:text-green-50:is(.dark *) {
|
||||
--tw-text-opacity: 1;
|
||||
color: rgb(240 253 244 / var(--tw-text-opacity));
|
||||
}
|
||||
|
||||
.dark\:text-green-900:is(.dark *) {
|
||||
--tw-text-opacity: 1;
|
||||
color: rgb(20 83 45 / var(--tw-text-opacity));
|
||||
@@ -3884,6 +3920,22 @@ select {
|
||||
grid-template-columns: repeat(2, minmax(0, 1fr));
|
||||
}
|
||||
|
||||
.sm\:grid-cols-3 {
|
||||
grid-template-columns: repeat(3, minmax(0, 1fr));
|
||||
}
|
||||
|
||||
.sm\:flex-row {
|
||||
flex-direction: row;
|
||||
}
|
||||
|
||||
.sm\:items-end {
|
||||
align-items: flex-end;
|
||||
}
|
||||
|
||||
.sm\:items-center {
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.sm\:space-x-4 > :not([hidden]) ~ :not([hidden]) {
|
||||
--tw-space-x-reverse: 0;
|
||||
margin-right: calc(1rem * var(--tw-space-x-reverse));
|
||||
@@ -3966,6 +4018,12 @@ select {
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.md\:space-x-3 > :not([hidden]) ~ :not([hidden]) {
|
||||
--tw-space-x-reverse: 0;
|
||||
margin-right: calc(0.75rem * var(--tw-space-x-reverse));
|
||||
margin-left: calc(0.75rem * calc(1 - var(--tw-space-x-reverse)));
|
||||
}
|
||||
|
||||
.md\:text-2xl {
|
||||
font-size: 1.5rem;
|
||||
line-height: 2rem;
|
||||
|
||||