""" MFA (Multi-Factor Authentication) API Views Provides REST API endpoints for MFA operations using django-allauth's mfa module. Supports TOTP (Time-based One-Time Password) authentication. """ import base64 from io import BytesIO from django.conf import settings from drf_spectacular.utils import extend_schema from rest_framework import status from rest_framework.decorators import api_view, permission_classes from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response try: import qrcode HAS_QRCODE = True except ImportError: HAS_QRCODE = False @extend_schema( operation_id="get_mfa_status", summary="Get MFA status for current user", description="Returns whether MFA is enabled and what methods are configured.", responses={ 200: { "description": "MFA status", "example": { "mfa_enabled": True, "totp_enabled": True, "recovery_codes_count": 10, }, }, }, tags=["MFA"], ) @api_view(["GET"]) @permission_classes([IsAuthenticated]) def get_mfa_status(request): """Get MFA status for current user.""" from allauth.mfa.models import Authenticator user = request.user authenticators = Authenticator.objects.filter(user=user) totp_enabled = authenticators.filter(type=Authenticator.Type.TOTP).exists() recovery_enabled = authenticators.filter(type=Authenticator.Type.RECOVERY_CODES).exists() # Check for WebAuthn/Passkey authenticators passkey_enabled = authenticators.filter(type=Authenticator.Type.WEBAUTHN).exists() passkey_count = authenticators.filter(type=Authenticator.Type.WEBAUTHN).count() # Count recovery codes if any recovery_count = 0 if recovery_enabled: try: recovery_auth = authenticators.get(type=Authenticator.Type.RECOVERY_CODES) recovery_count = len(recovery_auth.data.get("codes", [])) except Authenticator.DoesNotExist: pass # Check for Discord social account with MFA enabled discord_mfa_enabled = False connected_provider = None try: social_accounts = user.socialaccount_set.all() for social_account in social_accounts: if social_account.provider == "discord": connected_provider = "discord" discord_mfa_enabled = social_account.extra_data.get("mfa_enabled", False) break elif social_account.provider == "google": connected_provider = "google" # Google doesn't expose MFA status except Exception: pass # has_second_factor is True if user has either TOTP or Passkey configured has_second_factor = totp_enabled or passkey_enabled return Response( { "mfa_enabled": totp_enabled, # Backward compatibility "totp_enabled": totp_enabled, "passkey_enabled": passkey_enabled, "passkey_count": passkey_count, "recovery_codes_enabled": recovery_enabled, "recovery_codes_count": recovery_count, "has_second_factor": has_second_factor, # New fields for enhanced MFA satisfaction "discord_mfa_enabled": discord_mfa_enabled, "connected_provider": connected_provider, } ) @extend_schema( operation_id="setup_totp", summary="Initialize TOTP setup", description="Generates a new TOTP secret and returns the QR code for scanning.", responses={ 200: { "description": "TOTP setup data", "example": { "secret": "ABCDEFGHIJKLMNOP", "provisioning_uri": "otpauth://totp/ThrillWiki:user@example.com?secret=...", "qr_code_base64": "data:image/png;base64,...", }, }, }, tags=["MFA"], ) @api_view(["POST"]) @permission_classes([IsAuthenticated]) def setup_totp(request): """Generate TOTP secret and QR code for setup.""" from django.utils import timezone from allauth.mfa.totp.internal import auth as totp_auth user = request.user # Generate TOTP secret secret = totp_auth.get_totp_secret(None) # Generate new secret # Build provisioning URI issuer = getattr(settings, "MFA_TOTP_ISSUER", "ThrillWiki") account_name = user.email or user.username uri = f"otpauth://totp/{issuer}:{account_name}?secret={secret}&issuer={issuer}" # Generate QR code if qrcode library is available qr_code_base64 = None if HAS_QRCODE: qr = qrcode.make(uri) buffer = BytesIO() qr.save(buffer, format="PNG") qr_code_base64 = f"data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode()}" # Store secret in session for later verification with 15-minute expiry request.session["pending_totp_secret"] = secret request.session["pending_totp_expires"] = (timezone.now().timestamp() + 900) # 15 minutes return Response( { "secret": secret, "provisioning_uri": uri, "qr_code_base64": qr_code_base64, "expires_in_seconds": 900, } ) @extend_schema( operation_id="activate_totp", summary="Activate TOTP with verification code", description="Verifies the TOTP code and activates 2FA for the user.", request={ "application/json": { "type": "object", "properties": { "code": { "type": "string", "description": "6-digit TOTP code from authenticator app", "example": "123456", } }, "required": ["code"], } }, responses={ 200: { "description": "TOTP activated successfully", "example": { "detail": "Two-factor authentication enabled", "recovery_codes": ["ABCD1234", "EFGH5678"], }, }, 400: {"description": "Invalid code or missing setup data"}, }, tags=["MFA"], ) @api_view(["POST"]) @permission_classes([IsAuthenticated]) def activate_totp(request): """Verify TOTP code and activate MFA.""" from django.utils import timezone from allauth.mfa.models import Authenticator from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes from allauth.mfa.totp.internal import auth as totp_auth from apps.accounts.services.security_service import ( log_security_event, send_security_notification, ) user = request.user code = request.data.get("code", "").strip() if not code: return Response( {"detail": "Verification code is required"}, status=status.HTTP_400_BAD_REQUEST, ) # Get pending secret from session OR from request body # (request body is used as fallback for JWT auth where sessions may not persist) secret = request.session.get("pending_totp_secret") or request.data.get("secret", "").strip() if not secret: return Response( {"detail": "No pending TOTP setup. Please start setup again."}, status=status.HTTP_400_BAD_REQUEST, ) # Check if setup has expired (15 minute timeout) expires_at = request.session.get("pending_totp_expires") if expires_at and timezone.now().timestamp() > expires_at: # Clear expired session data if "pending_totp_secret" in request.session: del request.session["pending_totp_secret"] if "pending_totp_expires" in request.session: del request.session["pending_totp_expires"] return Response( {"detail": "TOTP setup session expired. Please start setup again."}, status=status.HTTP_400_BAD_REQUEST, ) # Verify the code if not totp_auth.validate_totp_code(secret, code): return Response( {"detail": "Invalid verification code"}, status=status.HTTP_400_BAD_REQUEST, ) # Check if already has TOTP if Authenticator.objects.filter(user=user, type=Authenticator.Type.TOTP).exists(): return Response( {"detail": "TOTP is already enabled"}, status=status.HTTP_400_BAD_REQUEST, ) # Create TOTP authenticator Authenticator.objects.create( user=user, type=Authenticator.Type.TOTP, data={"secret": secret}, ) # Generate recovery codes using allauth's RecoveryCodes API recovery_instance = RecoveryCodes.activate(user) codes = recovery_instance.get_unused_codes() # Clear session (only if it exists - won't exist with JWT auth + secret from body) if "pending_totp_secret" in request.session: del request.session["pending_totp_secret"] if "pending_totp_expires" in request.session: del request.session["pending_totp_expires"] # Log security event log_security_event( "mfa_enrolled", request, user=user, metadata={"method": "totp"}, ) # Send security notification email send_security_notification(user, "mfa_enrolled", {"method": "TOTP Authenticator"}) return Response( { "detail": "Two-factor authentication enabled", "recovery_codes": codes, "recovery_codes_count": len(codes), } ) @extend_schema( operation_id="deactivate_totp", summary="Disable TOTP authentication", description="Removes TOTP from the user's account after password verification.", request={ "application/json": { "type": "object", "properties": { "password": { "type": "string", "description": "Current password for confirmation", } }, "required": ["password"], } }, responses={ 200: { "description": "TOTP disabled", "example": {"detail": "Two-factor authentication disabled"}, }, 400: {"description": "Invalid password or MFA not enabled"}, }, tags=["MFA"], ) @api_view(["POST"]) @permission_classes([IsAuthenticated]) def deactivate_totp(request): """Disable TOTP authentication.""" from allauth.mfa.models import Authenticator from apps.accounts.services.security_service import ( check_auth_method_availability, log_security_event, send_security_notification, ) user = request.user password = request.data.get("password", "") recovery_code = request.data.get("recovery_code", "") # Check if user has other auth methods before we allow disabling MFA auth_methods = check_auth_method_availability(user) # If TOTP is their only way in alongside passkeys, we need to ensure they have # at least password or social login to fall back on if not auth_methods["has_password"] and not auth_methods["has_social"] and not auth_methods["has_passkey"]: return Response( {"detail": "Cannot disable MFA: you must have at least one authentication method. Please set a password or connect a social account first."}, status=status.HTTP_400_BAD_REQUEST, ) # Verify password OR recovery code verified = False verification_method = None if password and user.check_password(password): verified = True verification_method = "password" elif recovery_code: # Try to verify with recovery code try: recovery_auth = Authenticator.objects.get( user=user, type=Authenticator.Type.RECOVERY_CODES ) unused_codes = recovery_auth.data.get("codes", []) if recovery_code.upper().replace("-", "").replace(" ", "") in [ c.upper().replace("-", "").replace(" ", "") for c in unused_codes ]: verified = True verification_method = "recovery_code" # Remove the used code unused_codes = [ c for c in unused_codes if c.upper().replace("-", "").replace(" ", "") != recovery_code.upper().replace("-", "").replace(" ", "") ] recovery_auth.data["codes"] = unused_codes recovery_auth.save() except Authenticator.DoesNotExist: pass if not verified: return Response( {"detail": "Invalid password or recovery code"}, status=status.HTTP_400_BAD_REQUEST, ) # Remove TOTP and recovery codes deleted_count, _ = Authenticator.objects.filter( user=user, type__in=[Authenticator.Type.TOTP, Authenticator.Type.RECOVERY_CODES] ).delete() if deleted_count == 0: return Response( {"detail": "Two-factor authentication is not enabled"}, status=status.HTTP_400_BAD_REQUEST, ) # Log security event log_security_event( "mfa_disabled", request, user=user, metadata={"method": "totp", "verified_via": verification_method}, ) # Send security notification email send_security_notification(user, "mfa_disabled", {"method": "TOTP Authenticator"}) return Response( { "detail": "Two-factor authentication disabled", } ) @extend_schema( operation_id="verify_totp", summary="Verify TOTP code during login", description="Verifies the TOTP code as part of the login process.", request={ "application/json": { "type": "object", "properties": {"code": {"type": "string", "description": "6-digit TOTP code"}}, "required": ["code"], } }, responses={ 200: {"description": "Code verified", "example": {"success": True}}, 400: {"description": "Invalid code"}, }, tags=["MFA"], ) @api_view(["POST"]) @permission_classes([IsAuthenticated]) def verify_totp(request): """Verify TOTP code.""" from allauth.mfa.models import Authenticator from allauth.mfa.totp.internal import auth as totp_auth user = request.user code = request.data.get("code", "").strip() if not code: return Response( {"detail": "Verification code is required"}, status=status.HTTP_400_BAD_REQUEST, ) try: authenticator = Authenticator.objects.get(user=user, type=Authenticator.Type.TOTP) secret = authenticator.data.get("secret") if totp_auth.validate_totp_code(secret, code): return Response({"success": True}) else: return Response( {"detail": "Invalid verification code"}, status=status.HTTP_400_BAD_REQUEST, ) except Authenticator.DoesNotExist: return Response( {"detail": "TOTP is not enabled"}, status=status.HTTP_400_BAD_REQUEST, ) @extend_schema( operation_id="regenerate_recovery_codes", summary="Regenerate recovery codes", description="Generates new recovery codes (invalidates old ones).", request={ "application/json": { "type": "object", "properties": {"password": {"type": "string", "description": "Current password"}}, "required": ["password"], } }, responses={ 200: { "description": "New recovery codes", "example": {"success": True, "recovery_codes": ["ABCD1234", "EFGH5678"]}, }, 400: {"description": "Invalid password or MFA not enabled"}, }, tags=["MFA"], ) @api_view(["POST"]) @permission_classes([IsAuthenticated]) def regenerate_recovery_codes(request): """Regenerate recovery codes.""" from allauth.mfa.models import Authenticator from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes from apps.accounts.services.security_service import ( log_security_event, send_security_notification, ) user = request.user password = request.data.get("password", "") # Verify password if not user.check_password(password): return Response( {"detail": "Invalid password"}, status=status.HTTP_400_BAD_REQUEST, ) # Check if MFA is enabled (TOTP or Passkey) has_totp = Authenticator.objects.filter(user=user, type=Authenticator.Type.TOTP).exists() has_passkey = Authenticator.objects.filter(user=user, type=Authenticator.Type.WEBAUTHN).exists() if not has_totp and not has_passkey: return Response( {"detail": "Two-factor authentication is not enabled"}, status=status.HTTP_400_BAD_REQUEST, ) # Delete existing recovery codes first (so activate creates new ones) Authenticator.objects.filter( user=user, type=Authenticator.Type.RECOVERY_CODES ).delete() # Generate new recovery codes using allauth's RecoveryCodes API recovery_instance = RecoveryCodes.activate(user) codes = recovery_instance.get_unused_codes() # Log security event log_security_event( "recovery_codes_regenerated", request, user=user, metadata={"codes_generated": len(codes)}, ) # Send security notification email send_security_notification(user, "recovery_codes_regenerated", {"codes_generated": len(codes)}) return Response( { "success": True, "recovery_codes": codes, "recovery_codes_count": len(codes), } )