"""Account views: login/signup (HTMX-aware), profiles, settings and password reset."""

import re
from contextlib import suppress
from datetime import timedelta
from typing import Dict, Any, Optional, Union, cast, TYPE_CHECKING

from allauth.account.views import LoginView, SignupView
from allauth.socialaccount.providers.discord.views import DiscordOAuth2Adapter
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
from allauth.socialaccount.providers.oauth2.client import OAuth2Client
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.sites.models import Site
from django.contrib.sites.requests import RequestSite
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
from django.core.mail import send_mail
from django.core.validators import validate_email
from django.db.models import Prefetch, QuerySet
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.views.generic import DetailView, TemplateView
from django_htmx.http import HttpResponseClientRefresh

from accounts.models import User, PasswordReset, TopList, EmailVerification, UserProfile
from email_service.services import EmailService
from parks.models import ParkReview
from rides.models import RideReview
from .mixins import TurnstileMixin

if TYPE_CHECKING:
    # Duplicates the runtime imports above; retained from the original file.
    from django.contrib.sites.models import Site
    from django.contrib.sites.requests import RequestSite

UserModel = get_user_model()

# Shown whenever a new password fails _password_meets_requirements().
PASSWORD_REQUIREMENTS_MESSAGE = (
    'Password must be at least 8 characters and contain uppercase, lowercase, and numbers'
)


def _password_meets_requirements(password: str) -> bool:
    """Return True if *password* satisfies the site's password rule.

    The rule is: at least 8 characters, with at least one ASCII uppercase
    letter, one ASCII lowercase letter and one ASCII digit. It is shared by
    the settings password-change flow and the token-based reset flow so the
    two cannot drift apart.
    """
    return (
        len(password) >= 8
        and bool(re.search(r'[A-Z]', password))
        and bool(re.search(r'[a-z]', password))
        and bool(re.search(r'[0-9]', password))
    )


class CustomLoginView(TurnstileMixin, LoginView):
    """Login view with a Turnstile check and HTMX-specific responses."""

    def form_valid(self, form):
        """Validate Turnstile, then log in.

        A Turnstile ``ValidationError`` is attached to the form as a non-field
        error and handled as an invalid form. On success, HTMX requests get an
        ``HttpResponseClientRefresh``; other requests get the parent response.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as e:
            form.add_error(None, str(e))
            return self.form_invalid(form)

        # The parent call is always made: it performs the actual login.
        response = super().form_valid(form)
        return HttpResponseClientRefresh() if getattr(self.request, 'htmx', False) else response

    def form_invalid(self, form):
        """Re-render only the login form partial for HTMX requests."""
        if getattr(self.request, 'htmx', False):
            return render(
                self.request,
                'account/partials/login_form.html',
                self.get_context_data(form=form)
            )
        return super().form_invalid(form)

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Serve the login modal partial to HTMX requests, the full page otherwise."""
        if getattr(request, 'htmx', False):
            return render(
                request,
                'account/partials/login_modal.html',
                self.get_context_data()
            )
        return super().get(request, *args, **kwargs)


class CustomSignupView(TurnstileMixin, SignupView):
    """Signup view with a Turnstile check and HTMX-specific responses."""

    def form_valid(self, form):
        """Validate Turnstile, then sign up.

        A Turnstile ``ValidationError`` is attached to the form as a non-field
        error and handled as an invalid form. On success, HTMX requests get an
        ``HttpResponseClientRefresh``; other requests get the parent response.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as e:
            form.add_error(None, str(e))
            return self.form_invalid(form)

        response = super().form_valid(form)
        return HttpResponseClientRefresh() if getattr(self.request, 'htmx', False) else response

    def form_invalid(self, form):
        """Re-render the signup modal partial for HTMX requests."""
        if getattr(self.request, 'htmx', False):
            return render(
                self.request,
                'account/partials/signup_modal.html',
                self.get_context_data(form=form)
            )
        return super().form_invalid(form)

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Serve the signup modal partial to HTMX requests, the full page otherwise."""
        if getattr(request, 'htmx', False):
            return render(
                request,
                'account/partials/signup_modal.html',
                self.get_context_data()
            )
        return super().get(request, *args, **kwargs)


