""" User management services for ThrillWiki. This module contains services for user account management including user deletion while preserving submissions, password management, and email change functionality. Recent additions: - AccountService: Handles password and email change operations - UserDeletionService: Manages user deletion while preserving content """ import logging import re from typing import Any, Dict, Optional from django.conf import settings from django.contrib.auth import update_session_auth_hash from django.contrib.sites.models import Site from django.contrib.sites.shortcuts import get_current_site from django.db import transaction from django.http import HttpRequest from django.template.loader import render_to_string from django.utils import timezone from django.utils.crypto import get_random_string from django_forwardemail.services import EmailService from .models import EmailVerification, User, UserDeletionRequest, UserProfile logger = logging.getLogger(__name__) class AccountService: """Service for account management operations including password and email changes.""" @staticmethod def validate_password(password: str) -> bool: """ Validate password meets requirements. Args: password: The password to validate Returns: True if password meets requirements, False otherwise """ return ( len(password) >= 8 and bool(re.search(r"[A-Z]", password)) and bool(re.search(r"[a-z]", password)) and bool(re.search(r"[0-9]", password)) ) @staticmethod def change_password( *, user: User, old_password: str, new_password: str, request: HttpRequest, ) -> Dict[str, Any]: """ Change user password with validation and notification. Validates the old password, checks new password requirements, updates the password, and sends a confirmation email. Args: user: The user whose password is being changed old_password: Current password for verification new_password: New password to set request: HTTP request for session handling Returns: Dictionary with success status, message, and optional redirect URL: { 'success': bool, 'message': str, 'redirect_url': Optional[str] } """ # Verify old password if not user.check_password(old_password): logger.warning( f"Password change failed: incorrect current password for user {user.id}" ) return { 'success': False, 'message': "Current password is incorrect", 'redirect_url': None } # Validate new password if not AccountService.validate_password(new_password): return { 'success': False, 'message': "Password must be at least 8 characters and contain uppercase, lowercase, and numbers", 'redirect_url': None } # Update password user.set_password(new_password) user.save() # Keep user logged in after password change update_session_auth_hash(request, user) # Send confirmation email AccountService._send_password_change_confirmation(request, user) logger.info(f"Password changed successfully for user {user.id}") return { 'success': True, 'message': "Password changed successfully. Please check your email for confirmation.", 'redirect_url': None } @staticmethod def _send_password_change_confirmation(request: HttpRequest, user: User) -> None: """Send password change confirmation email.""" site = get_current_site(request) context = { "user": user, "site_name": site.name, } email_html = render_to_string( "accounts/email/password_change_confirmation.html", context ) try: EmailService.send_email( to=user.email, subject="Password Changed Successfully", text="Your password has been changed successfully.", site=site, html=email_html, ) except Exception as e: logger.error(f"Failed to send password change confirmation email: {e}") @staticmethod def initiate_email_change( *, user: User, new_email: str, request: HttpRequest, ) -> Dict[str, Any]: """ Initiate email change with verification. Creates a verification token and sends a verification email to the new email address. Args: user: The user changing their email new_email: The new email address request: HTTP request for site context Returns: Dictionary with success status and message: { 'success': bool, 'message': str } """ if not new_email: return { 'success': False, 'message': "New email is required" } # Check if email is already in use if User.objects.filter(email=new_email).exclude(id=user.id).exists(): return { 'success': False, 'message': "This email address is already in use" } # Generate verification token token = get_random_string(64) # Create or update email verification record EmailVerification.objects.update_or_create( user=user, defaults={"token": token} ) # Store pending email user.pending_email = new_email user.save() # Send verification email AccountService._send_email_verification(request, user, new_email, token) logger.info(f"Email change initiated for user {user.id} to {new_email}") return { 'success': True, 'message': "Verification email sent to your new email address" } @staticmethod def _send_email_verification( request: HttpRequest, user: User, new_email: str, token: str ) -> None: """Send email verification for email change.""" from django.urls import reverse site = get_current_site(request) verification_url = reverse("verify_email", kwargs={"token": token}) context = { "user": user, "verification_url": verification_url, "site_name": site.name, } email_html = render_to_string("accounts/email/verify_email.html", context) try: EmailService.send_email( to=new_email, subject="Verify your new email address", text="Click the link to verify your new email address", site=site, html=email_html, ) except Exception as e: logger.error(f"Failed to send email verification: {e}") @staticmethod def verify_email_change(*, token: str) -> Dict[str, Any]: """ Verify email change token and update user email. Args: token: The verification token Returns: Dictionary with success status and message """ try: verification = EmailVerification.objects.select_related("user").get( token=token ) except EmailVerification.DoesNotExist: return { 'success': False, 'message': "Invalid or expired verification token" } user = verification.user if not user.pending_email: return { 'success': False, 'message': "No pending email change found" } # Update email old_email = user.email user.email = user.pending_email user.pending_email = None user.save() # Delete verification record verification.delete() logger.info(f"Email changed for user {user.id} from {old_email} to {user.email}") return { 'success': True, 'message': "Email address updated successfully" } class UserDeletionService: """Service for handling user deletion while preserving submissions.""" DELETED_USER_USERNAME = "deleted_user" DELETED_USER_EMAIL = "deleted@thrillwiki.com" DELETED_DISPLAY_NAME = "Deleted User" @classmethod def get_or_create_deleted_user(cls) -> User: """Get or create the system deleted user placeholder.""" deleted_user, created = User.objects.get_or_create( username=cls.DELETED_USER_USERNAME, defaults={ "email": cls.DELETED_USER_EMAIL, "is_active": False, "is_staff": False, "is_superuser": False, "role": User.Roles.USER, "is_banned": True, "ban_reason": "System placeholder for deleted users", "ban_date": timezone.now(), }, ) if created: # Create profile for deleted user UserProfile.objects.create( user=deleted_user, display_name=cls.DELETED_DISPLAY_NAME, bio="This user account has been deleted.", ) return deleted_user @classmethod @transaction.atomic def delete_user_preserve_submissions(cls, user: User) -> dict: """ Delete a user while preserving all their submissions. This method: 1. Transfers all user submissions to a system "deleted_user" placeholder 2. Deletes the user's profile and account data 3. Returns a summary of what was preserved Args: user: The user to delete Returns: dict: Summary of preserved submissions """ if user.username == cls.DELETED_USER_USERNAME: raise ValueError("Cannot delete the system deleted user placeholder") deleted_user = cls.get_or_create_deleted_user() # Count submissions before transfer submission_counts = { "park_reviews": getattr( user, "park_reviews", user.__class__.objects.none() ).count(), "ride_reviews": getattr( user, "ride_reviews", user.__class__.objects.none() ).count(), "uploaded_park_photos": getattr( user, "uploaded_park_photos", user.__class__.objects.none() ).count(), "uploaded_ride_photos": getattr( user, "uploaded_ride_photos", user.__class__.objects.none() ).count(), "top_lists": getattr( user, "top_lists", user.__class__.objects.none() ).count(), "edit_submissions": getattr( user, "edit_submissions", user.__class__.objects.none() ).count(), "photo_submissions": getattr( user, "photo_submissions", user.__class__.objects.none() ).count(), "moderated_park_reviews": getattr( user, "moderated_park_reviews", user.__class__.objects.none() ).count(), "moderated_ride_reviews": getattr( user, "moderated_ride_reviews", user.__class__.objects.none() ).count(), "handled_submissions": getattr( user, "handled_submissions", user.__class__.objects.none() ).count(), "handled_photos": getattr( user, "handled_photos", user.__class__.objects.none() ).count(), } # Transfer all submissions to deleted user # Reviews if hasattr(user, "park_reviews"): getattr(user, "park_reviews").update(user=deleted_user) if hasattr(user, "ride_reviews"): getattr(user, "ride_reviews").update(user=deleted_user) # Photos if hasattr(user, "uploaded_park_photos"): getattr(user, "uploaded_park_photos").update(uploaded_by=deleted_user) if hasattr(user, "uploaded_ride_photos"): getattr(user, "uploaded_ride_photos").update(uploaded_by=deleted_user) # Top Lists if hasattr(user, "top_lists"): getattr(user, "top_lists").update(user=deleted_user) # Moderation submissions if hasattr(user, "edit_submissions"): getattr(user, "edit_submissions").update(user=deleted_user) if hasattr(user, "photo_submissions"): getattr(user, "photo_submissions").update(user=deleted_user) # Moderation actions - these can be set to NULL since they're not user content if hasattr(user, "moderated_park_reviews"): getattr(user, "moderated_park_reviews").update(moderated_by=None) if hasattr(user, "moderated_ride_reviews"): getattr(user, "moderated_ride_reviews").update(moderated_by=None) if hasattr(user, "handled_submissions"): getattr(user, "handled_submissions").update(handled_by=None) if hasattr(user, "handled_photos"): getattr(user, "handled_photos").update(handled_by=None) # Store user info for the summary user_info = { "username": user.username, "user_id": user.user_id, "email": user.email, "date_joined": user.date_joined, } # Delete the user (this will cascade delete the profile) user.delete() return { "deleted_user": user_info, "preserved_submissions": submission_counts, "transferred_to": { "username": deleted_user.username, "user_id": deleted_user.user_id, }, } @classmethod def can_delete_user(cls, user: User) -> tuple[bool, Optional[str]]: """ Check if a user can be safely deleted. Args: user: The user to check Returns: tuple: (can_delete: bool, reason: Optional[str]) """ if user.username == cls.DELETED_USER_USERNAME: return False, "Cannot delete the system deleted user placeholder" if user.is_superuser: return False, "Superuser accounts cannot be deleted for security reasons. Please contact system administrator or remove superuser privileges first." # Check if user has critical admin role if user.role == User.Roles.ADMIN and user.is_staff: return False, "Admin accounts with staff privileges cannot be deleted. Please remove admin privileges first or contact system administrator." # Add any other business rules here return True, None @classmethod def request_user_deletion(cls, user: User) -> UserDeletionRequest: """ Create a user deletion request and send verification email. Args: user: The user requesting deletion Returns: UserDeletionRequest: The created deletion request """ # Check if user can be deleted can_delete, reason = cls.can_delete_user(user) if not can_delete: raise ValueError(f"Cannot delete user: {reason}") # Remove any existing deletion request for this user UserDeletionRequest.objects.filter(user=user).delete() # Create new deletion request deletion_request = UserDeletionRequest.objects.create(user=user) # Send verification email cls.send_deletion_verification_email(deletion_request) return deletion_request @classmethod def send_deletion_verification_email(cls, deletion_request: UserDeletionRequest): """ Send verification email for account deletion. Args: deletion_request: The deletion request to send email for """ user = deletion_request.user # Get current site for email service try: site = Site.objects.get_current() except Site.DoesNotExist: # Fallback to default site site = Site.objects.get_or_create( id=1, defaults={"domain": "localhost:8000", "name": "localhost:8000"} )[0] # Prepare email context context = { "user": user, "verification_code": deletion_request.verification_code, "expires_at": deletion_request.expires_at, "site_name": getattr(settings, "SITE_NAME", "ThrillWiki"), "frontend_domain": getattr( settings, "FRONTEND_DOMAIN", "http://localhost:3000" ), } # Render email content subject = f"Confirm Account Deletion - {context['site_name']}" # Create email message with 1-hour expiration notice message = f""" Hello {user.get_display_name()}, You have requested to delete your ThrillWiki account. To confirm this action, please use the following verification code: Verification Code: {deletion_request.verification_code} This code will expire in 1 hour on {deletion_request.expires_at.strftime('%B %d, %Y at %I:%M %p UTC')}. IMPORTANT: This action cannot be undone. Your account will be permanently deleted, but all your reviews, photos, and other contributions will be preserved on the site. If you did not request this deletion, please ignore this email and your account will remain active. To complete the deletion, enter the verification code in the account deletion form on our website. Best regards, The ThrillWiki Team """.strip() # Send email using custom email service try: EmailService.send_email( to=user.email, subject=subject, text=message, site=site, from_email="no-reply@thrillwiki.com", ) # Update email sent timestamp deletion_request.email_sent_at = timezone.now() deletion_request.save(update_fields=["email_sent_at"]) except Exception as e: # Log the error but don't fail the request creation print(f"Failed to send deletion verification email to {user.email}: {e}") @classmethod @transaction.atomic def verify_and_delete_user(cls, verification_code: str) -> dict: """ Verify deletion code and delete the user account. Args: verification_code: The verification code from the email Returns: dict: Summary of the deletion Raises: ValueError: If verification fails """ try: deletion_request = UserDeletionRequest.objects.get( verification_code=verification_code ) except UserDeletionRequest.DoesNotExist: raise ValueError("Invalid verification code") # Check if request is still valid if not deletion_request.is_valid(): if deletion_request.is_expired(): raise ValueError("Verification code has expired") elif deletion_request.is_used: raise ValueError("Verification code has already been used") elif deletion_request.attempts >= deletion_request.max_attempts: raise ValueError("Too many verification attempts") else: raise ValueError("Invalid verification code") # Increment attempts deletion_request.increment_attempts() # Mark as used deletion_request.mark_as_used() # Delete the user user = deletion_request.user result = cls.delete_user_preserve_submissions(user) # Add deletion request info to result result["deletion_request"] = { "verification_code": verification_code, "created_at": deletion_request.created_at, "verified_at": timezone.now(), } return result @classmethod def cancel_deletion_request(cls, user: User) -> bool: """ Cancel a pending deletion request. Args: user: The user whose deletion request to cancel Returns: bool: True if a request was cancelled, False if no request existed """ try: deletion_request = getattr(user, "deletion_request", None) if deletion_request: deletion_request.delete() return True return False except UserDeletionRequest.DoesNotExist: return False @classmethod def cleanup_expired_deletion_requests(cls) -> int: """ Clean up expired deletion requests. Returns: int: Number of expired requests cleaned up """ return UserDeletionRequest.cleanup_expired()