Files
thrillwiki_django_no_react/accounts/views.py

382 lines
14 KiB
Python

from django.views.generic import DetailView, TemplateView
from django.contrib.auth import get_user_model
from django.shortcuts import get_object_or_404, redirect, render
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib import messages
from django.core.exceptions import ValidationError
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
from allauth.socialaccount.providers.discord.views import DiscordOAuth2Adapter
from allauth.socialaccount.providers.oauth2.client import OAuth2Client
from django.conf import settings
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.utils.crypto import get_random_string
from django.utils import timezone
from datetime import timedelta
from django.contrib.sites.shortcuts import get_current_site
from django.db.models import Prefetch, QuerySet
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest
from django.urls import reverse
from django.contrib.auth import login
from django.core.files.uploadedfile import UploadedFile
from accounts.models import User, PasswordReset, TopList, EmailVerification, UserProfile
from reviews.models import Review
from email_service.services import EmailService
from allauth.account.views import LoginView, SignupView
from .mixins import TurnstileMixin
from typing import Dict, Any, Optional, Union, cast, TYPE_CHECKING
from django_htmx.http import HttpResponseClientRefresh
from django.contrib.sites.models import Site
from django.contrib.sites.requests import RequestSite
from contextlib import suppress
import re
if TYPE_CHECKING:
from django.contrib.sites.models import Site
from django.contrib.sites.requests import RequestSite
# Active user model, resolved through Django's get_user_model().
# NOTE(review): nothing in the visible part of this module references
# UserModel -- the views below use the imported `User` class directly.
# Confirm no other module imports it before removing.
UserModel = get_user_model()
class CustomLoginView(TurnstileMixin, LoginView):
    """Login view with a Turnstile check and HTMX-aware responses.

    Requests carrying a truthy ``request.htmx`` attribute receive partial
    templates (or a client-refresh response on success); all other requests
    fall through to the parent ``LoginView`` behaviour.
    """

    def form_valid(self, form):
        """Run the Turnstile check, then log the user in.

        A ``ValidationError`` from ``validate_turnstile`` (presumably supplied
        by ``TurnstileMixin``) is attached to the form as a non-field error
        and the request is handled as an invalid submission.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as exc:
            form.add_error(None, str(exc))
            return self.form_invalid(form)

        parent_response = super().form_valid(form)
        if getattr(self.request, 'htmx', False):
            # HTMX callers are told to reload the page instead of following
            # the parent's response.
            return HttpResponseClientRefresh()
        return parent_response

    def form_invalid(self, form):
        """Re-render the login form; HTMX requests get only the form partial."""
        if not getattr(self.request, 'htmx', False):
            return super().form_invalid(form)
        return render(
            self.request,
            'account/partials/login_form.html',
            self.get_context_data(form=form),
        )

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Serve the login modal partial to HTMX requests, the full page otherwise."""
        if not getattr(request, 'htmx', False):
            return super().get(request, *args, **kwargs)
        return render(
            request,
            'account/partials/login_modal.html',
            self.get_context_data(),
        )
class CustomSignupView(TurnstileMixin, SignupView):
    """Signup view with a Turnstile check and HTMX-aware responses.

    Requests carrying a truthy ``request.htmx`` attribute receive the signup
    modal partial (or a client-refresh response on success); all other
    requests fall through to the parent ``SignupView`` behaviour.
    """

    def form_valid(self, form):
        """Run the Turnstile check, then complete the signup.

        A ``ValidationError`` from ``validate_turnstile`` (presumably supplied
        by ``TurnstileMixin``) is attached to the form as a non-field error
        and the request is handled as an invalid submission.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as exc:
            form.add_error(None, str(exc))
            return self.form_invalid(form)

        parent_response = super().form_valid(form)
        if getattr(self.request, 'htmx', False):
            # HTMX callers are told to reload the page instead of following
            # the parent's response.
            return HttpResponseClientRefresh()
        return parent_response

    def form_invalid(self, form):
        """Re-render the signup form; HTMX requests get the modal partial."""
        if not getattr(self.request, 'htmx', False):
            return super().form_invalid(form)
        return render(
            self.request,
            'account/partials/signup_modal.html',
            self.get_context_data(form=form),
        )

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Serve the signup modal partial to HTMX requests, the full page otherwise."""
        if not getattr(request, 'htmx', False):
            return super().get(request, *args, **kwargs)
        return render(
            request,
            'account/partials/signup_modal.html',
            self.get_context_data(),
        )
@login_required
def user_redirect_view(request: HttpRequest) -> HttpResponse:
    """Redirect the logged-in user to their own ``profile`` page."""
    # login_required ensures an authenticated user; the cast only narrows the
    # static type so ``username`` is known to exist.
    current_user = cast(User, request.user)
    return redirect('profile', username=current_user.username)