@login_required
def user_redirect_view(request: HttpRequest) -> HttpResponse:
    """Redirect the logged-in user to their own profile page."""
    user = cast(User, request.user)
    return redirect('profile', username=user.username)


def handle_social_login(request: HttpRequest, email: str) -> HttpResponse:
    """Complete a pending social login using *email*, then redirect to ``/``.

    If the session holds no pending social login, nothing is changed and the
    redirect to ``/`` is still returned.
    """
    # NOTE(review): allauth's SocialLogin.save() normally takes the request,
    # and the session entry may be a serialized dict rather than an object --
    # confirm this path works against the installed allauth version.
    if sociallogin := request.session.get('socialaccount_sociallogin'):
        sociallogin.user.email = email
        sociallogin.save()
        login(request, sociallogin.user)
        del request.session['socialaccount_sociallogin']
        messages.success(request, 'Successfully logged in')
    return redirect('/')


def email_required(request: HttpRequest) -> HttpResponse:
    """Ask for an e-mail address to finish a social login.

    Redirects to ``/`` when no social login is pending. A POST with an
    ``email`` value hands off to :func:`handle_social_login`; a POST without
    one re-renders the form with an error.
    """
    if not request.session.get('socialaccount_sociallogin'):
        messages.error(request, 'No social login in progress')
        return redirect('/')

    if request.method == 'POST':
        if email := request.POST.get('email'):
            return handle_social_login(request, email)
        messages.error(request, 'Email is required')
        return render(request, 'accounts/email_required.html', {'error': 'Email is required'})

    return render(request, 'accounts/email_required.html')


class ProfileView(DetailView):
    """Public profile page, looked up by username."""

    model = User
    template_name = 'accounts/profile.html'
    context_object_name = 'profile_user'
    slug_field = 'username'
    slug_url_kwarg = 'username'

    def get_queryset(self) -> QuerySet[User]:
        """Return users with their profile joined in."""
        return User.objects.select_related('profile')

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Add the user's latest reviews and top lists to the context."""
        context = super().get_context_data(**kwargs)
        # DetailView.get() has already stored the object on self; reuse it
        # rather than running the lookup query a second time. Fall back to
        # get_object() if this method is called outside that flow.
        profile_user = getattr(self, 'object', None)
        if profile_user is None:
            profile_user = self.get_object()
        user = cast(User, profile_user)
        context['park_reviews'] = self._get_user_park_reviews(user)
        context['ride_reviews'] = self._get_user_ride_reviews(user)
        context['top_lists'] = self._get_user_top_lists(user)
        return context

    def _get_user_park_reviews(self, user: User) -> QuerySet[ParkReview]:
        """Return the user's five newest published park reviews."""
        return ParkReview.objects.filter(
            user=user,
            is_published=True
        ).select_related(
            'user',
            'user__profile',
            'park'
        ).order_by('-created_at')[:5]

    def _get_user_ride_reviews(self, user: User) -> QuerySet[RideReview]:
        """Return the user's five newest published ride reviews."""
        return RideReview.objects.filter(
            user=user,
            is_published=True
        ).select_related(
            'user',
            'user__profile',
            'ride'
        ).order_by('-created_at')[:5]

    def _get_user_top_lists(self, user: User) -> QuerySet[TopList]:
        """Return the user's five newest top lists with their items prefetched."""
        return TopList.objects.filter(
            user=user
        ).select_related(
            'user',
            'user__profile'
        ).prefetch_related(
            'items'
        ).order_by('-created_at')[:5]


