"""Account views: login/signup, profile pages, user settings and password reset."""

from contextlib import suppress
from datetime import timedelta
from typing import Dict, Any, Optional, Union, cast, TYPE_CHECKING

from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.password_validation import validate_password
from django.contrib.sites.models import Site
from django.contrib.sites.requests import RequestSite
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
from django.core.mail import send_mail
from django.db.models import Prefetch, QuerySet
from django.http import HttpResponseRedirect, HttpResponse, HttpRequest
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.views.generic import DetailView, TemplateView

from allauth.account.views import LoginView, SignupView
from allauth.socialaccount.providers.discord.views import DiscordOAuth2Adapter
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
from allauth.socialaccount.providers.oauth2.client import OAuth2Client
from django_htmx.http import HttpResponseClientRefresh

from accounts.models import User, PasswordReset, TopList, EmailVerification, UserProfile
from reviews.models import Review
from email_service.services import EmailService
from .mixins import TurnstileMixin

if TYPE_CHECKING:
    from django.contrib.sites.models import Site
    from django.contrib.sites.requests import RequestSite

UserModel = get_user_model()


class CustomLoginView(TurnstileMixin, LoginView):
    """Login view that runs the Turnstile check and answers HTMX requests with partials."""

    def form_valid(self, form):
        """Run the Turnstile check, then delegate to the parent ``form_valid``.

        A ``ValidationError`` from ``validate_turnstile`` is attached to the
        form as a non-field error and the invalid-form path is taken. For HTMX
        requests a successful login returns a client-refresh response instead
        of the parent's response.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as e:
            # Pass the individual messages: str(e) is the repr of a list
            # ("['...']"), which is not fit to show to a user.
            form.add_error(None, e.messages)
            return self.form_invalid(form)

        response = super().form_valid(form)
        if getattr(self.request, 'htmx', False):
            return HttpResponseClientRefresh()
        return response

    def form_invalid(self, form):
        """Re-render only the login form partial for HTMX requests."""
        if getattr(self.request, 'htmx', False):
            return render(
                self.request,
                'account/partials/login_form.html',
                self.get_context_data(form=form),
            )
        return super().form_invalid(form)

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Serve the login modal partial for HTMX requests, the full page otherwise."""
        if getattr(request, 'htmx', False):
            return render(
                request,
                'account/partials/login_modal.html',
                self.get_context_data(),
            )
        return super().get(request, *args, **kwargs)


class CustomSignupView(TurnstileMixin, SignupView):
    """Signup view that runs the Turnstile check and answers HTMX requests with partials."""

    def form_valid(self, form):
        """Run the Turnstile check, then delegate to the parent ``form_valid``.

        A ``ValidationError`` from ``validate_turnstile`` is attached to the
        form as a non-field error and the invalid-form path is taken. For HTMX
        requests a successful signup returns a client-refresh response.
        """
        try:
            self.validate_turnstile(self.request)
        except ValidationError as e:
            # See CustomLoginView.form_valid: avoid the "['...']" list repr.
            form.add_error(None, e.messages)
            return self.form_invalid(form)

        response = super().form_valid(form)
        if getattr(self.request, 'htmx', False):
            return HttpResponseClientRefresh()
        return response

    def form_invalid(self, form):
        """Re-render the signup modal partial for HTMX requests."""
        if getattr(self.request, 'htmx', False):
            return render(
                self.request,
                'account/partials/signup_modal.html',
                self.get_context_data(form=form),
            )
        return super().form_invalid(form)

    def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Serve the signup modal partial for HTMX requests, the full page otherwise."""
        if getattr(request, 'htmx', False):
            return render(
                request,
                'account/partials/signup_modal.html',
                self.get_context_data(),
            )
        return super().get(request, *args, **kwargs)


@login_required
def user_redirect_view(request: HttpRequest) -> HttpResponse:
    """Redirect the logged-in user to their own ``profile`` URL."""
    user = cast(User, request.user)
    return redirect('profile', username=user.username)


def handle_social_login(request: HttpRequest, email: str) -> HttpResponse:
    """Finish a pending social login using the supplied email address.

    Reads ``socialaccount_sociallogin`` from the session; when present, sets
    the email on its user, saves it, logs the user in, removes the session
    entry and flashes a success message. Always redirects to ``/``.
    """
    # NOTE(review): this treats the session value as an object exposing
    # ``.user`` and a zero-argument ``.save()``. Confirm against the installed
    # allauth version what is actually stored under this key (it may be a
    # serialized dict), and whether ``login()`` needs an explicit ``backend``
    # when several authentication backends are configured.
    if sociallogin := request.session.get('socialaccount_sociallogin'):
        sociallogin.user.email = email
        sociallogin.save()
        login(request, sociallogin.user)
        del request.session['socialaccount_sociallogin']
        messages.success(request, 'Successfully logged in')
    return redirect('/')


def email_required(request: HttpRequest) -> HttpResponse:
    """Collect an email address for a social login that is in progress.

    Redirects to ``/`` with an error when no social login is stored in the
    session. On POST with an ``email`` value the login is completed via
    ``handle_social_login``; a POST without one re-renders the form with an
    error. Any other request renders the empty form.
    """
    if not request.session.get('socialaccount_sociallogin'):
        messages.error(request, 'No social login in progress')
        return redirect('/')

    if request.method == 'POST':
        if email := request.POST.get('email'):
            return handle_social_login(request, email)
        messages.error(request, 'Email is required')
        return render(
            request,
            'accounts/email_required.html',
            {'error': 'Email is required'},
        )

    return render(request, 'accounts/email_required.html')


class ProfileView(DetailView):
    """Public profile page for a user, looked up by ``username`` from the URL."""

    model = User
    template_name = 'accounts/profile.html'
    context_object_name = 'profile_user'
    slug_field = 'username'
    slug_url_kwarg = 'username'

    def get_queryset(self) -> QuerySet[User]:
        """Return users with their ``profile`` relation joined in."""
        return User.objects.select_related('profile')

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Add the user's five latest published reviews and five latest top lists."""
        context = super().get_context_data(**kwargs)
        # DetailView.get() has already stored the looked-up object on
        # self.object; calling get_object() again would repeat the query.
        user = cast(User, self.object)
        context['recent_reviews'] = self._get_user_reviews(user)
        context['top_lists'] = self._get_user_top_lists(user)
        return context

    def _get_user_reviews(self, user: User) -> QuerySet[Review]:
        """Return up to five published reviews by ``user``, newest first."""
        return Review.objects.filter(
            user=user,
            is_published=True,
        ).select_related(
            'user',
            'user__profile',
            'content_type',
        ).prefetch_related(
            'content_object',
        ).order_by('-created_at')[:5]

    def _get_user_top_lists(self, user: User) -> QuerySet[TopList]:
        """Return up to five top lists by ``user``, newest first, with items prefetched."""
        return TopList.objects.filter(
            user=user,
        ).select_related(
            'user',
            'user__profile',
        ).prefetch_related(
            'items',
        ).order_by('-created_at')[:5]


