mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 15:51:08 -05:00
427 lines
14 KiB
Python
427 lines
14 KiB
Python
from django.views.generic import DetailView, TemplateView
|
|
from django.contrib.auth import get_user_model
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.contrib import messages
|
|
from django.core.exceptions import ValidationError
|
|
from django.template.loader import render_to_string
|
|
from django.utils.crypto import get_random_string
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.contrib.sites.models import Site
|
|
from django.contrib.sites.requests import RequestSite
|
|
from django.db.models import QuerySet
|
|
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest
|
|
from django.urls import reverse
|
|
from django.contrib.auth import login
|
|
from django.core.files.uploadedfile import UploadedFile
|
|
from apps.accounts.models import (
|
|
User,
|
|
PasswordReset,
|
|
TopList,
|
|
EmailVerification,
|
|
UserProfile,
|
|
)
|
|
from django_forwardemail.services import EmailService
|
|
from apps.parks.models import ParkReview
|
|
from apps.rides.models import RideReview
|
|
from allauth.account.views import LoginView, SignupView
|
|
from .mixins import TurnstileMixin
|
|
from typing import Dict, Any, Optional, Union, cast
|
|
from django_htmx.http import HttpResponseClientRefresh
|
|
from contextlib import suppress
|
|
import re
|
|
|
|
# User model class as resolved by Django's get_user_model().
UserModel = get_user_model()
|
|
|
|
|
|
class CustomLoginView(TurnstileMixin, LoginView):
    """Login view with a Turnstile check and HTMX-aware responses.

    Requests carrying a truthy ``request.htmx`` attribute receive partial
    templates (or a client-refresh response on success); all other requests
    fall through to the parent ``LoginView`` behaviour.
    """

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Render the login modal partial for HTMX, else the parent's page."""
        if not getattr(request, "htmx", False):
            return super().get(request, *args, **kwargs)
        return render(
            request,
            "account/partials/login_modal.html",
            self.get_context_data(),
        )

    def form_valid(self, form):
        """Run the Turnstile check, then finish the login.

        A ``ValidationError`` from ``validate_turnstile`` (expected to be
        supplied by ``TurnstileMixin``) is attached to the form as a non-field
        error and handled through ``form_invalid``.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as exc:
            form.add_error(None, str(exc))
            return self.form_invalid(form)

        # The parent call performs the actual login; for HTMX its response is
        # discarded in favour of a client-refresh response.
        default_response = super().form_valid(form)
        if getattr(self.request, "htmx", False):
            return HttpResponseClientRefresh()
        return default_response

    def form_invalid(self, form):
        """Re-render the login form partial for HTMX, else defer to the parent."""
        if not getattr(self.request, "htmx", False):
            return super().form_invalid(form)
        return render(
            self.request,
            "account/partials/login_form.html",
            self.get_context_data(form=form),
        )
|
|
|
|
|
|
class CustomSignupView(TurnstileMixin, SignupView):
    """Signup view with a Turnstile check and HTMX-aware responses.

    Requests carrying a truthy ``request.htmx`` attribute receive the signup
    modal partial (or a client-refresh response on success); all other
    requests fall through to the parent ``SignupView`` behaviour.
    """

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Render the signup modal partial for HTMX, else the parent's page."""
        if not getattr(request, "htmx", False):
            return super().get(request, *args, **kwargs)
        return render(
            request,
            "account/partials/signup_modal.html",
            self.get_context_data(),
        )

    def form_valid(self, form):
        """Run the Turnstile check, then finish the signup.

        A ``ValidationError`` from ``validate_turnstile`` (expected to be
        supplied by ``TurnstileMixin``) is attached to the form as a non-field
        error and handled through ``form_invalid``.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as exc:
            form.add_error(None, str(exc))
            return self.form_invalid(form)

        # The parent call performs the actual signup; for HTMX its response is
        # discarded in favour of a client-refresh response.
        default_response = super().form_valid(form)
        if getattr(self.request, "htmx", False):
            return HttpResponseClientRefresh()
        return default_response

    def form_invalid(self, form):
        """Re-render the signup modal with errors for HTMX, else defer to the parent."""
        if not getattr(self.request, "htmx", False):
            return super().form_invalid(form)
        return render(
            self.request,
            "account/partials/signup_modal.html",
            self.get_context_data(form=form),
        )
|
|
|
|
|
|
@login_required
def user_redirect_view(request: HttpRequest) -> HttpResponse:
    """Redirect the logged-in user to the ``profile`` URL for their username."""
    username = cast(User, request.user).username
    return redirect("profile", username=username)
|
|
|
|
|
|
def handle_social_login(request: HttpRequest, email: str) -> HttpResponse:
    """Complete a pending social login using the supplied email address.

    If the session holds a ``socialaccount_sociallogin`` entry, the email is
    set on its user, the social login is saved, the user is logged in and the
    session entry is removed. The response is a redirect to ``/`` whether or
    not a pending social login was found.
    """
    session_key = "socialaccount_sociallogin"
    pending_login = request.session.get(session_key)

    if pending_login:
        pending_login.user.email = email
        pending_login.save()
        login(request, pending_login.user)
        # Drop the entry only after the login has gone through.
        del request.session[session_key]
        messages.success(request, "Successfully logged in")

    return redirect("/")
|
|
|
|
|
|
def email_required(request: HttpRequest) -> HttpResponse:
    """Collect an email address for a social login that is still in progress.

    Without a ``socialaccount_sociallogin`` session entry the user is sent to
    ``/`` with an error message. A GET renders the email form; a POST with an
    email hands off to ``handle_social_login``; a POST without one re-renders
    the form with an error.
    """
    template = "accounts/email_required.html"

    if not request.session.get("socialaccount_sociallogin"):
        messages.error(request, "No social login in progress")
        return redirect("/")

    if request.method != "POST":
        return render(request, template)

    submitted_email = request.POST.get("email")
    if submitted_email:
        return handle_social_login(request, submitted_email)

    messages.error(request, "Email is required")
    return render(request, template, {"error": "Email is required"})
|
|
|
|
|
|
class ProfileView(DetailView):
    """Profile page for the user whose ``username`` is given in the URL.

    The template receives the user as ``profile_user`` together with up to
    five of their most recent published park reviews, published ride reviews
    and top lists.
    """

    model = User
    template_name = "accounts/profile.html"
    context_object_name = "profile_user"
    # Look the user up by username rather than by primary key.
    slug_field = "username"
    slug_url_kwarg = "username"

    def get_queryset(self) -> QuerySet[User]:
        """Return users with their related ``profile`` joined in."""
        return User.objects.select_related("profile")

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Add the user's recent reviews and top lists to the context."""
        context = super().get_context_data(**kwargs)
        # DetailView.get() has already resolved the object and stored it on
        # self.object; calling get_object() again would repeat the query.
        user = cast(User, self.object)

        context["park_reviews"] = self._get_user_park_reviews(user)
        context["ride_reviews"] = self._get_user_ride_reviews(user)
        context["top_lists"] = self._get_user_top_lists(user)

        return context

    def _get_user_park_reviews(self, user: User) -> QuerySet[ParkReview]:
        """Return the five newest published park reviews written by ``user``."""
        return (
            ParkReview.objects.filter(user=user, is_published=True)
            .select_related("user", "user__profile", "park")
            .order_by("-created_at")[:5]
        )

    def _get_user_ride_reviews(self, user: User) -> QuerySet[RideReview]:
        """Return the five newest published ride reviews written by ``user``."""
        return (
            RideReview.objects.filter(user=user, is_published=True)
            .select_related("user", "user__profile", "ride")
            .order_by("-created_at")[:5]
        )

    def _get_user_top_lists(self, user: User) -> QuerySet[TopList]:
        """Return the five newest top lists of ``user`` with items prefetched."""
        return (
            TopList.objects.filter(user=user)
            .select_related("user", "user__profile")
            .prefetch_related("items")
            .order_by("-created_at")[:5]
        )
