import logging import re from contextlib import suppress from datetime import timedelta from typing import Any, cast from allauth.account.views import LoginView, SignupView from django.contrib import messages from django.contrib.auth import get_user_model, login from django.contrib.auth.decorators import login_required from django.contrib.auth.mixins import LoginRequiredMixin from django.contrib.sites.models import Site from django.contrib.sites.requests import RequestSite from django.contrib.sites.shortcuts import get_current_site from django.core.exceptions import ValidationError from django.core.files.uploadedfile import UploadedFile from django.db.models import QuerySet from django.http import HttpRequest, HttpResponse, HttpResponseRedirect from django.shortcuts import get_object_or_404, redirect, render from django.template.loader import render_to_string from django.urls import reverse from django.utils import timezone from django.utils.crypto import get_random_string from django.views.generic import DetailView, TemplateView from django_forwardemail.services import EmailService from django_htmx.http import HttpResponseClientRefresh from apps.accounts.models import ( EmailVerification, PasswordReset, User, UserProfile, ) from apps.core.logging import log_security_event from apps.lists.models import UserList from apps.parks.models import ParkReview from apps.rides.models import RideReview from .mixins import TurnstileMixin logger = logging.getLogger(__name__) UserModel = get_user_model() class CustomLoginView(TurnstileMixin, LoginView): def form_valid(self, form): try: self.validate_turnstile(self.request) except ValidationError as e: form.add_error(None, str(e)) return self.form_invalid(form) response = super().form_valid(form) user = self.request.user log_security_event( logger, event_type="user_login", message=f"User {user.username} logged in successfully", severity="low", context={"user_id": user.id, "username": user.username}, request=self.request, ) return ( HttpResponseClientRefresh() if getattr(self.request, "htmx", False) else response ) def form_invalid(self, form): log_security_event( logger, event_type="login_failed", message="Failed login attempt", severity="medium", context={"username": form.data.get("login", "unknown")}, request=self.request, ) if getattr(self.request, "htmx", False): return render( self.request, "account/partials/login_form.html", self.get_context_data(form=form), ) return super().form_invalid(form) def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: if getattr(request, "htmx", False): return render( request, "account/partials/login_modal.html", self.get_context_data(), ) return super().get(request, *args, **kwargs) class CustomSignupView(TurnstileMixin, SignupView): def form_valid(self, form): try: self.validate_turnstile(self.request) except ValidationError as e: form.add_error(None, str(e)) return self.form_invalid(form) response = super().form_valid(form) user = self.user log_security_event( logger, event_type="user_signup", message=f"New user registered: {user.username}", severity="low", context={ "user_id": user.id, "username": user.username, "email": user.email, }, request=self.request, ) return ( HttpResponseClientRefresh() if getattr(self.request, "htmx", False) else response ) def form_invalid(self, form): if getattr(self.request, "htmx", False): return render( self.request, "account/partials/signup_modal.html", self.get_context_data(form=form), ) return super().form_invalid(form) def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: if getattr(request, "htmx", False): return render( request, "account/partials/signup_modal.html", self.get_context_data(), ) return super().get(request, *args, **kwargs) @login_required def user_redirect_view(request: HttpRequest) -> HttpResponse: user = cast(User, request.user) return redirect("profile", username=user.username) def handle_social_login(request: HttpRequest, email: str) -> HttpResponse: if sociallogin := request.session.get("socialaccount_sociallogin"): sociallogin.user.email = email sociallogin.save() login(request, sociallogin.user) del request.session["socialaccount_sociallogin"] messages.success(request, "Successfully logged in") return redirect("/") def email_required(request: HttpRequest) -> HttpResponse: if not request.session.get("socialaccount_sociallogin"): messages.error(request, "No social login in progress") return redirect("/") if request.method == "POST": if email := request.POST.get("email"): return handle_social_login(request, email) messages.error(request, "Email is required") return render( request, "accounts/email_required.html", {"error": "Email is required"}, ) return render(request, "accounts/email_required.html") class ProfileView(DetailView): model = User template_name = "accounts/profile.html" context_object_name = "profile_user" slug_field = "username" slug_url_kwarg = "username" def get_queryset(self) -> QuerySet[User]: return User.objects.select_related("profile") def get_context_data(self, **kwargs: Any) -> dict[str, Any]: context = super().get_context_data(**kwargs) user = cast(User, self.get_object()) context["park_reviews"] = self._get_user_park_reviews(user) context["ride_reviews"] = self._get_user_ride_reviews(user) context["top_lists"] = self._get_user_top_lists(user) return context def _get_user_park_reviews(self, user: User) -> QuerySet[ParkReview]: return ( ParkReview.objects.filter(user=user, is_published=True) .select_related("user", "user__profile", "park") .order_by("-created_at")[:5] ) def _get_user_ride_reviews(self, user: User) -> QuerySet[RideReview]: return ( RideReview.objects.filter(user=user, is_published=True) .select_related("user", "user__profile", "ride") .order_by("-created_at")[:5] ) def _get_user_top_lists(self, user: User) -> QuerySet[UserList]: return ( UserList.objects.filter(user=user) .select_related("user", "user__profile") .prefetch_related("items") .order_by("-created_at")[:5] ) class SettingsView(LoginRequiredMixin, TemplateView): template_name = "accounts/settings.html" def get_context_data(self, **kwargs: Any) -> dict[str, Any]: context = super().get_context_data(**kwargs) context["user"] = self.request.user return context def _handle_profile_update(self, request: HttpRequest) -> None: user = cast(User, request.user) profile = get_object_or_404(UserProfile, user=user) if display_name := request.POST.get("display_name"): profile.display_name = display_name if unit_system := request.POST.get("unit_system"): profile.unit_system = unit_system if location := request.POST.get("location"): profile.location = location if "avatar" in request.FILES: avatar_file = cast(UploadedFile, request.FILES["avatar"]) profile.avatar.save(avatar_file.name, avatar_file, save=False) profile.save() user.save() logger.info( f"User {user.username} updated their profile", extra={"user_id": user.id, "username": user.username}, ) messages.success(request, "Profile updated successfully") def _validate_password(self, password: str) -> bool: """Validate password meets requirements.""" return ( len(password) >= 8 and bool(re.search(r"[A-Z]", password)) and bool(re.search(r"[a-z]", password)) and bool(re.search(r"[0-9]", password)) ) def _send_password_change_confirmation( self, request: HttpRequest, user: User ) -> None: """Send password change confirmation email.""" site = get_current_site(request) context = { "user": user, "site_name": site.name, } email_html = render_to_string( "accounts/email/password_change_confirmation.html", context ) EmailService.send_email( to=user.email, subject="Password Changed Successfully", text="Your password has been changed successfully.", site=site, html=email_html, ) def _handle_password_change( self, request: HttpRequest ) -> HttpResponseRedirect | None: user = cast(User, request.user) old_password = request.POST.get("old_password", "") new_password = request.POST.get("new_password", "") confirm_password = request.POST.get("confirm_password", "") if not user.check_password(old_password): messages.error(request, "Current password is incorrect") return None if new_password != confirm_password: messages.error(request, "New passwords do not match") return None if not self._validate_password(new_password): messages.error( request, "Password must be at least 8 characters and contain uppercase, lowercase, and numbers", ) return None user.set_password(new_password) user.save() log_security_event( logger, event_type="password_changed", message=f"User {user.username} changed their password", severity="medium", context={"user_id": user.id, "username": user.username}, request=request, ) self._send_password_change_confirmation(request, user) messages.success( request, "Password changed successfully. Please check your email for confirmation.", ) return HttpResponseRedirect(reverse("account_login")) def _handle_email_change(self, request: HttpRequest) -> None: if new_email := request.POST.get("new_email"): self._send_email_verification(request, new_email) messages.success( request, "Verification email sent to your new email address" ) else: messages.error(request, "New email is required") def _send_email_verification(self, request: HttpRequest, new_email: str) -> None: user = cast(User, request.user) token = get_random_string(64) EmailVerification.objects.update_or_create(user=user, defaults={"token": token}) site = cast(Site, get_current_site(request)) verification_url = reverse("verify_email", kwargs={"token": token}) context = { "user": user, "verification_url": verification_url, "site_name": site.name, } email_html = render_to_string("accounts/email/verify_email.html", context) EmailService.send_email( to=new_email, subject="Verify your new email address", text="Click the link to verify your new email address", site=site, html=email_html, ) user.pending_email = new_email user.save() def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: action = request.POST.get("action") if action == "update_profile": self._handle_profile_update(request) elif action == "change_password": if response := self._handle_password_change(request): return response elif action == "change_email": self._handle_email_change(request) return self.get(request, *args, **kwargs) def create_password_reset_token(user: User) -> str: token = get_random_string(64) PasswordReset.objects.update_or_create( user=user, defaults={ "token": token, "expires_at": timezone.now() + timedelta(hours=24), }, ) return token def send_password_reset_email( user: User, site: Site | RequestSite, token: str ) -> None: reset_url = reverse("password_reset_confirm", kwargs={"token": token}) context = { "user": user, "reset_url": reset_url, "site_name": site.name, } email_html = render_to_string("accounts/email/password_reset.html", context) EmailService.send_email( to=user.email, subject="Reset your password", text="Click the link to reset your password", site=site, html=email_html, ) def request_password_reset(request: HttpRequest) -> HttpResponse: if request.method != "POST": return render(request, "accounts/password_reset.html") if not (email := request.POST.get("email")): messages.error(request, "Email is required") return redirect("account_reset_password") with suppress(User.DoesNotExist): user = User.objects.get(email=email) token = create_password_reset_token(user) site = get_current_site(request) send_password_reset_email(user, site, token) log_security_event( logger, event_type="password_reset_requested", message=f"Password reset requested for {email}", severity="medium", context={"email": email}, request=request, ) messages.success(request, "Password reset email sent") return redirect("account_login") def handle_password_reset( request: HttpRequest, user: User, new_password: str, reset: PasswordReset, site: Site | RequestSite, ) -> None: user.set_password(new_password) user.save() reset.used = True reset.save() log_security_event( logger, event_type="password_reset_complete", message=f"Password reset completed for user {user.username}", severity="medium", context={"user_id": user.id, "username": user.username}, request=request, ) send_password_reset_confirmation(user, site) messages.success(request, "Password reset successfully") def send_password_reset_confirmation( user: User, site: Site | RequestSite ) -> None: context = { "user": user, "site_name": site.name, } email_html = render_to_string( "accounts/email/password_reset_complete.html", context ) EmailService.send_email( to=user.email, subject="Password Reset Complete", text="Your password has been reset successfully.", site=site, html=email_html, ) def reset_password(request: HttpRequest, token: str) -> HttpResponse: try: reset = PasswordReset.objects.select_related("user").get( token=token, expires_at__gt=timezone.now(), used=False ) if request.method == "POST": if new_password := request.POST.get("new_password"): site = get_current_site(request) handle_password_reset(request, reset.user, new_password, reset, site) return redirect("account_login") messages.error(request, "New password is required") return render(request, "accounts/password_reset_confirm.html", {"token": token}) except PasswordReset.DoesNotExist: messages.error(request, "Invalid or expired reset token") return redirect("account_reset_password")