mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 07:11:08 -05:00
395 lines
14 KiB
Python
395 lines
14 KiB
Python
from django.views.generic import DetailView, TemplateView
|
|
from django.contrib.auth import get_user_model
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.contrib import messages
|
|
from django.core.exceptions import ValidationError
|
|
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
|
|
from allauth.socialaccount.providers.discord.views import DiscordOAuth2Adapter
|
|
from allauth.socialaccount.providers.oauth2.client import OAuth2Client
|
|
from django.conf import settings
|
|
from django.core.mail import send_mail
|
|
from django.template.loader import render_to_string
|
|
from django.utils.crypto import get_random_string
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.db.models import Prefetch, QuerySet
|
|
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest
|
|
from django.urls import reverse
|
|
from django.contrib.auth import login
|
|
from django.core.files.uploadedfile import UploadedFile
|
|
from accounts.models import User, PasswordReset, TopList, EmailVerification, UserProfile
|
|
from reviews.models import Review
|
|
from email_service.services import EmailService
|
|
from allauth.account.views import LoginView, SignupView
|
|
from .mixins import TurnstileMixin
|
|
from typing import Dict, Any, Optional, Union, cast, TYPE_CHECKING
|
|
from django_htmx.http import HttpResponseClientRefresh
|
|
from django.contrib.sites.models import Site
|
|
from django.contrib.sites.requests import RequestSite
|
|
from contextlib import suppress
|
|
import re
|
|
|
|
if TYPE_CHECKING:
|
|
from django.contrib.sites.models import Site
|
|
from django.contrib.sites.requests import RequestSite
|
|
|
|
UserModel = get_user_model()
|
|
|
|
class CustomLoginView(TurnstileMixin, LoginView):
|
|
def form_valid(self, form):
|
|
try:
|
|
self.validate_turnstile(self.request)
|
|
except ValidationError as e:
|
|
form.add_error(None, str(e))
|
|
return self.form_invalid(form)
|
|
|
|
response = super().form_valid(form)
|
|
|
|
if getattr(self.request, 'htmx', False):
|
|
return HttpResponseClientRefresh()
|
|
return response
|
|
|
|
def form_invalid(self, form):
|
|
if getattr(self.request, 'htmx', False):
|
|
return render(
|
|
self.request,
|
|
'account/partials/login_form.html',
|
|
self.get_context_data(form=form)
|
|
)
|
|
return super().form_invalid(form)
|
|
|
|
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
if getattr(request, 'htmx', False):
|
|
return render(
|
|
request,
|
|
'account/partials/login_modal.html',
|
|
self.get_context_data()
|
|
)
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
class CustomSignupView(TurnstileMixin, SignupView):
|
|
def form_valid(self, form):
|
|
try:
|
|
self.validate_turnstile(self.request)
|
|
except ValidationError as e:
|
|
form.add_error(None, str(e))
|
|
return self.form_invalid(form)
|
|
|
|
response = super().form_valid(form)
|
|
|
|
if getattr(self.request, 'htmx', False):
|
|
return HttpResponseClientRefresh()
|
|
return response
|
|
|
|
def form_invalid(self, form):
|
|
if getattr(self.request, 'htmx', False):
|
|
return render(
|
|
self.request,
|
|
'account/partials/signup_modal.html',
|
|
self.get_context_data(form=form)
|
|
)
|
|
return super().form_invalid(form)
|
|
|
|
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
if getattr(request, 'htmx', False):
|
|
return render(
|
|
request,
|
|
'account/partials/signup_modal.html',
|
|
self.get_context_data()
|
|
)
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
@login_required
|
|
def user_redirect_view(request: HttpRequest) -> HttpResponse:
|
|
user = cast(User, request.user)
|
|
return redirect('profile', username=user.username)
|
|
|
|
def handle_social_login(request: HttpRequest, email: str) -> HttpResponse:
|
|
if sociallogin := request.session.get('socialaccount_sociallogin'):
|
|
sociallogin.user.email = email
|
|
sociallogin.save()
|
|
login(request, sociallogin.user)
|
|
del request.session['socialaccount_sociallogin']
|
|
messages.success(request, 'Successfully logged in')
|
|
return redirect('/')
|
|
return redirect('/')
|
|
|
|
def email_required(request: HttpRequest) -> HttpResponse:
|
|
if not request.session.get('socialaccount_sociallogin'):
|
|
messages.error(request, 'No social login in progress')
|
|
return redirect('/')
|
|
|
|
if request.method == 'POST':
|
|
if email := request.POST.get('email'):
|
|
return handle_social_login(request, email)
|
|
messages.error(request, 'Email is required')
|
|
return render(request, 'accounts/email_required.html', {'error': 'Email is required'})
|
|
|
|
return render(request, 'accounts/email_required.html')
|
|
|
|
class ProfileView(DetailView):
|
|
model = User
|
|
template_name = 'accounts/profile.html'
|
|
context_object_name = 'profile_user'
|
|
slug_field = 'username'
|
|
slug_url_kwarg = 'username'
|
|
|
|
def get_queryset(self) -> QuerySet[User]:
|
|
return User.objects.select_related('profile')
|
|
|
|
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
user = cast(User, self.get_object())
|
|
|
|
context['recent_reviews'] = self._get_user_reviews(user)
|
|
context['top_lists'] = self._get_user_top_lists(user)
|
|
|
|
return context
|
|
|
|
def _get_user_reviews(self, user: User) -> QuerySet[Review]:
|
|
return Review.objects.filter(
|
|
user=user,
|
|
is_published=True
|
|
).select_related(
|
|
'user',
|
|
'user__profile',
|
|
'content_type'
|
|
).prefetch_related(
|
|
'content_object'
|
|
).order_by('-created_at')[:5]
|
|
|
|
def _get_user_top_lists(self, user: User) -> QuerySet[TopList]:
|
|
return TopList.objects.filter(
|
|
user=user
|
|
).select_related(
|
|
'user',
|
|
'user__profile'
|
|
).prefetch_related(
|
|
'items'
|
|
).order_by('-created_at')[:5]
|
|
|
|
class SettingsView(LoginRequiredMixin, TemplateView):
|
|
template_name = 'accounts/settings.html'
|
|
|
|
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
|
|
context = super().get_context_data(**kwargs)
|
|
context['user'] = self.request.user
|
|
return context
|
|
|
|
def _handle_profile_update(self, request: HttpRequest) -> None:
|
|
user = cast(User, request.user)
|
|
profile = get_object_or_404(UserProfile, user=user)
|
|
|
|
if display_name := request.POST.get('display_name'):
|
|
profile.display_name = display_name
|
|
|
|
if 'avatar' in request.FILES:
|
|
avatar_file = cast(UploadedFile, request.FILES['avatar'])
|
|
profile.avatar.save(avatar_file.name, avatar_file, save=False)
|
|
profile.save()
|
|
|
|
user.save()
|
|
messages.success(request, 'Profile updated successfully')
|
|
|
|
def _validate_password(self, password: str) -> bool:
|
|
"""Validate password meets requirements."""
|
|
if len(password) < 8:
|
|
return False
|
|
if not re.search(r'[A-Z]', password):
|
|
return False
|
|
if not re.search(r'[a-z]', password):
|
|
return False
|
|
if not re.search(r'[0-9]', password):
|
|
return False
|
|
return True
|
|
|
|
def _send_password_change_confirmation(self, request: HttpRequest, user: User) -> None:
|
|
"""Send password change confirmation email."""
|
|
site = get_current_site(request)
|
|
context = {
|
|
'user': user,
|
|
'site_name': site.name,
|
|
}
|
|
|
|
email_html = render_to_string('accounts/email/password_change_confirmation.html', context)
|
|
|
|
EmailService.send_email(
|
|
to=user.email,
|
|
subject='Password Changed Successfully',
|
|
text='Your password has been changed successfully.',
|
|
site=site,
|
|
html=email_html
|
|
)
|
|
|
|
def _handle_password_change(self, request: HttpRequest) -> Optional[HttpResponseRedirect]:
|
|
user = cast(User, request.user)
|
|
old_password = request.POST.get('old_password', '')
|
|
new_password = request.POST.get('new_password', '')
|
|
confirm_password = request.POST.get('confirm_password', '')
|
|
|
|
if not user.check_password(old_password):
|
|
messages.error(request, 'Current password is incorrect')
|
|
return None
|
|
|
|
if new_password != confirm_password:
|
|
messages.error(request, 'New passwords do not match')
|
|
return None
|
|
|
|
if not self._validate_password(new_password):
|
|
messages.error(request, 'Password must be at least 8 characters and contain uppercase, lowercase, and numbers')
|
|
return None
|
|
|
|
user.set_password(new_password)
|
|
user.save()
|
|
|
|
# Send confirmation email
|
|
self._send_password_change_confirmation(request, user)
|
|
|
|
messages.success(request, 'Password changed successfully. Please check your email for confirmation.')
|
|
return HttpResponseRedirect(reverse('account_login'))
|
|
|
|
def _handle_email_change(self, request: HttpRequest) -> None:
|
|
if not (new_email := request.POST.get('new_email')):
|
|
messages.error(request, 'New email is required')
|
|
return
|
|
|
|
self._send_email_verification(request, new_email)
|
|
messages.success(request, 'Verification email sent to your new email address')
|
|
|
|
def _send_email_verification(self, request: HttpRequest, new_email: str) -> None:
|
|
user = cast(User, request.user)
|
|
token = get_random_string(64)
|
|
EmailVerification.objects.update_or_create(
|
|
user=user,
|
|
defaults={'token': token}
|
|
)
|
|
|
|
site = cast(Site, get_current_site(request))
|
|
verification_url = reverse('verify_email', kwargs={'token': token})
|
|
|
|
context = {
|
|
'user': user,
|
|
'verification_url': verification_url,
|
|
'site_name': site.name,
|
|
}
|
|
|
|
email_html = render_to_string('accounts/email/verify_email.html', context)
|
|
EmailService.send_email(
|
|
to=new_email,
|
|
subject='Verify your new email address',
|
|
text='Click the link to verify your new email address',
|
|
site=site,
|
|
html=email_html
|
|
)
|
|
|
|
user.pending_email = new_email
|
|
user.save()
|
|
|
|
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
action = request.POST.get('action')
|
|
|
|
if action == 'update_profile':
|
|
self._handle_profile_update(request)
|
|
elif action == 'change_password':
|
|
if response := self._handle_password_change(request):
|
|
return response
|
|
elif action == 'change_email':
|
|
self._handle_email_change(request)
|
|
|
|
return self.get(request, *args, **kwargs)
|
|
|
|
def create_password_reset_token(user: User) -> str:
|
|
token = get_random_string(64)
|
|
PasswordReset.objects.update_or_create(
|
|
user=user,
|
|
defaults={
|
|
'token': token,
|
|
'expires_at': timezone.now() + timedelta(hours=24)
|
|
}
|
|
)
|
|
return token
|
|
|
|
def send_password_reset_email(user: User, site: Union[Site, RequestSite], token: str) -> None:
|
|
reset_url = reverse('password_reset_confirm', kwargs={'token': token})
|
|
context = {
|
|
'user': user,
|
|
'reset_url': reset_url,
|
|
'site_name': site.name,
|
|
}
|
|
email_html = render_to_string('accounts/email/password_reset.html', context)
|
|
|
|
EmailService.send_email(
|
|
to=user.email,
|
|
subject='Reset your password',
|
|
text='Click the link to reset your password',
|
|
site=site,
|
|
html=email_html
|
|
)
|
|
|
|
def request_password_reset(request: HttpRequest) -> HttpResponse:
|
|
if request.method != 'POST':
|
|
return render(request, 'accounts/password_reset.html')
|
|
|
|
if not (email := request.POST.get('email')):
|
|
messages.error(request, 'Email is required')
|
|
return redirect('account_reset_password')
|
|
|
|
with suppress(User.DoesNotExist):
|
|
user = User.objects.get(email=email)
|
|
token = create_password_reset_token(user)
|
|
site = get_current_site(request)
|
|
send_password_reset_email(user, site, token)
|
|
|
|
messages.success(request, 'Password reset email sent')
|
|
return redirect('account_login')
|
|
|
|
def handle_password_reset(request: HttpRequest, user: User, new_password: str, reset: PasswordReset, site: Union[Site, RequestSite]) -> None:
|
|
user.set_password(new_password)
|
|
user.save()
|
|
|
|
reset.used = True
|
|
reset.save()
|
|
|
|
send_password_reset_confirmation(user, site)
|
|
messages.success(request, 'Password reset successfully')
|
|
|
|
def send_password_reset_confirmation(user: User, site: Union[Site, RequestSite]) -> None:
|
|
context = {
|
|
'user': user,
|
|
'site_name': site.name,
|
|
}
|
|
email_html = render_to_string('accounts/email/password_reset_complete.html', context)
|
|
|
|
EmailService.send_email(
|
|
to=user.email,
|
|
subject='Password Reset Complete',
|
|
text='Your password has been reset successfully.',
|
|
site=site,
|
|
html=email_html
|
|
)
|
|
|
|
def reset_password(request: HttpRequest, token: str) -> HttpResponse:
|
|
try:
|
|
reset = PasswordReset.objects.select_related('user').get(
|
|
token=token,
|
|
expires_at__gt=timezone.now(),
|
|
used=False
|
|
)
|
|
|
|
if request.method == 'POST':
|
|
if new_password := request.POST.get('new_password'):
|
|
site = get_current_site(request)
|
|
handle_password_reset(request, reset.user, new_password, reset, site)
|
|
return redirect('account_login')
|
|
|
|
messages.error(request, 'New password is required')
|
|
|
|
return render(request, 'accounts/password_reset_confirm.html', {'token': token})
|
|
|
|
except PasswordReset.DoesNotExist:
|
|
messages.error(request, 'Invalid or expired reset token')
|
|
return redirect('account_reset_password')
|