|
|
|
|
|
|
class SettingsView(LoginRequiredMixin, TemplateView):
    """Account settings page for the logged-in user.

    POST requests are dispatched on the ``action`` form field:
    ``update_profile``, ``change_password`` or ``change_email``. Any other
    value performs no update and simply re-renders the page.
    """

    template_name = "accounts/settings.html"

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Expose the requesting user to the template as ``user``."""
        context = super().get_context_data(**kwargs)
        context["user"] = self.request.user
        return context

    def _handle_profile_update(self, request: HttpRequest) -> None:
        """Apply the submitted display name and/or avatar to the user's profile.

        Responds with 404 (via ``get_object_or_404``) when the user has no
        ``UserProfile`` row.
        """
        user = cast(User, request.user)
        profile = get_object_or_404(UserProfile, user=user)

        if display_name := request.POST.get("display_name"):
            profile.display_name = display_name

        if "avatar" in request.FILES:
            avatar_file = cast(UploadedFile, request.FILES["avatar"])
            # save=False: the model row is written once by profile.save() below.
            profile.avatar.save(avatar_file.name, avatar_file, save=False)
        profile.save()

        user.save()
        messages.success(request, "Profile updated successfully")

    def _validate_password(self, password: str) -> bool:
        """Return True if ``password`` has 8+ chars with upper, lower and digit."""
        return (
            len(password) >= 8
            and bool(re.search(r"[A-Z]", password))
            and bool(re.search(r"[a-z]", password))
            and bool(re.search(r"[0-9]", password))
        )

    def _send_password_change_confirmation(
        self, request: HttpRequest, user: User
    ) -> None:
        """Email ``user`` a notice that their password was changed."""
        site = get_current_site(request)
        context = {
            "user": user,
            "site_name": site.name,
        }

        email_html = render_to_string(
            "accounts/email/password_change_confirmation.html", context
        )

        EmailService.send_email(
            to=user.email,
            subject="Password Changed Successfully",
            text="Your password has been changed successfully.",
            site=site,
            html=email_html,
        )

    def _handle_password_change(
        self, request: HttpRequest
    ) -> Optional[HttpResponseRedirect]:
        """Change the user's password after checking the submitted form values.

        Returns:
            A redirect to the ``account_login`` URL when the password was
            changed, or ``None`` when a check failed (an error message has
            been queued and the caller should re-render the page).
        """
        user = cast(User, request.user)
        old_password = request.POST.get("old_password", "")
        new_password = request.POST.get("new_password", "")
        confirm_password = request.POST.get("confirm_password", "")

        if not user.check_password(old_password):
            messages.error(request, "Current password is incorrect")
            return None

        if new_password != confirm_password:
            messages.error(request, "New passwords do not match")
            return None

        if not self._validate_password(new_password):
            messages.error(
                request,
                "Password must be at least 8 characters and contain uppercase, lowercase, and numbers",
            )
            return None

        user.set_password(new_password)
        user.save()

        self._send_password_change_confirmation(request, user)
        messages.success(
            request,
            "Password changed successfully. Please check your email for confirmation.",
        )
        return HttpResponseRedirect(reverse("account_login"))

    def _handle_email_change(self, request: HttpRequest) -> None:
        """Start an email change by sending a verification mail to ``new_email``."""
        if new_email := request.POST.get("new_email"):
            self._send_email_verification(request, new_email)
            messages.success(
                request, "Verification email sent to your new email address"
            )
        else:
            messages.error(request, "New email is required")

    def _send_email_verification(self, request: HttpRequest, new_email: str) -> None:
        """Store a fresh verification token and mail its link to ``new_email``.

        The address is recorded on the user as ``pending_email`` after the
        mail has been handed to ``EmailService``.
        """
        user = cast(User, request.user)
        token = get_random_string(64)
        EmailVerification.objects.update_or_create(user=user, defaults={"token": token})

        site = cast(Site, get_current_site(request))
        # reverse() yields only a path. A link inside an email has no base URL
        # to resolve against, so expand it to scheme://host/path.
        verification_url = request.build_absolute_uri(
            reverse("verify_email", kwargs={"token": token})
        )

        context = {
            "user": user,
            "verification_url": verification_url,
            "site_name": site.name,
        }

        email_html = render_to_string("accounts/email/verify_email.html", context)
        EmailService.send_email(
            to=new_email,
            subject="Verify your new email address",
            text="Click the link to verify your new email address",
            site=site,
            html=email_html,
        )

        user.pending_email = new_email
        user.save()

    def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch on the ``action`` field, then re-render the settings page.

        A successful password change returns its own redirect instead of the
        re-rendered page.
        """
        action = request.POST.get("action")

        if action == "update_profile":
            self._handle_profile_update(request)
        elif action == "change_password":
            if response := self._handle_password_change(request):
                return response
        elif action == "change_email":
            self._handle_email_change(request)

        return self.get(request, *args, **kwargs)