class SettingsView(LoginRequiredMixin, TemplateView):
    """Account settings page: profile, password and e-mail changes.

    POST requests are dispatched on the ``action`` form field; see
    :meth:`post`.
    """

    template_name = 'accounts/settings.html'

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Expose the current user to the template as ``user``."""
        context = super().get_context_data(**kwargs)
        context['user'] = self.request.user
        return context

    def _handle_profile_update(self, request: HttpRequest) -> None:
        """Update display name and/or avatar from the submitted form.

        Responds with 404 (via ``get_object_or_404``) if the user has no
        ``UserProfile`` row.
        """
        user = cast(User, request.user)
        profile = get_object_or_404(UserProfile, user=user)

        if display_name := request.POST.get('display_name'):
            profile.display_name = display_name

        if 'avatar' in request.FILES:
            avatar_file = cast(UploadedFile, request.FILES['avatar'])
            # save=False: the single profile.save() below persists everything.
            profile.avatar.save(avatar_file.name, avatar_file, save=False)

        profile.save()
        user.save()
        messages.success(request, 'Profile updated successfully')

    def _validate_password(self, password: str) -> bool:
        """Validate password meets requirements."""
        return _password_meets_requirements(password)

    def _send_password_change_confirmation(self, request: HttpRequest, user: User) -> None:
        """Send password change confirmation email."""
        site = get_current_site(request)
        context = {
            'user': user,
            'site_name': site.name,
        }
        email_html = render_to_string('accounts/email/password_change_confirmation.html', context)
        EmailService.send_email(
            to=user.email,
            subject='Password Changed Successfully',
            text='Your password has been changed successfully.',
            site=site,
            html=email_html
        )

    def _handle_password_change(self, request: HttpRequest) -> Optional[HttpResponseRedirect]:
        """Change the user's password after checking the submitted values.

        Returns:
            A redirect to the login page on success, or ``None`` after
            queuing an error message when any check fails.
        """
        user = cast(User, request.user)
        old_password = request.POST.get('old_password', '')
        new_password = request.POST.get('new_password', '')
        confirm_password = request.POST.get('confirm_password', '')

        if not user.check_password(old_password):
            messages.error(request, 'Current password is incorrect')
            return None

        if new_password != confirm_password:
            messages.error(request, 'New passwords do not match')
            return None

        if not self._validate_password(new_password):
            messages.error(request, PASSWORD_REQUIREMENTS_MESSAGE)
            return None

        user.set_password(new_password)
        user.save()
        self._send_password_change_confirmation(request, user)
        messages.success(request, 'Password changed successfully. Please check your email for confirmation.')
        return HttpResponseRedirect(reverse('account_login'))

    def _handle_email_change(self, request: HttpRequest) -> None:
        """Start an e-mail change by mailing a verification link.

        The submitted address is stripped and syntactically validated before
        anything is stored or sent, so malformed input never reaches the
        mailer or ``pending_email``.
        """
        new_email = (request.POST.get('new_email') or '').strip()
        if not new_email:
            messages.error(request, 'New email is required')
            return

        try:
            validate_email(new_email)
        except ValidationError:
            messages.error(request, 'Enter a valid email address')
            return

        self._send_email_verification(request, new_email)
        messages.success(request, 'Verification email sent to your new email address')

    def _send_email_verification(self, request: HttpRequest, new_email: str) -> None:
        """Create/replace the user's verification token and mail it to *new_email*.

        Also records *new_email* on the user as ``pending_email``.
        """
        user = cast(User, request.user)
        token = get_random_string(64)
        EmailVerification.objects.update_or_create(
            user=user,
            defaults={'token': token}
        )

        site = cast(Site, get_current_site(request))
        # NOTE(review): reverse() yields a path without scheme/host; confirm
        # the e-mail template turns it into an absolute link.
        verification_url = reverse('verify_email', kwargs={'token': token})
        context = {
            'user': user,
            'verification_url': verification_url,
            'site_name': site.name,
        }
        email_html = render_to_string('accounts/email/verify_email.html', context)
        EmailService.send_email(
            to=new_email,
            subject='Verify your new email address',
            text='Click the link to verify your new email address',
            site=site,
            html=email_html
        )

        user.pending_email = new_email
        user.save()

    def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch on the ``action`` field, then re-render the settings page.

        Recognised actions are ``update_profile``, ``change_password`` and
        ``change_email``; any other value just re-renders the page. A
        successful password change returns its own redirect instead.
        """
        action = request.POST.get('action')

        if action == 'update_profile':
            self._handle_profile_update(request)
        elif action == 'change_password':
            if response := self._handle_password_change(request):
                return response
        elif action == 'change_email':
            self._handle_email_change(request)

        return self.get(request, *args, **kwargs)