def handle_social_login(request: HttpRequest, email: str) -> HttpResponse:
    """Finish a pending social login using the e-mail address supplied.

    If the session holds a ``socialaccount_sociallogin`` entry, the e-mail is
    stored on its user, the entry is saved, the user is logged in, the session
    entry is removed and a success message is queued. The response is always a
    redirect to ``/``, whether or not a pending login was found.

    NOTE(review): this assumes the session value is an object exposing
    ``.user`` and a no-argument ``.save()`` -- verify against the installed
    allauth version.
    """
    pending_login = request.session.get('socialaccount_sociallogin')
    if not pending_login:
        return redirect('/')

    pending_login.user.email = email
    pending_login.save()
    login(request, pending_login.user)
    del request.session['socialaccount_sociallogin']
    messages.success(request, 'Successfully logged in')
    return redirect('/')
def email_required(request: HttpRequest) -> HttpResponse:
    """Collect an e-mail address needed to finish a social login.

    Without a pending ``socialaccount_sociallogin`` session entry the user is
    sent to ``/`` with an error message. A GET renders the form; a POST with a
    non-empty ``email`` hands over to ``handle_social_login``; a POST without
    one re-renders the form with an error.
    """
    template = 'accounts/email_required.html'

    if not request.session.get('socialaccount_sociallogin'):
        messages.error(request, 'No social login in progress')
        return redirect('/')

    if request.method != 'POST':
        return render(request, template)

    email = request.POST.get('email')
    if email:
        return handle_social_login(request, email)

    messages.error(request, 'Email is required')
    return render(request, template, {'error': 'Email is required'})
class ProfileView(DetailView):
    """Public profile page for one user, looked up by ``username`` in the URL.

    The template receives the user as ``profile_user`` plus two extra context
    entries: ``recent_reviews`` and ``top_lists``.
    """

    model = User
    template_name = 'accounts/profile.html'
    context_object_name = 'profile_user'
    slug_field = 'username'
    slug_url_kwarg = 'username'

    def get_queryset(self) -> QuerySet[User]:
        """Return users with their related ``profile`` joined in."""
        return User.objects.select_related('profile')

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Extend the default context with the user's reviews and top lists."""
        context = super().get_context_data(**kwargs)
        # DetailView.get() has already fetched the user and stored it on
        # self.object; reusing it avoids the second database lookup that
        # another get_object() call would trigger.
        user = cast(User, self.object)
        context['recent_reviews'] = self._get_user_reviews(user)
        context['top_lists'] = self._get_user_top_lists(user)
        return context

    def _get_user_reviews(self, user: User) -> QuerySet[Review]:
        """Return up to five published reviews by ``user``, newest first."""
        return Review.objects.filter(
            user=user,
            is_published=True
        ).select_related(
            'user',
            'user__profile',
            'content_type'
        ).prefetch_related(
            'content_object'
        ).order_by('-created_at')[:5]

    def _get_user_top_lists(self, user: User) -> QuerySet[TopList]:
        """Return up to five top lists owned by ``user``, newest first."""
        return TopList.objects.filter(
            user=user
        ).select_related(
            'user',
            'user__profile'
        ).prefetch_related(
            'items'
        ).order_by('-created_at')[:5]