|
|
|
|
|
|
def create_password_reset_token(user: User) -> str:
    """Create or refresh the password-reset record for ``user``.

    The record keyed on ``user`` receives a new random 64-character token
    that expires 24 hours from now.

    Returns:
        The newly generated token.
    """
    token = get_random_string(64)
    PasswordReset.objects.update_or_create(
        user=user,
        defaults={
            "token": token,
            "expires_at": timezone.now() + timedelta(hours=24),
            # The row is reused across requests. Without re-arming it, a
            # record marked used by an earlier reset would keep used=True and
            # the new token would be rejected by the used=False lookup.
            "used": False,
        },
    )
    return token
|
|
|
|
|
|
def send_password_reset_email(
    user: User,
    site: Union[Site, RequestSite],
    token: str,
    request: Optional[HttpRequest] = None,
) -> None:
    """Email ``user`` a link to the password-reset confirmation page.

    Args:
        user: Recipient; the mail goes to ``user.email``.
        site: Site passed to the email service and used for ``site_name``.
        token: Reset token embedded in the ``password_reset_confirm`` URL.
        request: Optional current request. When given, the link is expanded
            to an absolute URL; when omitted, the bare path from ``reverse()``
            is used, as before.
    """
    reset_url = reverse("password_reset_confirm", kwargs={"token": token})
    if request is not None:
        # A path-only link cannot be followed from an email client, which has
        # no base URL to resolve it against.
        reset_url = request.build_absolute_uri(reset_url)

    context = {
        "user": user,
        "reset_url": reset_url,
        "site_name": site.name,
    }
    email_html = render_to_string("accounts/email/password_reset.html", context)

    EmailService.send_email(
        to=user.email,
        subject="Reset your password",
        text="Click the link to reset your password",
        site=site,
        html=email_html,
    )