def create_password_reset_token(user: User) -> str:
    """Create or replace *user*'s reset token (valid for 24 hours) and return it."""
    token = get_random_string(64)
    PasswordReset.objects.update_or_create(
        user=user,
        defaults={
            'token': token,
            'expires_at': timezone.now() + timedelta(hours=24)
        }
    )
    return token


def send_password_reset_email(user: User, site: Union[Site, RequestSite], token: str) -> None:
    """Mail *user* the password-reset link for *token*."""
    # NOTE(review): reverse() yields a path without scheme/host; confirm the
    # e-mail template turns it into an absolute link.
    reset_url = reverse('password_reset_confirm', kwargs={'token': token})
    context = {
        'user': user,
        'reset_url': reset_url,
        'site_name': site.name,
    }
    email_html = render_to_string('accounts/email/password_reset.html', context)
    EmailService.send_email(
        to=user.email,
        subject='Reset your password',
        text='Click the link to reset your password',
        site=site,
        html=email_html
    )


def request_password_reset(request: HttpRequest) -> HttpResponse:
    """Handle the "forgot password" form.

    Non-POST requests render the form. A POST without an e-mail redirects
    back with an error. Otherwise the same success message is shown whether
    or not a matching account exists.
    """
    if request.method != 'POST':
        return render(request, 'accounts/password_reset.html')

    if not (email := request.POST.get('email')):
        messages.error(request, 'Email is required')
        return redirect('account_reset_password')

    # An unknown address is silently ignored so the response is the same
    # either way.
    # NOTE(review): .get() raises MultipleObjectsReturned if e-mails are not
    # unique on User -- confirm the model enforces uniqueness.
    with suppress(User.DoesNotExist):
        user = User.objects.get(email=email)
        token = create_password_reset_token(user)
        site = get_current_site(request)
        send_password_reset_email(user, site, token)

    messages.success(request, 'Password reset email sent')
    return redirect('account_login')


def handle_password_reset(
    request: HttpRequest,
    user: User,
    new_password: str,
    reset: PasswordReset,
    site: Union[Site, RequestSite]
) -> None:
    """Set *new_password* on *user*, mark *reset* as used and send a confirmation.

    This function does not validate *new_password*; callers are expected to
    have done so.
    """
    user.set_password(new_password)
    user.save()

    # Mark the token as used so it cannot be replayed.
    reset.used = True
    reset.save()

    send_password_reset_confirmation(user, site)
    messages.success(request, 'Password reset successfully')


def send_password_reset_confirmation(user: User, site: Union[Site, RequestSite]) -> None:
    """Mail *user* a notice that their password has been reset."""
    context = {
        'user': user,
        'site_name': site.name,
    }
    email_html = render_to_string('accounts/email/password_reset_complete.html', context)
    EmailService.send_email(
        to=user.email,
        subject='Password Reset Complete',
        text='Your password has been reset successfully.',
        site=site,
        html=email_html
    )


def reset_password(request: HttpRequest, token: str) -> HttpResponse:
    """Let the holder of a valid reset *token* choose a new password.

    The token must exist, be unexpired and unused; otherwise the user is
    redirected to the reset-request page with an error. On POST the new
    password must be present and satisfy the same rule as the settings
    password-change flow; failures re-render the form with an error.
    """
    # Only the lookup is guarded, so a DoesNotExist raised anywhere else is
    # not mistaken for a bad token.
    try:
        reset = PasswordReset.objects.select_related('user').get(
            token=token,
            expires_at__gt=timezone.now(),
            used=False
        )
    except PasswordReset.DoesNotExist:
        messages.error(request, 'Invalid or expired reset token')
        return redirect('account_reset_password')

    if request.method == 'POST':
        new_password = request.POST.get('new_password')
        if not new_password:
            messages.error(request, 'New password is required')
        elif not _password_meets_requirements(new_password):
            # Without this check the reset flow would accept passwords the
            # settings page rejects.
            messages.error(request, PASSWORD_REQUIREMENTS_MESSAGE)
        else:
            site = get_current_site(request)
            handle_password_reset(request, reset.user, new_password, reset, site)
            return redirect('account_login')

    return render(request, 'accounts/password_reset_confirm.html', {'token': token})