class SettingsView(LoginRequiredMixin, TemplateView):
    """Settings page; POST dispatches on the ``action`` form field."""

    template_name = 'accounts/settings.html'

    def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
        """Expose the current user to the template as ``user``."""
        context = super().get_context_data(**kwargs)
        context['user'] = self.request.user
        return context

    def _handle_profile_update(self, request: HttpRequest) -> None:
        """Update the display name and/or avatar from the submitted form.

        Responds with 404 (via ``get_object_or_404``) when the user has no
        ``UserProfile`` row.
        """
        user = cast(User, request.user)
        profile = get_object_or_404(UserProfile, user=user)

        if display_name := request.POST.get('display_name'):
            profile.display_name = display_name

        if 'avatar' in request.FILES:
            avatar_file = cast(UploadedFile, request.FILES['avatar'])
            # save=False: the single profile.save() below persists the model.
            profile.avatar.save(avatar_file.name, avatar_file, save=False)

        profile.save()
        user.save()
        messages.success(request, 'Profile updated successfully')

    def _handle_password_change(self, request: HttpRequest) -> Optional[HttpResponseRedirect]:
        """Change the user's password after verifying the current one.

        Returns a redirect to ``account_login`` on success. Returns ``None``
        (after flashing an error message) when the current password is wrong,
        the new password is missing, or the new password fails the configured
        password validators.
        """
        user = cast(User, request.user)
        old_password = request.POST.get('old_password', '')
        new_password = request.POST.get('new_password', '')

        if not user.check_password(old_password):
            messages.error(request, 'Current password is incorrect')
            return None

        # A missing/blank field must never be stored as the account password.
        if not new_password:
            messages.error(request, 'New password is required')
            return None

        try:
            validate_password(new_password, user=user)
        except ValidationError as e:
            for message in e.messages:
                messages.error(request, message)
            return None

        user.set_password(new_password)
        user.save()
        messages.success(request, 'Password changed successfully')
        return HttpResponseRedirect(reverse('account_login'))

    def _handle_email_change(self, request: HttpRequest) -> None:
        """Start an email change by sending a verification mail to ``new_email``."""
        if not (new_email := request.POST.get('new_email')):
            messages.error(request, 'New email is required')
            return

        self._send_email_verification(request, new_email)
        messages.success(request, 'Verification email sent to your new email address')

    def _send_email_verification(self, request: HttpRequest, new_email: str) -> None:
        """Create or replace the user's verification token and email the link.

        After the mail is handed to ``EmailService``, the address is stored on
        the user as ``pending_email``.
        """
        user = cast(User, request.user)
        token = get_random_string(64)
        EmailVerification.objects.update_or_create(
            user=user,
            defaults={'token': token},
        )

        site = cast(Site, get_current_site(request))
        # NOTE(review): reverse() yields a path without scheme/host. Confirm
        # the email template prefixes a domain; otherwise the link should be
        # built with request.build_absolute_uri().
        verification_url = reverse('verify_email', kwargs={'token': token})
        context = {
            'user': user,
            'verification_url': verification_url,
            'site_name': site.name,
        }
        email_html = render_to_string('accounts/email/verify_email.html', context)
        EmailService.send_email(
            to=new_email,
            subject='Verify your new email address',
            text='Click the link to verify your new email address',
            site=site,
            html=email_html,
        )

        user.pending_email = new_email
        user.save()

    def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch on ``action`` and re-render the settings page.

        Only a successful password change returns early (with its redirect);
        every other outcome, including an unknown action, falls through to the
        normal GET rendering.
        """
        action = request.POST.get('action')

        if action == 'update_profile':
            self._handle_profile_update(request)
        elif action == 'change_password':
            if response := self._handle_password_change(request):
                return response
        elif action == 'change_email':
            self._handle_email_change(request)

        return self.get(request, *args, **kwargs)


def create_password_reset_token(user: User) -> str:
    """Create or replace the user's password-reset token, valid for 24 hours.

    Returns the new 64-character token.
    """
    token = get_random_string(64)
    PasswordReset.objects.update_or_create(
        user=user,
        defaults={
            'token': token,
            'expires_at': timezone.now() + timedelta(hours=24),
        },
    )
    return token


def send_password_reset_email(user: User, site: Union[Site, RequestSite], token: str) -> None:
    """Email the password-reset link for ``token`` to ``user.email``."""
    # NOTE(review): reverse() yields a path without scheme/host. Confirm the
    # email template prefixes a domain so the link is usable from a mail client.
    reset_url = reverse('password_reset_confirm', kwargs={'token': token})
    context = {
        'user': user,
        'reset_url': reset_url,
        'site_name': site.name,
    }
    email_html = render_to_string('accounts/email/password_reset.html', context)
    EmailService.send_email(
        to=user.email,
        subject='Reset your password',
        text='Click the link to reset your password',
        site=site,
        html=email_html,
    )


def request_password_reset(request: HttpRequest) -> HttpResponse:
    """Render the reset-request form (non-POST) or process a reset request (POST).

    The success message and redirect are the same whether or not an account
    matches the submitted email, so the response does not reveal which
    addresses are registered.
    """
    if request.method != 'POST':
        return render(request, 'accounts/password_reset.html')

    if not (email := request.POST.get('email')):
        messages.error(request, 'Email is required')
        return redirect('account_reset_password')

    # NOTE(review): .get() raises MultipleObjectsReturned if User.email is not
    # unique; confirm the model enforces uniqueness.
    with suppress(User.DoesNotExist):
        user = User.objects.get(email=email)
        token = create_password_reset_token(user)
        site = get_current_site(request)
        send_password_reset_email(user, site, token)

    messages.success(request, 'Password reset email sent')
    return redirect('account_login')


def handle_password_reset(
    request: HttpRequest,
    user: User,
    new_password: str,
    reset: PasswordReset,
    site: Union[Site, RequestSite],
) -> None:
    """Set the new password, mark the reset record used and send a confirmation mail."""
    user.set_password(new_password)
    user.save()

    reset.used = True
    reset.save()

    send_password_reset_confirmation(user, site)
    messages.success(request, 'Password reset successfully')


def send_password_reset_confirmation(user: User, site: Union[Site, RequestSite]) -> None:
    """Email ``user.email`` a notice that the password reset has completed."""
    context = {
        'user': user,
        'site_name': site.name,
    }
    email_html = render_to_string('accounts/email/password_reset_complete.html', context)
    EmailService.send_email(
        to=user.email,
        subject='Password Reset Complete',
        text='Your password has been reset successfully.',
        site=site,
        html=email_html,
    )


def reset_password(request: HttpRequest, token: str) -> HttpResponse:
    """Validate a reset token and, on POST, set the new password.

    An unknown, expired or already-used token redirects to
    ``account_reset_password`` with an error. A POST with a new password that
    passes the configured validators resets the password and redirects to
    ``account_login``. Everything else renders the confirmation form.
    """
    # Keep the try body to the lookup: it is the only statement expected to
    # raise PasswordReset.DoesNotExist.
    try:
        reset = PasswordReset.objects.select_related('user').get(
            token=token,
            expires_at__gt=timezone.now(),
            used=False,
        )
    except PasswordReset.DoesNotExist:
        messages.error(request, 'Invalid or expired reset token')
        return redirect('account_reset_password')

    if request.method == 'POST':
        new_password = request.POST.get('new_password')
        if not new_password:
            messages.error(request, 'New password is required')
        else:
            try:
                validate_password(new_password, user=reset.user)
            except ValidationError as e:
                for message in e.messages:
                    messages.error(request, message)
            else:
                site = get_current_site(request)
                handle_password_reset(request, reset.user, new_password, reset, site)
                return redirect('account_login')

    return render(request, 'accounts/password_reset_confirm.html', {'token': token})