mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 08:51:09 -05:00
upgrade to Django 5.1.3 and Python 3.13
This commit is contained in:
Binary file not shown.
@@ -1,5 +1,5 @@
|
||||
from django.views.generic import DetailView, TemplateView
|
||||
from django.contrib.auth import get_user_model, login, authenticate
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.contrib.auth.mixins import LoginRequiredMixin
|
||||
@@ -15,16 +15,27 @@ from django.utils.crypto import get_random_string
|
||||
from django.utils import timezone
|
||||
from datetime import timedelta
|
||||
from django.contrib.sites.shortcuts import get_current_site
|
||||
from django.db.models import Prefetch
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.db.models import Prefetch, QuerySet
|
||||
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest
|
||||
from django.urls import reverse
|
||||
from accounts.models import User, PasswordReset, TopList, EmailVerification
|
||||
from django.contrib.auth import login
|
||||
from django.core.files.uploadedfile import UploadedFile
|
||||
from accounts.models import User, PasswordReset, TopList, EmailVerification, UserProfile
|
||||
from reviews.models import Review
|
||||
from email_service.services import EmailService
|
||||
from allauth.account.views import LoginView, SignupView
|
||||
from .mixins import TurnstileMixin
|
||||
from typing import Dict, Any, Optional, Union, cast, TYPE_CHECKING
|
||||
from django_htmx.http import HttpResponseClientRefresh
|
||||
from django.contrib.sites.models import Site
|
||||
from django.contrib.sites.requests import RequestSite
|
||||
from contextlib import suppress
|
||||
|
||||
User = get_user_model()
|
||||
if TYPE_CHECKING:
|
||||
from django.contrib.sites.models import Site
|
||||
from django.contrib.sites.requests import RequestSite
|
||||
|
||||
UserModel = get_user_model()
|
||||
|
||||
class CustomLoginView(TurnstileMixin, LoginView):
|
||||
def form_valid(self, form):
|
||||
@@ -33,7 +44,30 @@ class CustomLoginView(TurnstileMixin, LoginView):
|
||||
except ValidationError as e:
|
||||
form.add_error(None, str(e))
|
||||
return self.form_invalid(form)
|
||||
return super().form_valid(form)
|
||||
|
||||
response = super().form_valid(form)
|
||||
|
||||
if getattr(self.request, 'htmx', False):
|
||||
return HttpResponseClientRefresh()
|
||||
return response
|
||||
|
||||
def form_invalid(self, form):
|
||||
if getattr(self.request, 'htmx', False):
|
||||
return render(
|
||||
self.request,
|
||||
'account/partials/login_form.html',
|
||||
self.get_context_data(form=form)
|
||||
)
|
||||
return super().form_invalid(form)
|
||||
|
||||
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
if getattr(request, 'htmx', False):
|
||||
return render(
|
||||
request,
|
||||
'account/partials/login_form.html',
|
||||
self.get_context_data()
|
||||
)
|
||||
return super().get(request, *args, **kwargs)
|
||||
|
||||
class CustomSignupView(TurnstileMixin, SignupView):
|
||||
def form_valid(self, form):
|
||||
@@ -45,29 +79,30 @@ class CustomSignupView(TurnstileMixin, SignupView):
|
||||
return super().form_valid(form)
|
||||
|
||||
@login_required
|
||||
def user_redirect_view(request):
|
||||
"""Redirect /user/ to the logged-in user's profile"""
|
||||
return redirect('profile', username=request.user.username)
|
||||
def user_redirect_view(request: HttpRequest) -> HttpResponse:
|
||||
user = cast(User, request.user)
|
||||
return redirect('profile', username=user.username)
|
||||
|
||||
def email_required(request):
|
||||
"""Handle cases where social auth provider doesn't provide an email"""
|
||||
sociallogin = request.session.get('socialaccount_sociallogin')
|
||||
if not sociallogin:
|
||||
def handle_social_login(request: HttpRequest, email: str) -> HttpResponse:
|
||||
if sociallogin := request.session.get('socialaccount_sociallogin'):
|
||||
sociallogin.user.email = email
|
||||
sociallogin.save()
|
||||
login(request, sociallogin.user)
|
||||
del request.session['socialaccount_sociallogin']
|
||||
messages.success(request, 'Successfully logged in')
|
||||
return redirect('/')
|
||||
return redirect('/')
|
||||
|
||||
def email_required(request: HttpRequest) -> HttpResponse:
|
||||
if not request.session.get('socialaccount_sociallogin'):
|
||||
messages.error(request, 'No social login in progress')
|
||||
return redirect('/')
|
||||
|
||||
if request.method == 'POST':
|
||||
email = request.POST.get('email')
|
||||
if email:
|
||||
sociallogin.user.email = email
|
||||
sociallogin.save()
|
||||
login(request, sociallogin.user)
|
||||
del request.session['socialaccount_sociallogin']
|
||||
messages.success(request, 'Successfully logged in')
|
||||
return redirect('/')
|
||||
else:
|
||||
messages.error(request, 'Email is required')
|
||||
return render(request, 'accounts/email_required.html', {'error': 'Email is required'})
|
||||
if email := request.POST.get('email'):
|
||||
return handle_social_login(request, email)
|
||||
messages.error(request, 'Email is required')
|
||||
return render(request, 'accounts/email_required.html', {'error': 'Email is required'})
|
||||
|
||||
return render(request, 'accounts/email_required.html')
|
||||
|
||||
@@ -78,16 +113,20 @@ class ProfileView(DetailView):
|
||||
slug_field = 'username'
|
||||
slug_url_kwarg = 'username'
|
||||
|
||||
def get_queryset(self):
|
||||
# Optimize the base queryset with select_related
|
||||
def get_queryset(self) -> QuerySet[User]:
|
||||
return User.objects.select_related('profile')
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
user = self.get_object()
|
||||
user = cast(User, self.get_object())
|
||||
|
||||
# Get user's reviews with optimized queries
|
||||
reviews_queryset = Review.objects.filter(
|
||||
context['recent_reviews'] = self._get_user_reviews(user)
|
||||
context['top_lists'] = self._get_user_top_lists(user)
|
||||
|
||||
return context
|
||||
|
||||
def _get_user_reviews(self, user: User) -> QuerySet[Review]:
|
||||
return Review.objects.filter(
|
||||
user=user,
|
||||
is_published=True
|
||||
).select_related(
|
||||
@@ -95,140 +134,178 @@ class ProfileView(DetailView):
|
||||
'user__profile',
|
||||
'content_type'
|
||||
).prefetch_related(
|
||||
'content_object' # This will fetch the related ride/park/etc.
|
||||
'content_object'
|
||||
).order_by('-created_at')[:5]
|
||||
|
||||
def _get_user_top_lists(self, user: User) -> QuerySet[TopList]:
|
||||
return TopList.objects.filter(
|
||||
user=user
|
||||
).select_related(
|
||||
'user',
|
||||
'user__profile'
|
||||
).prefetch_related(
|
||||
'items'
|
||||
).order_by('-created_at')[:5]
|
||||
|
||||
context['recent_reviews'] = reviews_queryset
|
||||
|
||||
# Get user's top lists with optimized queries
|
||||
top_lists_queryset = TopList.objects.filter(user=user).select_related('user', 'user__profile').prefetch_related('items')
|
||||
context['top_lists'] = top_lists_queryset.order_by('-created_at')[:5]
|
||||
|
||||
return context
|
||||
|
||||
class SettingsView(LoginRequiredMixin, TemplateView):
|
||||
template_name = 'accounts/settings.html'
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
|
||||
context = super().get_context_data(**kwargs)
|
||||
context['user'] = self.request.user
|
||||
return context
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
def _handle_profile_update(self, request: HttpRequest) -> None:
|
||||
user = cast(User, request.user)
|
||||
profile = get_object_or_404(UserProfile, user=user)
|
||||
|
||||
if display_name := request.POST.get('display_name'):
|
||||
profile.display_name = display_name
|
||||
|
||||
if 'avatar' in request.FILES:
|
||||
avatar_file = cast(UploadedFile, request.FILES['avatar'])
|
||||
profile.avatar.save(avatar_file.name, avatar_file, save=False)
|
||||
profile.save()
|
||||
|
||||
user.save()
|
||||
messages.success(request, 'Profile updated successfully')
|
||||
|
||||
def _handle_password_change(self, request: HttpRequest) -> Optional[HttpResponseRedirect]:
|
||||
user = cast(User, request.user)
|
||||
old_password = request.POST.get('old_password', '')
|
||||
new_password = request.POST.get('new_password', '')
|
||||
|
||||
if not user.check_password(old_password):
|
||||
messages.error(request, 'Current password is incorrect')
|
||||
return None
|
||||
|
||||
user.set_password(new_password)
|
||||
user.save()
|
||||
messages.success(request, 'Password changed successfully')
|
||||
return HttpResponseRedirect(reverse('account_login'))
|
||||
|
||||
def _handle_email_change(self, request: HttpRequest) -> None:
|
||||
if not (new_email := request.POST.get('new_email')):
|
||||
messages.error(request, 'New email is required')
|
||||
return
|
||||
|
||||
self._send_email_verification(request, new_email)
|
||||
messages.success(request, 'Verification email sent to your new email address')
|
||||
|
||||
def _send_email_verification(self, request: HttpRequest, new_email: str) -> None:
|
||||
user = cast(User, request.user)
|
||||
token = get_random_string(64)
|
||||
EmailVerification.objects.update_or_create(
|
||||
user=user,
|
||||
defaults={'token': token}
|
||||
)
|
||||
|
||||
site = cast(Site, get_current_site(request))
|
||||
verification_url = reverse('verify_email', kwargs={'token': token})
|
||||
|
||||
context = {
|
||||
'user': user,
|
||||
'verification_url': verification_url,
|
||||
'site_name': site.name,
|
||||
}
|
||||
|
||||
email_html = render_to_string('accounts/email/verify_email.html', context)
|
||||
EmailService.send_email(
|
||||
to=new_email,
|
||||
subject='Verify your new email address',
|
||||
text='Click the link to verify your new email address',
|
||||
site=site,
|
||||
html=email_html
|
||||
)
|
||||
|
||||
user.pending_email = new_email
|
||||
user.save()
|
||||
|
||||
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
action = request.POST.get('action')
|
||||
|
||||
if action == 'update_profile':
|
||||
# Handle profile updates
|
||||
user = request.user
|
||||
user.profile.display_name = request.POST.get('display_name', user.profile.display_name)
|
||||
|
||||
if 'avatar' in request.FILES:
|
||||
user.profile.avatar = request.FILES['avatar']
|
||||
user.profile.save()
|
||||
|
||||
user.save()
|
||||
messages.success(request, 'Profile updated successfully')
|
||||
|
||||
self._handle_profile_update(request)
|
||||
elif action == 'change_password':
|
||||
# Handle password change
|
||||
old_password = request.POST.get('old_password')
|
||||
new_password = request.POST.get('new_password')
|
||||
|
||||
if request.user.check_password(old_password):
|
||||
request.user.set_password(new_password)
|
||||
request.user.save()
|
||||
messages.success(request, 'Password changed successfully')
|
||||
return HttpResponseRedirect(reverse('account_login'))
|
||||
else:
|
||||
messages.error(request, 'Current password is incorrect')
|
||||
|
||||
if response := self._handle_password_change(request):
|
||||
return response
|
||||
elif action == 'change_email':
|
||||
# Handle email change with verification
|
||||
new_email = request.POST.get('new_email')
|
||||
if new_email:
|
||||
token = get_random_string(64)
|
||||
EmailVerification.objects.update_or_create(
|
||||
user=request.user,
|
||||
defaults={'token': token}
|
||||
)
|
||||
site = get_current_site(request)
|
||||
verification_url = reverse('verify_email', kwargs={'token': token})
|
||||
context = {
|
||||
'user': request.user,
|
||||
'verification_url': verification_url,
|
||||
'site_name': site.name,
|
||||
}
|
||||
email_html = render_to_string('accounts/email/verify_email.html', context)
|
||||
EmailService.send_email(
|
||||
to=new_email,
|
||||
subject='Verify your new email address',
|
||||
text='Click the link to verify your new email address',
|
||||
site=site,
|
||||
html=email_html
|
||||
)
|
||||
request.user.pending_email = new_email
|
||||
request.user.save()
|
||||
messages.success(request, 'Verification email sent to your new email address')
|
||||
else:
|
||||
messages.error(request, 'New email is required')
|
||||
self._handle_email_change(request)
|
||||
|
||||
return self.get(request, *args, **kwargs)
|
||||
|
||||
def request_password_reset(request):
|
||||
"""Request a password reset email"""
|
||||
if request.method == 'POST':
|
||||
email = request.POST.get('email')
|
||||
if not email:
|
||||
messages.error(request, 'Email is required')
|
||||
return redirect('account_reset_password')
|
||||
|
||||
try:
|
||||
user = User.objects.get(email=email)
|
||||
# Generate token
|
||||
token = get_random_string(64)
|
||||
# Save token with expiry
|
||||
PasswordReset.objects.update_or_create(
|
||||
user=user,
|
||||
defaults={
|
||||
'token': token,
|
||||
'expires_at': timezone.now() + timedelta(hours=24)
|
||||
}
|
||||
)
|
||||
|
||||
# Get current site
|
||||
site = get_current_site(request)
|
||||
|
||||
# Send reset email
|
||||
reset_url = reverse('password_reset_confirm', kwargs={'token': token})
|
||||
context = {
|
||||
'user': user,
|
||||
'reset_url': reset_url,
|
||||
'site_name': site.name,
|
||||
}
|
||||
email_html = render_to_string('accounts/email/password_reset.html', context)
|
||||
|
||||
# Use EmailService instead of send_mail
|
||||
EmailService.send_email(
|
||||
to=email,
|
||||
subject='Reset your password',
|
||||
text='Click the link to reset your password',
|
||||
site=site,
|
||||
html=email_html
|
||||
)
|
||||
|
||||
messages.success(request, 'Password reset email sent')
|
||||
return redirect('account_login')
|
||||
except User.DoesNotExist:
|
||||
# Still show success to prevent email enumeration
|
||||
messages.success(request, 'Password reset email sent')
|
||||
return redirect('account_login')
|
||||
|
||||
return render(request, 'accounts/password_reset.html')
|
||||
def create_password_reset_token(user: User) -> str:
|
||||
token = get_random_string(64)
|
||||
PasswordReset.objects.update_or_create(
|
||||
user=user,
|
||||
defaults={
|
||||
'token': token,
|
||||
'expires_at': timezone.now() + timedelta(hours=24)
|
||||
}
|
||||
)
|
||||
return token
|
||||
|
||||
def reset_password(request, token):
|
||||
"""Reset password using token"""
|
||||
def send_password_reset_email(user: User, site: Union[Site, RequestSite], token: str) -> None:
|
||||
reset_url = reverse('password_reset_confirm', kwargs={'token': token})
|
||||
context = {
|
||||
'user': user,
|
||||
'reset_url': reset_url,
|
||||
'site_name': site.name,
|
||||
}
|
||||
email_html = render_to_string('accounts/email/password_reset.html', context)
|
||||
|
||||
EmailService.send_email(
|
||||
to=user.email,
|
||||
subject='Reset your password',
|
||||
text='Click the link to reset your password',
|
||||
site=site,
|
||||
html=email_html
|
||||
)
|
||||
|
||||
def request_password_reset(request: HttpRequest) -> HttpResponse:
|
||||
if request.method != 'POST':
|
||||
return render(request, 'accounts/password_reset.html')
|
||||
|
||||
if not (email := request.POST.get('email')):
|
||||
messages.error(request, 'Email is required')
|
||||
return redirect('account_reset_password')
|
||||
|
||||
with suppress(User.DoesNotExist):
|
||||
user = User.objects.get(email=email)
|
||||
token = create_password_reset_token(user)
|
||||
site = get_current_site(request)
|
||||
send_password_reset_email(user, site, token)
|
||||
|
||||
messages.success(request, 'Password reset email sent')
|
||||
return redirect('account_login')
|
||||
|
||||
def handle_password_reset(request: HttpRequest, user: User, new_password: str, reset: PasswordReset, site: Union[Site, RequestSite]) -> None:
|
||||
user.set_password(new_password)
|
||||
user.save()
|
||||
|
||||
reset.used = True
|
||||
reset.save()
|
||||
|
||||
send_password_reset_confirmation(user, site)
|
||||
messages.success(request, 'Password reset successfully')
|
||||
|
||||
def send_password_reset_confirmation(user: User, site: Union[Site, RequestSite]) -> None:
|
||||
context = {
|
||||
'user': user,
|
||||
'site_name': site.name,
|
||||
}
|
||||
email_html = render_to_string('accounts/email/password_reset_complete.html', context)
|
||||
|
||||
EmailService.send_email(
|
||||
to=user.email,
|
||||
subject='Password Reset Complete',
|
||||
text='Your password has been reset successfully.',
|
||||
site=site,
|
||||
html=email_html
|
||||
)
|
||||
|
||||
def reset_password(request: HttpRequest, token: str) -> HttpResponse:
|
||||
try:
|
||||
# Get valid reset token
|
||||
reset = PasswordReset.objects.select_related('user').get(
|
||||
token=token,
|
||||
expires_at__gt=timezone.now(),
|
||||
@@ -236,42 +313,15 @@ def reset_password(request, token):
|
||||
)
|
||||
|
||||
if request.method == 'POST':
|
||||
new_password = request.POST.get('new_password')
|
||||
if new_password:
|
||||
# Reset password
|
||||
user = reset.user
|
||||
user.set_password(new_password)
|
||||
user.save()
|
||||
|
||||
# Mark token as used
|
||||
reset.used = True
|
||||
reset.save()
|
||||
|
||||
# Get current site
|
||||
if new_password := request.POST.get('new_password'):
|
||||
site = get_current_site(request)
|
||||
|
||||
# Send confirmation email
|
||||
context = {
|
||||
'user': user,
|
||||
'site_name': site.name,
|
||||
}
|
||||
email_html = render_to_string('accounts/email/password_reset_complete.html', context)
|
||||
|
||||
# Use EmailService instead of send_mail
|
||||
EmailService.send_email(
|
||||
to=user.email,
|
||||
subject='Password Reset Complete',
|
||||
text='Your password has been reset successfully.',
|
||||
site=site,
|
||||
html=email_html
|
||||
)
|
||||
|
||||
messages.success(request, 'Password reset successfully')
|
||||
handle_password_reset(request, reset.user, new_password, reset, site)
|
||||
return redirect('account_login')
|
||||
else:
|
||||
messages.error(request, 'New password is required')
|
||||
|
||||
messages.error(request, 'New password is required')
|
||||
|
||||
return render(request, 'accounts/password_reset_confirm.html', {'token': token})
|
||||
|
||||
except PasswordReset.DoesNotExist:
|
||||
messages.error(request, 'Invalid or expired reset token')
|
||||
return redirect('account_reset_password')
|
||||
|
||||
Reference in New Issue
Block a user