class SettingsView(LoginRequiredMixin, TemplateView):
    """Account settings page for the logged-in user.

    GET renders ``accounts/settings.html``. POST dispatches on the ``action``
    form field (``update_profile``, ``change_password`` or ``change_email``)
    and then re-renders the page, except after a successful password change,
    which redirects to the login page.
    """

    template_name = 'accounts/settings.html'

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Add the requesting user to the template context as ``user``."""
        context = super().get_context_data(**kwargs)
        context['user'] = self.request.user
        return context

    def _handle_profile_update(self, request: HttpRequest) -> None:
        """Apply the submitted display name and/or avatar to the user's profile.

        Responds with 404 (via ``get_object_or_404``) when the user has no
        ``UserProfile`` row.
        """
        user = cast(User, request.user)
        profile = get_object_or_404(UserProfile, user=user)
        # An empty or missing display_name leaves the current value untouched.
        if display_name := request.POST.get('display_name'):
            profile.display_name = display_name
        if 'avatar' in request.FILES:
            avatar_file = cast(UploadedFile, request.FILES['avatar'])
            # save=False: the model row is written once by profile.save() below.
            profile.avatar.save(avatar_file.name, avatar_file, save=False)
        profile.save()
        user.save()
        messages.success(request, 'Profile updated successfully')

    def _validate_password(self, password: str) -> bool:
        """Return True when the password is at least 8 characters long and
        contains an uppercase letter, a lowercase letter and a digit."""
        return (
            len(password) >= 8 and
            bool(re.search(r'[A-Z]', password)) and
            bool(re.search(r'[a-z]', password)) and
            bool(re.search(r'[0-9]', password))
        )

    def _send_password_change_confirmation(self, request: HttpRequest, user: User) -> None:
        """Send password change confirmation email."""
        site = get_current_site(request)
        context = {
            'user': user,
            'site_name': site.name,
        }
        email_html = render_to_string('accounts/email/password_change_confirmation.html', context)
        EmailService.send_email(
            to=user.email,
            subject='Password Changed Successfully',
            text='Your password has been changed successfully.',
            site=site,
            html=email_html
        )

    def _handle_password_change(self, request: HttpRequest) -> Optional[HttpResponseRedirect]:
        """Change the user's password after checking the submitted values.

        Returns a redirect to the login page on success. Returns ``None``
        after queuing an error message when the current password is wrong,
        the two new passwords differ, or the new password fails
        ``_validate_password``.
        """
        user = cast(User, request.user)
        old_password = request.POST.get('old_password', '')
        new_password = request.POST.get('new_password', '')
        confirm_password = request.POST.get('confirm_password', '')
        if not user.check_password(old_password):
            messages.error(request, 'Current password is incorrect')
            return None
        if new_password != confirm_password:
            messages.error(request, 'New passwords do not match')
            return None
        if not self._validate_password(new_password):
            messages.error(request, 'Password must be at least 8 characters and contain uppercase, lowercase, and numbers')
            return None
        user.set_password(new_password)
        user.save()
        self._send_password_change_confirmation(request, user)
        messages.success(request, 'Password changed successfully. Please check your email for confirmation.')
        return HttpResponseRedirect(reverse('account_login'))

    def _handle_email_change(self, request: HttpRequest) -> None:
        """Start an e-mail change by mailing a verification link to the new address.

        The submitted address is stripped of surrounding whitespace and must
        pass Django's ``validate_email``; otherwise an error message is queued
        and nothing is sent or stored.
        """
        # Imported locally because the module-level imports do not include it.
        from django.core.validators import validate_email

        new_email = request.POST.get('new_email', '').strip()
        if not new_email:
            messages.error(request, 'New email is required')
            return
        try:
            validate_email(new_email)
        except ValidationError:
            messages.error(request, 'Enter a valid email address')
            return
        self._send_email_verification(request, new_email)
        messages.success(request, 'Verification email sent to your new email address')

    def _send_email_verification(self, request: HttpRequest, new_email: str) -> None:
        """Create or refresh the user's verification token, e-mail the
        verification link to ``new_email`` and record it as ``pending_email``."""
        user = cast(User, request.user)
        token = get_random_string(64)
        EmailVerification.objects.update_or_create(
            user=user,
            defaults={'token': token}
        )
        site = cast(Site, get_current_site(request))
        # reverse() yields only a path; a link opened from a mail client needs
        # scheme and host too, so expand it against the current request.
        verification_url = request.build_absolute_uri(
            reverse('verify_email', kwargs={'token': token})
        )
        context = {
            'user': user,
            'verification_url': verification_url,
            'site_name': site.name,
        }
        email_html = render_to_string('accounts/email/verify_email.html', context)
        EmailService.send_email(
            to=new_email,
            subject='Verify your new email address',
            text='Click the link to verify your new email address',
            site=site,
            html=email_html
        )
        user.pending_email = new_email
        user.save()

    def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch on the ``action`` form field, then re-render the settings page.

        Unknown or missing actions are ignored. A successful password change
        returns its redirect instead of re-rendering.
        """
        action = request.POST.get('action')
        if action == 'update_profile':
            self._handle_profile_update(request)
        elif action == 'change_password':
            if response := self._handle_password_change(request):
                return response
        elif action == 'change_email':
            self._handle_email_change(request)
        return self.get(request, *args, **kwargs)
def create_password_reset_token(user: User) -> str:
    """Create or refresh the password-reset record for ``user`` and return its token.

    The record is keyed on the user, so a repeated request overwrites the
    previous token. The new token is a 64-character random string that
    expires 24 hours from now.
    """
    token = get_random_string(64)
    PasswordReset.objects.update_or_create(
        user=user,
        defaults={
            'token': token,
            'expires_at': timezone.now() + timedelta(hours=24),
            # The row is reused across requests: without clearing the flag, a
            # user who has already completed one reset would receive a token
            # that the ``used=False`` lookup in reset_password always rejects.
            'used': False,
        }
    )
    return token