def request_password_reset(request: HttpRequest) -> HttpResponse:
    """Show the password-reset request form and process its submission.

    GET (or any non-POST) renders the form. A POST without an email redirects
    back with an error. A POST with an email sends a reset mail if a user with
    that address exists, and shows the same success message either way.
    """
    if request.method != "POST":
        return render(request, "accounts/password_reset.html")

    if not (email := request.POST.get("email")):
        messages.error(request, "Email is required")
        return redirect("account_reset_password")

    # An unknown address is silently ignored, so the response is the same
    # whether or not an account exists for it.
    with suppress(User.DoesNotExist):
        user = User.objects.get(email=email)
        token = create_password_reset_token(user)
        site = get_current_site(request)
        send_password_reset_email(user, site, token, request=request)

    messages.success(request, "Password reset email sent")
    return redirect("account_login")
|
|
|
|
|
|
def handle_password_reset(
    request: HttpRequest,
    user: User,
    new_password: str,
    reset: PasswordReset,
    site: Union[Site, RequestSite],
) -> None:
    """Set ``user``'s new password and mark the reset record as used.

    Afterwards a confirmation email is sent and a success message is queued
    on ``request``.

    Args:
        request: Current request, used only for the flash message.
        user: Account whose password is replaced.
        new_password: Raw password to set.
        reset: Reset record to mark as used.
        site: Site passed through to the confirmation email.
    """
    from django.db import transaction

    # Commit both writes together: if marking the token as used fails, the
    # password change is rolled back rather than leaving a changed password
    # alongside a token that is still valid.
    with transaction.atomic():
        user.set_password(new_password)
        user.save()

        reset.used = True
        reset.save()

    send_password_reset_confirmation(user, site)
    messages.success(request, "Password reset successfully")
|
|
|
|
|
|
def send_password_reset_confirmation(
    user: User, site: Union[Site, RequestSite]
) -> None:
    """Email ``user`` a notice that their password reset has completed.

    The HTML body is rendered from
    ``accounts/email/password_reset_complete.html`` with ``user`` and
    ``site_name`` in its context.
    """
    html_body = render_to_string(
        "accounts/email/password_reset_complete.html",
        {"user": user, "site_name": site.name},
    )

    EmailService.send_email(
        to=user.email,
        subject="Password Reset Complete",
        text="Your password has been reset successfully.",
        site=site,
        html=html_body,
    )
|
|
|
|
|
|
def reset_password(request: HttpRequest, token: str) -> HttpResponse:
    """Show and process the "choose a new password" form for a reset token.

    The token must match a ``PasswordReset`` row that is unexpired and not
    yet used; otherwise the user is redirected to ``account_reset_password``
    with an error. On a POST with an acceptable password the reset is applied
    and the user is redirected to ``account_login``. In every other case the
    confirmation form is rendered, with error messages if applicable.
    """
    from django.contrib.auth.password_validation import validate_password

    # Only the lookup can legitimately raise DoesNotExist, so keep the try
    # limited to it.
    try:
        reset = PasswordReset.objects.select_related("user").get(
            token=token, expires_at__gt=timezone.now(), used=False
        )
    except PasswordReset.DoesNotExist:
        messages.error(request, "Invalid or expired reset token")
        return redirect("account_reset_password")

    if request.method == "POST":
        new_password = request.POST.get("new_password")
        if not new_password:
            messages.error(request, "New password is required")
        else:
            try:
                # Apply the project's configured password validators so this
                # path cannot be used to set an arbitrarily weak password.
                validate_password(new_password, user=reset.user)
            except ValidationError as exc:
                for problem in exc.messages:
                    messages.error(request, problem)
            else:
                site = get_current_site(request)
                handle_password_reset(request, reset.user, new_password, reset, site)
                return redirect("account_login")

    return render(request, "accounts/password_reset_confirm.html", {"token": token})
|