Files
thrillwiki_django_no_react/backend/apps/api/v1/auth/mfa.py

396 lines
12 KiB
Python

"""
MFA (Multi-Factor Authentication) API Views
Provides REST API endpoints for MFA operations using django-allauth's mfa module.
Supports TOTP (Time-based One-Time Password) setup and verification,
recovery-code management, and reporting of WebAuthn/passkey status.
"""
import base64
from io import BytesIO
from django.conf import settings
from drf_spectacular.utils import extend_schema
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
try:
import qrcode
HAS_QRCODE = True
except ImportError:
HAS_QRCODE = False
@extend_schema(
    operation_id="get_mfa_status",
    summary="Get MFA status for current user",
    description="Returns whether MFA is enabled and what methods are configured.",
    responses={
        200: {
            "description": "MFA status",
            "example": {
                "mfa_enabled": True,
                "totp_enabled": True,
                "recovery_codes_count": 10,
            },
        },
    },
    tags=["MFA"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_mfa_status(request):
    """Report which MFA methods the authenticated user has configured.

    The response contains:

    * ``mfa_enabled`` / ``totp_enabled`` -- whether a TOTP authenticator exists
      (``mfa_enabled`` mirrors ``totp_enabled`` for backward compatibility).
    * ``passkey_enabled`` / ``passkey_count`` -- WebAuthn authenticators.
    * ``recovery_codes_enabled`` / ``recovery_codes_count`` -- recovery codes.
    * ``has_second_factor`` -- True when either TOTP or a passkey is present.
    """
    from allauth.mfa.models import Authenticator
    from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes

    authenticators = Authenticator.objects.filter(user=request.user)

    totp_enabled = authenticators.filter(type=Authenticator.Type.TOTP).exists()

    # One COUNT query is enough to derive both the flag and the number.
    passkey_count = authenticators.filter(type=Authenticator.Type.WEBAUTHN).count()
    passkey_enabled = passkey_count > 0

    # Fetch the recovery-codes authenticator once instead of exists() + get().
    recovery_auth = authenticators.filter(
        type=Authenticator.Type.RECOVERY_CODES
    ).first()
    recovery_enabled = recovery_auth is not None

    # Count through RecoveryCodes.get_unused_codes(), the same API the
    # activation and regeneration endpoints in this module use to read codes.
    # NOTE(review): the previous implementation read data["codes"], which
    # allauth does not appear to populate -- confirm against the installed
    # allauth version.
    recovery_count = 0
    if recovery_enabled:
        recovery_count = len(RecoveryCodes(recovery_auth).get_unused_codes())

    return Response(
        {
            "mfa_enabled": totp_enabled,  # Backward compatibility
            "totp_enabled": totp_enabled,
            "passkey_enabled": passkey_enabled,
            "passkey_count": passkey_count,
            "recovery_codes_enabled": recovery_enabled,
            "recovery_codes_count": recovery_count,
            # A second factor is either TOTP or a passkey.
            "has_second_factor": totp_enabled or passkey_enabled,
        }
    )
@extend_schema(
    operation_id="setup_totp",
    summary="Initialize TOTP setup",
    description="Generates a new TOTP secret and returns the QR code for scanning.",
    responses={
        200: {
            "description": "TOTP setup data",
            "example": {
                "secret": "ABCDEFGHIJKLMNOP",
                "provisioning_uri": "otpauth://totp/ThrillWiki:user@example.com?secret=...",
                "qr_code_base64": "data:image/png;base64,...",
            },
        },
    },
    tags=["MFA"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def setup_totp(request):
    """Start TOTP enrollment: return a secret, provisioning URI and QR code.

    The secret is also stored in the session under ``pending_totp_secret`` so
    that ``activate_totp`` can verify the first code against it.

    Returns a response with ``secret``, ``provisioning_uri`` and
    ``qr_code_base64`` (``None`` when the optional ``qrcode`` library is not
    installed).
    """
    from urllib.parse import quote, urlencode

    from allauth.mfa.totp.internal import auth as totp_auth

    user = request.user

    # NOTE(review): called with None exactly as before; confirm whether
    # allauth regenerates the secret or reuses a pending one for this argument.
    secret = totp_auth.get_totp_secret(None)

    issuer = getattr(settings, "MFA_TOTP_ISSUER", "ThrillWiki")
    account_name = user.email or user.username

    # Percent-encode every component: e-mail addresses and issuer names may
    # contain characters ("+", "&", "?", "#", spaces) that would otherwise
    # corrupt the otpauth:// URI. "@" is kept literal for readability, and
    # quote_via=quote encodes spaces as %20 rather than "+".
    label = f"{quote(issuer, safe='')}:{quote(account_name, safe='@')}"
    query = urlencode({"secret": secret, "issuer": issuer}, quote_via=quote)
    uri = f"otpauth://totp/{label}?{query}"

    # The QR code is optional; it is only rendered when qrcode is importable.
    qr_code_base64 = None
    if HAS_QRCODE:
        image = qrcode.make(uri)
        buffer = BytesIO()
        image.save(buffer, format="PNG")
        encoded_png = base64.b64encode(buffer.getvalue()).decode()
        qr_code_base64 = f"data:image/png;base64,{encoded_png}"

    # Store secret in session for later verification
    request.session["pending_totp_secret"] = secret

    return Response(
        {
            "secret": secret,
            "provisioning_uri": uri,
            "qr_code_base64": qr_code_base64,
        }
    )
@extend_schema(
    operation_id="activate_totp",
    summary="Activate TOTP with verification code",
    description="Verifies the TOTP code and activates 2FA for the user.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "6-digit TOTP code from authenticator app",
                    "example": "123456",
                }
            },
            "required": ["code"],
        }
    },
    responses={
        200: {
            "description": "TOTP activated successfully",
            "example": {
                "detail": "Two-factor authentication enabled",
                "recovery_codes": ["ABCD1234", "EFGH5678"],
            },
        },
        400: {"description": "Invalid code or missing setup data"},
    },
    tags=["MFA"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def activate_totp(request):
    """Verify the first TOTP code and enable TOTP for the current user.

    Request body: ``code`` (required) and, as a fallback when no session
    secret is available, ``secret``.

    Returns 200 with the freshly generated recovery codes on success, or 400
    when the code is missing/invalid, no pending secret can be found, or TOTP
    is already enabled.
    """
    from allauth.mfa.models import Authenticator
    from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes
    from allauth.mfa.totp.internal import auth as totp_auth
    from django.db import transaction

    user = request.user

    # Only accept string input: a JSON null or number used to crash on
    # .strip() and surface as a 500 instead of a validation error.
    raw_code = request.data.get("code")
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    if not code:
        return Response(
            {"detail": "Verification code is required"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Get pending secret from session OR from request body
    # (request body is used as fallback for JWT auth where sessions may not persist)
    secret = request.session.get("pending_totp_secret")
    if not secret:
        body_secret = request.data.get("secret")
        secret = body_secret.strip() if isinstance(body_secret, str) else ""
    if not secret:
        return Response(
            {"detail": "No pending TOTP setup. Please start setup again."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # The secret may be client-supplied; presumably a malformed value makes
    # decoding fail with a ValueError -- treat that as a failed verification
    # rather than letting it escape as a server error.
    try:
        code_is_valid = totp_auth.validate_totp_code(secret, code)
    except ValueError:
        code_is_valid = False
    if not code_is_valid:
        return Response(
            {"detail": "Invalid verification code"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Check if already has TOTP
    if Authenticator.objects.filter(user=user, type=Authenticator.Type.TOTP).exists():
        return Response(
            {"detail": "TOTP is already enabled"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Create the authenticator and its recovery codes as one unit so a failure
    # while generating codes cannot leave TOTP enabled without any codes.
    # NOTE(review): the secret is stored as-is in `data`; confirm this matches
    # how the configured allauth adapter expects TOTP secrets to be persisted.
    with transaction.atomic():
        Authenticator.objects.create(
            user=user,
            type=Authenticator.Type.TOTP,
            data={"secret": secret},
        )
        # Generate recovery codes using allauth's RecoveryCodes API
        recovery_instance = RecoveryCodes.activate(user)
        codes = recovery_instance.get_unused_codes()

    # Clear session (only if it exists - won't exist with JWT auth + secret from body)
    if "pending_totp_secret" in request.session:
        del request.session["pending_totp_secret"]

    return Response(
        {
            "detail": "Two-factor authentication enabled",
            "recovery_codes": codes,
        }
    )
@extend_schema(
    operation_id="deactivate_totp",
    summary="Disable TOTP authentication",
    description="Removes TOTP from the user's account after password verification.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "password": {
                    "type": "string",
                    "description": "Current password for confirmation",
                }
            },
            "required": ["password"],
        }
    },
    responses={
        200: {
            "description": "TOTP disabled",
            "example": {"detail": "Two-factor authentication disabled"},
        },
        400: {"description": "Invalid password or MFA not enabled"},
    },
    tags=["MFA"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def deactivate_totp(request):
    """Disable TOTP for the current user after confirming their password.

    Request body: ``password`` (required).

    Returns 200 when the TOTP authenticator was removed, or 400 when the
    password is wrong or the user has no TOTP authenticator. Recovery codes
    are removed as well, but only when no other authenticator (such as a
    passkey) remains on the account.
    """
    from allauth.mfa.models import Authenticator
    from django.db import transaction

    user = request.user
    password = request.data.get("password", "")

    # Verify password
    if not user.check_password(password):
        return Response(
            {"detail": "Invalid password"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    user_authenticators = Authenticator.objects.filter(user=user)

    with transaction.atomic():
        # Delete only the TOTP authenticator first, so "nothing deleted"
        # reliably means TOTP was not enabled. Deleting recovery codes in the
        # same query made a TOTP-less account lose its codes and still report
        # success.
        deleted_count, _ = user_authenticators.filter(
            type=Authenticator.Type.TOTP
        ).delete()
        if deleted_count == 0:
            return Response(
                {"detail": "Two-factor authentication is not enabled"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Recovery codes back up the remaining factors; drop them only when
        # nothing other than the recovery codes themselves is left.
        other_factor_remains = user_authenticators.exclude(
            type=Authenticator.Type.RECOVERY_CODES
        ).exists()
        if not other_factor_remains:
            user_authenticators.filter(
                type=Authenticator.Type.RECOVERY_CODES
            ).delete()

    return Response(
        {
            "detail": "Two-factor authentication disabled",
        }
    )
@extend_schema(
    operation_id="verify_totp",
    summary="Verify TOTP code during login",
    description="Verifies the TOTP code as part of the login process.",
    request={
        "application/json": {
            "type": "object",
            "properties": {"code": {"type": "string", "description": "6-digit TOTP code"}},
            "required": ["code"],
        }
    },
    responses={
        200: {"description": "Code verified", "example": {"success": True}},
        400: {"description": "Invalid code"},
    },
    tags=["MFA"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def verify_totp(request):
    """Check a TOTP code against the current user's stored TOTP secret.

    Request body: ``code`` (required).

    Returns 200 ``{"success": True}`` when the code validates, or 400 when the
    code is missing/invalid or the user has no TOTP authenticator.
    """
    from allauth.mfa.models import Authenticator
    from allauth.mfa.totp.internal import auth as totp_auth

    user = request.user

    # Only accept string input: a JSON null or number used to crash on
    # .strip() and surface as a 500 instead of a validation error.
    raw_code = request.data.get("code")
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    if not code:
        return Response(
            {"detail": "Verification code is required"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Keep the try body limited to the lookup that can raise DoesNotExist.
    try:
        authenticator = Authenticator.objects.get(
            user=user, type=Authenticator.Type.TOTP
        )
    except Authenticator.DoesNotExist:
        return Response(
            {"detail": "TOTP is not enabled"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    secret = authenticator.data.get("secret")
    if not totp_auth.validate_totp_code(secret, code):
        return Response(
            {"detail": "Invalid verification code"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    return Response({"success": True})
@extend_schema(
    operation_id="regenerate_recovery_codes",
    summary="Regenerate recovery codes",
    description="Generates new recovery codes (invalidates old ones).",
    request={
        "application/json": {
            "type": "object",
            "properties": {"password": {"type": "string", "description": "Current password"}},
            "required": ["password"],
        }
    },
    responses={
        200: {
            "description": "New recovery codes",
            "example": {"success": True, "recovery_codes": ["ABCD1234", "EFGH5678"]},
        },
        400: {"description": "Invalid password or MFA not enabled"},
    },
    tags=["MFA"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def regenerate_recovery_codes(request):
    """Replace the current user's recovery codes with a fresh set.

    Request body: ``password`` (required).

    Returns 200 with ``success`` and the new ``recovery_codes``, or 400 when
    the password is wrong or the user has no TOTP authenticator.
    """
    from allauth.mfa.models import Authenticator
    from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes
    from django.db import transaction

    user = request.user
    password = request.data.get("password", "")

    # Verify password
    if not user.check_password(password):
        return Response(
            {"detail": "Invalid password"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Check if TOTP is enabled
    if not Authenticator.objects.filter(user=user, type=Authenticator.Type.TOTP).exists():
        return Response(
            {"detail": "Two-factor authentication is not enabled"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Replace the codes atomically: if generating the new set fails, the
    # deletion is rolled back and the user keeps their existing codes.
    with transaction.atomic():
        # Delete existing recovery codes first (so activate creates new ones)
        Authenticator.objects.filter(
            user=user, type=Authenticator.Type.RECOVERY_CODES
        ).delete()
        # Generate new recovery codes using allauth's RecoveryCodes API
        recovery_instance = RecoveryCodes.activate(user)
        codes = recovery_instance.get_unused_codes()

    return Response(
        {
            "success": True,
            "recovery_codes": codes,
        }
    )