def send_password_reset_email(user: User, site: Union[Site, RequestSite], token: str) -> None:
    """E-mail ``user`` a password-reset link built from ``token``.

    The HTML body is rendered from ``accounts/email/password_reset.html`` with
    ``user``, ``reset_url`` and ``site_name`` in its context.

    NOTE(review): ``reset_url`` is the bare path returned by ``reverse()``
    (no scheme or host). Confirm that the template or ``EmailService`` turns
    it into an absolute link before it reaches the recipient.
    """
    reset_path = reverse('password_reset_confirm', kwargs={'token': token})
    html_body = render_to_string(
        'accounts/email/password_reset.html',
        {
            'user': user,
            'reset_url': reset_path,
            'site_name': site.name,
        },
    )
    EmailService.send_email(
        to=user.email,
        subject='Reset your password',
        text='Click the link to reset your password',
        site=site,
        html=html_body,
    )
def request_password_reset(request: HttpRequest) -> HttpResponse:
    """Handle the "forgot password" form.

    Non-POST requests render the form. A POST without an e-mail redirects back
    with an error. Otherwise a reset e-mail is sent if a user with that
    address exists, and the same success message and redirect to the login
    page follow whether or not one was found.
    """
    if request.method != 'POST':
        return render(request, 'accounts/password_reset.html')

    email = request.POST.get('email')
    if not email:
        messages.error(request, 'Email is required')
        return redirect('account_reset_password')

    try:
        account = User.objects.get(email=email)
        reset_token = create_password_reset_token(account)
        current_site = get_current_site(request)
        send_password_reset_email(account, current_site, reset_token)
    except User.DoesNotExist:
        # Deliberately silent: an unknown address gets the same response as a
        # known one.
        pass

    messages.success(request, 'Password reset email sent')
    return redirect('account_login')
def handle_password_reset(
    request: HttpRequest,
    user: User,
    new_password: str,
    reset: PasswordReset,
    site: Union[Site, RequestSite],
) -> None:
    """Apply a password reset and notify the user.

    Sets and saves the new password, marks ``reset`` as used, sends the
    confirmation e-mail and queues a success message on ``request``.
    """
    # Store the new credentials first.
    user.set_password(new_password)
    user.save()

    # Flag the reset record as used; reset_password only accepts records
    # with used=False.
    reset.used = True
    reset.save()

    send_password_reset_confirmation(user, site)
    messages.success(request, 'Password reset successfully')
def send_password_reset_confirmation(user: User, site: Union[Site, RequestSite]) -> None:
    """E-mail ``user`` a notice that their password reset has completed.

    The HTML body is rendered from
    ``accounts/email/password_reset_complete.html`` with ``user`` and
    ``site_name`` in its context.
    """
    html_body = render_to_string(
        'accounts/email/password_reset_complete.html',
        {
            'user': user,
            'site_name': site.name,
        },
    )
    EmailService.send_email(
        to=user.email,
        subject='Password Reset Complete',
        text='Your password has been reset successfully.',
        site=site,
        html=html_body,
    )
def reset_password(request: HttpRequest, token: str) -> HttpResponse:
    """Let a user choose a new password using an e-mailed reset token.

    The token must match a ``PasswordReset`` row that is unexpired and unused;
    otherwise the user is redirected to the reset-request page with an error.
    A GET renders the confirmation form. A POST applies the new password and
    redirects to the login page, or re-renders the form with error messages
    when the password is missing or rejected by the configured validators.
    """
    # Imported locally because the module-level imports do not include it.
    from django.contrib.auth.password_validation import validate_password

    # Only the lookup can legitimately raise DoesNotExist, so the try body is
    # limited to it.
    try:
        reset = PasswordReset.objects.select_related('user').get(
            token=token,
            expires_at__gt=timezone.now(),
            used=False,
        )
    except PasswordReset.DoesNotExist:
        messages.error(request, 'Invalid or expired reset token')
        return redirect('account_reset_password')

    if request.method == 'POST':
        new_password = request.POST.get('new_password')
        if not new_password:
            messages.error(request, 'New password is required')
        else:
            try:
                # Apply the project's configured password validators so the
                # reset flow cannot be used to set an arbitrarily weak password.
                validate_password(new_password, user=reset.user)
            except ValidationError as exc:
                for problem in exc.messages:
                    messages.error(request, problem)
            else:
                site = get_current_site(request)
                handle_password_reset(request, reset.user, new_password, reset, site)
                return redirect('account_login')

    return render(request, 'accounts/password_reset_confirm.html', {'token': token})