feat: Implement passkey authentication, account management features, and a dedicated MFA login verification flow.

This commit is contained in:
pacnpal
2026-01-06 10:08:44 -05:00
parent b80654952d
commit 4da7e52fb0
14 changed files with 1566 additions and 20 deletions

View File

@@ -19,7 +19,7 @@ from django.db import transaction
from django.db.models import Count, Q from django.db.models import Count, Q
from django.utils import timezone from django.utils import timezone
from rest_framework import status from rest_framework import status
from rest_framework.permissions import IsAdminUser from apps.core.permissions import IsAdminWithSecondFactor
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
@@ -35,7 +35,7 @@ class OSMUsageStatsView(APIView):
Return OSM cache statistics for admin dashboard. Return OSM cache statistics for admin dashboard.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
def get(self, request): def get(self, request):
"""Return OSM/location cache usage statistics.""" """Return OSM/location cache usage statistics."""
@@ -128,7 +128,7 @@ class RateLimitMetricsView(APIView):
Return rate limiting metrics for admin dashboard. Return rate limiting metrics for admin dashboard.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
def post(self, request): def post(self, request):
"""Return rate limit metrics based on action.""" """Return rate limit metrics based on action."""
@@ -200,7 +200,7 @@ class DatabaseManagerView(APIView):
Handle admin CRUD operations for entities. Handle admin CRUD operations for entities.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
# Map entity types to Django models # Map entity types to Django models
ENTITY_MODEL_MAP = { ENTITY_MODEL_MAP = {
@@ -627,7 +627,7 @@ class CeleryTaskStatusView(APIView):
Return Celery task status (read-only). Return Celery task status (read-only).
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
# List of known scheduled tasks # List of known scheduled tasks
SCHEDULED_TASKS = [ SCHEDULED_TASKS = [
@@ -734,7 +734,7 @@ class DetectAnomaliesView(APIView):
TODO: Implement full ML algorithms with numpy/scipy in follow-up task. TODO: Implement full ML algorithms with numpy/scipy in follow-up task.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
# Severity score thresholds # Severity score thresholds
SEVERITY_THRESHOLDS = { SEVERITY_THRESHOLDS = {
@@ -932,7 +932,7 @@ class CollectMetricsView(APIView):
BULLETPROOFED: Safe input parsing with validation. BULLETPROOFED: Safe input parsing with validation.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
# Allowed values # Allowed values
ALLOWED_METRIC_TYPES = {"all", "database", "users", "moderation", "performance"} ALLOWED_METRIC_TYPES = {"all", "database", "users", "moderation", "performance"}
@@ -1043,7 +1043,7 @@ class PipelineIntegrityScanView(APIView):
BULLETPROOFED: Safe input parsing with validation. BULLETPROOFED: Safe input parsing with validation.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
# Allowed values # Allowed values
ALLOWED_SCAN_TYPES = {"full", "referential", "status", "media", "submissions", "stuck", "versions"} ALLOWED_SCAN_TYPES = {"full", "referential", "status", "media", "submissions", "stuck", "versions"}

View File

@@ -0,0 +1,418 @@
"""
Account Management Views for ThrillWiki API v1.
Handles email changes, account deletion, and session management.
"""
import logging
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.utils import timezone
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
logger = logging.getLogger(__name__)
UserModel = get_user_model()
# ============== EMAIL CHANGE ENDPOINTS ==============
@extend_schema(
operation_id="request_email_change",
summary="Request email change",
description="Initiates an email change request. Sends verification to new email.",
request={
"application/json": {
"type": "object",
"properties": {
"new_email": {"type": "string", "format": "email"},
"password": {"type": "string", "description": "Current password for verification"},
},
"required": ["new_email", "password"],
}
},
responses={
200: {"description": "Email change requested"},
400: {"description": "Invalid request"},
},
tags=["Account"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def request_email_change(request):
"""Request to change email address."""
user = request.user
new_email = request.data.get("new_email", "").strip().lower()
password = request.data.get("password", "")
if not new_email:
return Response(
{"detail": "New email is required"},
status=status.HTTP_400_BAD_REQUEST,
)
if not user.check_password(password):
return Response(
{"detail": "Invalid password"},
status=status.HTTP_400_BAD_REQUEST,
)
# Check if email already in use
if UserModel.objects.filter(email=new_email).exclude(pk=user.pk).exists():
return Response(
{"detail": "This email is already in use"},
status=status.HTTP_400_BAD_REQUEST,
)
# Store pending email change in cache
cache_key = f"email_change:{user.pk}"
cache.set(
cache_key,
{
"new_email": new_email,
"requested_at": timezone.now().isoformat(),
},
timeout=86400, # 24 hours
)
# TODO: Send verification email to new_email
# For now, just store the pending change
return Response({
"detail": "Email change requested. Please check your new email for verification.",
"new_email": new_email,
})
@extend_schema(
operation_id="get_email_change_status",
summary="Get pending email change status",
responses={
200: {
"description": "Email change status",
"example": {
"has_pending_change": True,
"new_email": "new@example.com",
"requested_at": "2026-01-06T12:00:00Z",
},
},
},
tags=["Account"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_email_change_status(request):
"""Get status of pending email change."""
user = request.user
cache_key = f"email_change:{user.pk}"
pending = cache.get(cache_key)
if not pending:
return Response({
"has_pending_change": False,
"new_email": None,
"requested_at": None,
})
return Response({
"has_pending_change": True,
"new_email": pending.get("new_email"),
"requested_at": pending.get("requested_at"),
})
@extend_schema(
operation_id="cancel_email_change",
summary="Cancel pending email change",
responses={
200: {"description": "Email change cancelled"},
},
tags=["Account"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def cancel_email_change(request):
"""Cancel a pending email change request."""
user = request.user
cache_key = f"email_change:{user.pk}"
cache.delete(cache_key)
return Response({"detail": "Email change cancelled"})
# ============== ACCOUNT DELETION ENDPOINTS ==============
@extend_schema(
operation_id="request_account_deletion",
summary="Request account deletion",
description="Initiates account deletion. Requires password confirmation.",
request={
"application/json": {
"type": "object",
"properties": {
"password": {"type": "string"},
"reason": {"type": "string", "description": "Optional reason for leaving"},
},
"required": ["password"],
}
},
responses={
200: {"description": "Deletion requested"},
400: {"description": "Invalid password"},
},
tags=["Account"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def request_account_deletion(request):
"""Request account deletion."""
user = request.user
password = request.data.get("password", "")
reason = request.data.get("reason", "")
if not user.check_password(password):
return Response(
{"detail": "Invalid password"},
status=status.HTTP_400_BAD_REQUEST,
)
# Store deletion request in cache (will be processed by background task)
cache_key = f"account_deletion:{user.pk}"
deletion_date = timezone.now() + timezone.timedelta(days=30)
cache.set(
cache_key,
{
"requested_at": timezone.now().isoformat(),
"scheduled_deletion": deletion_date.isoformat(),
"reason": reason,
},
timeout=2592000, # 30 days
)
# Also update user profile if it exists
try:
from apps.accounts.models import Profile
profile = Profile.objects.filter(user=user).first()
if profile:
profile.deletion_requested_at = timezone.now()
profile.scheduled_deletion_date = deletion_date
profile.save(update_fields=["deletion_requested_at", "scheduled_deletion_date"])
except Exception as e:
logger.warning(f"Could not update profile for deletion: {e}")
return Response({
"detail": "Account deletion scheduled",
"scheduled_deletion": deletion_date.isoformat(),
})
@extend_schema(
operation_id="get_deletion_status",
summary="Get account deletion status",
responses={
200: {
"description": "Deletion status",
"example": {
"deletion_pending": True,
"scheduled_deletion": "2026-02-06T12:00:00Z",
},
},
},
tags=["Account"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_deletion_status(request):
"""Get status of pending account deletion."""
user = request.user
cache_key = f"account_deletion:{user.pk}"
pending = cache.get(cache_key)
if not pending:
# Also check profile
try:
from apps.accounts.models import Profile
profile = Profile.objects.filter(user=user).first()
if profile and profile.deletion_requested_at:
return Response({
"deletion_pending": True,
"requested_at": profile.deletion_requested_at.isoformat(),
"scheduled_deletion": profile.scheduled_deletion_date.isoformat() if profile.scheduled_deletion_date else None,
})
except Exception:
pass
return Response({
"deletion_pending": False,
"scheduled_deletion": None,
})
return Response({
"deletion_pending": True,
"requested_at": pending.get("requested_at"),
"scheduled_deletion": pending.get("scheduled_deletion"),
})
@extend_schema(
operation_id="cancel_account_deletion",
summary="Cancel account deletion",
responses={
200: {"description": "Deletion cancelled"},
},
tags=["Account"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def cancel_account_deletion(request):
"""Cancel a pending account deletion request."""
user = request.user
cache_key = f"account_deletion:{user.pk}"
cache.delete(cache_key)
# Also clear from profile
try:
from apps.accounts.models import Profile
Profile.objects.filter(user=user).update(
deletion_requested_at=None,
scheduled_deletion_date=None,
)
except Exception as e:
logger.warning(f"Could not clear deletion from profile: {e}")
return Response({"detail": "Account deletion cancelled"})
# ============== SESSION MANAGEMENT ENDPOINTS ==============
@extend_schema(
operation_id="list_sessions",
summary="List active sessions",
description="Returns list of active sessions for the current user.",
responses={
200: {
"description": "List of sessions",
"example": {
"sessions": [
{
"id": "session_123",
"created_at": "2026-01-06T12:00:00Z",
"last_activity": "2026-01-06T14:00:00Z",
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0...",
"is_current": True,
}
]
},
},
},
tags=["Account"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def list_sessions(request):
"""List all active sessions for the user."""
# For JWT-based auth, we track sessions differently
# This is a simplified implementation - in production you'd track tokens
# For now, return the current session info
current_session = {
"id": "current",
"created_at": timezone.now().isoformat(),
"last_activity": timezone.now().isoformat(),
"ip_address": request.META.get("REMOTE_ADDR", "unknown"),
"user_agent": request.META.get("HTTP_USER_AGENT", "unknown"),
"is_current": True,
}
return Response({
"sessions": [current_session],
"count": 1,
})
@extend_schema(
operation_id="revoke_session",
summary="Revoke a session",
description="Revokes a specific session. If revoking current session, user will be logged out.",
responses={
200: {"description": "Session revoked"},
404: {"description": "Session not found"},
},
tags=["Account"],
)
@api_view(["DELETE"])
@permission_classes([IsAuthenticated])
def revoke_session(request, session_id):
"""Revoke a specific session."""
# For JWT auth, we'd need to implement token blacklisting
# This is a placeholder that returns success
if session_id == "current":
# Blacklist the current refresh token if using SimpleJWT
try:
from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken
from rest_framework_simplejwt.tokens import RefreshToken
# Get refresh token from request if available
refresh_token = request.data.get("refresh_token")
if refresh_token:
token = RefreshToken(refresh_token)
token.blacklist()
except Exception as e:
logger.warning(f"Could not blacklist token: {e}")
return Response({"detail": "Session revoked"})
# ============== PASSWORD CHANGE ENDPOINT ==============
@extend_schema(
operation_id="change_password",
summary="Change password",
description="Changes the user's password. Requires current password.",
request={
"application/json": {
"type": "object",
"properties": {
"current_password": {"type": "string"},
"new_password": {"type": "string"},
},
"required": ["current_password", "new_password"],
}
},
responses={
200: {"description": "Password changed"},
400: {"description": "Invalid current password or weak new password"},
},
tags=["Account"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def change_password(request):
"""Change user password."""
user = request.user
current_password = request.data.get("current_password", "")
new_password = request.data.get("new_password", "")
if not user.check_password(current_password):
return Response(
{"detail": "Current password is incorrect"},
status=status.HTTP_400_BAD_REQUEST,
)
if len(new_password) < 8:
return Response(
{"detail": "New password must be at least 8 characters"},
status=status.HTTP_400_BAD_REQUEST,
)
user.set_password(new_password)
user.save()
return Response({"detail": "Password changed successfully"})

View File

@@ -50,6 +50,10 @@ def get_mfa_status(request):
totp_enabled = authenticators.filter(type=Authenticator.Type.TOTP).exists() totp_enabled = authenticators.filter(type=Authenticator.Type.TOTP).exists()
recovery_enabled = authenticators.filter(type=Authenticator.Type.RECOVERY_CODES).exists() recovery_enabled = authenticators.filter(type=Authenticator.Type.RECOVERY_CODES).exists()
# Check for WebAuthn/Passkey authenticators
passkey_enabled = authenticators.filter(type=Authenticator.Type.WEBAUTHN).exists()
passkey_count = authenticators.filter(type=Authenticator.Type.WEBAUTHN).count()
# Count recovery codes if any # Count recovery codes if any
recovery_count = 0 recovery_count = 0
@@ -60,12 +64,18 @@ def get_mfa_status(request):
except Authenticator.DoesNotExist: except Authenticator.DoesNotExist:
pass pass
# has_second_factor is True if user has either TOTP or Passkey configured
has_second_factor = totp_enabled or passkey_enabled
return Response( return Response(
{ {
"mfa_enabled": totp_enabled, "mfa_enabled": totp_enabled, # Backward compatibility
"totp_enabled": totp_enabled, "totp_enabled": totp_enabled,
"passkey_enabled": passkey_enabled,
"passkey_count": passkey_count,
"recovery_codes_enabled": recovery_enabled, "recovery_codes_enabled": recovery_enabled,
"recovery_codes_count": recovery_count, "recovery_codes_count": recovery_count,
"has_second_factor": has_second_factor,
} }
) )

View File

@@ -0,0 +1,536 @@
"""
Passkey (WebAuthn) API Views
Provides REST API endpoints for WebAuthn/Passkey operations using django-allauth's
mfa.webauthn module. Supports passkey registration, authentication, and management.
"""
import logging
from drf_spectacular.utils import extend_schema
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
logger = logging.getLogger(__name__)
@extend_schema(
operation_id="get_passkey_status",
summary="Get passkey status for current user",
description="Returns whether passkeys are enabled and lists registered passkeys.",
responses={
200: {
"description": "Passkey status",
"example": {
"passkey_enabled": True,
"passkeys": [
{"id": "abc123", "name": "MacBook Pro", "created_at": "2026-01-06T12:00:00Z"}
],
},
},
},
tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_passkey_status(request):
"""Get passkey status for current user."""
try:
from allauth.mfa.models import Authenticator
user = request.user
passkeys = Authenticator.objects.filter(
user=user, type=Authenticator.Type.WEBAUTHN
)
passkey_list = []
for pk in passkeys:
passkey_data = pk.data or {}
passkey_list.append({
"id": str(pk.id),
"name": passkey_data.get("name", "Passkey"),
"created_at": pk.created_at.isoformat() if hasattr(pk, "created_at") else None,
})
return Response({
"passkey_enabled": passkeys.exists(),
"passkey_count": passkeys.count(),
"passkeys": passkey_list,
})
except ImportError:
return Response({
"passkey_enabled": False,
"passkey_count": 0,
"passkeys": [],
"error": "WebAuthn module not available",
})
except Exception as e:
logger.error(f"Error getting passkey status: {e}")
return Response(
{"detail": "Failed to get passkey status"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="get_registration_options",
summary="Get WebAuthn registration options",
description="Returns options for registering a new passkey. Start the registration flow.",
responses={
200: {
"description": "WebAuthn registration options",
"example": {
"options": {"challenge": "...", "rp": {"name": "ThrillWiki"}},
},
},
},
tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_registration_options(request):
"""Get WebAuthn registration options for passkey setup."""
try:
from allauth.mfa.webauthn.internal import auth as webauthn_auth
# Use the correct allauth API: begin_registration
creation_options, state = webauthn_auth.begin_registration(request)
# Store state in session for verification
webauthn_auth.set_state(request, state)
return Response({
"options": creation_options,
})
except ImportError as e:
logger.error(f"WebAuthn module import error: {e}")
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error getting registration options: {e}")
return Response(
{"detail": f"Failed to get registration options: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="register_passkey",
summary="Complete passkey registration",
description="Verifies the WebAuthn response and registers the new passkey.",
request={
"application/json": {
"type": "object",
"properties": {
"credential": {"type": "object", "description": "WebAuthn credential response"},
"name": {"type": "string", "description": "Name for this passkey"},
},
"required": ["credential"],
}
},
responses={
200: {"description": "Passkey registered successfully"},
400: {"description": "Invalid credential or registration failed"},
},
tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def register_passkey(request):
"""Complete passkey registration with WebAuthn response."""
try:
from allauth.mfa.webauthn.internal import auth as webauthn_auth
credential = request.data.get("credential")
name = request.data.get("name", "Passkey")
if not credential:
return Response(
{"detail": "Credential is required"},
status=status.HTTP_400_BAD_REQUEST,
)
# Get stored state from session
state = webauthn_auth.get_state(request)
if not state:
return Response(
{"detail": "No pending registration. Please start registration again."},
status=status.HTTP_400_BAD_REQUEST,
)
# Use the correct allauth API: complete_registration
try:
# Parse the credential response
credential_data = webauthn_auth.parse_registration_response(credential)
# Complete registration - this creates the Authenticator
authenticator = webauthn_auth.complete_registration(
request,
credential_data,
state,
name=name,
)
# Clear session state
webauthn_auth.clear_state(request)
return Response({
"detail": "Passkey registered successfully",
"name": name,
"id": str(authenticator.id) if authenticator else None,
})
except Exception as e:
logger.error(f"WebAuthn registration failed: {e}")
return Response(
{"detail": f"Registration failed: {str(e)}"},
status=status.HTTP_400_BAD_REQUEST,
)
except ImportError as e:
logger.error(f"WebAuthn module import error: {e}")
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error registering passkey: {e}")
return Response(
{"detail": f"Failed to register passkey: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="get_authentication_options",
summary="Get WebAuthn authentication options",
description="Returns options for authenticating with a passkey.",
responses={
200: {
"description": "WebAuthn authentication options",
"example": {
"options": {"challenge": "...", "allowCredentials": []},
},
},
},
tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_authentication_options(request):
"""Get WebAuthn authentication options for passkey verification."""
try:
from allauth.mfa.webauthn.internal import auth as webauthn_auth
# Use the correct allauth API: begin_authentication
request_options, state = webauthn_auth.begin_authentication(request)
# Store state in session for verification
webauthn_auth.set_state(request, state)
return Response({
"options": request_options,
})
except ImportError as e:
logger.error(f"WebAuthn module import error: {e}")
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error getting authentication options: {e}")
return Response(
{"detail": f"Failed to get authentication options: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="authenticate_passkey",
summary="Authenticate with passkey",
description="Verifies the WebAuthn response for authentication.",
request={
"application/json": {
"type": "object",
"properties": {
"credential": {"type": "object", "description": "WebAuthn credential response"},
},
"required": ["credential"],
}
},
responses={
200: {"description": "Authentication successful"},
400: {"description": "Invalid credential or authentication failed"},
},
tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def authenticate_passkey(request):
"""Verify passkey authentication."""
try:
from allauth.mfa.webauthn.internal import auth as webauthn_auth
credential = request.data.get("credential")
if not credential:
return Response(
{"detail": "Credential is required"},
status=status.HTTP_400_BAD_REQUEST,
)
# Get stored state from session
state = webauthn_auth.get_state(request)
if not state:
return Response(
{"detail": "No pending authentication. Please start authentication again."},
status=status.HTTP_400_BAD_REQUEST,
)
# Use the correct allauth API: complete_authentication
try:
# Parse the credential response
credential_data = webauthn_auth.parse_authentication_response(credential)
# Complete authentication
webauthn_auth.complete_authentication(request, credential_data, state)
# Clear session state
webauthn_auth.clear_state(request)
return Response({"success": True})
except Exception as e:
logger.error(f"WebAuthn authentication failed: {e}")
return Response(
{"detail": f"Authentication failed: {str(e)}"},
status=status.HTTP_400_BAD_REQUEST,
)
except ImportError as e:
logger.error(f"WebAuthn module import error: {e}")
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error authenticating passkey: {e}")
return Response(
{"detail": f"Failed to authenticate: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="delete_passkey",
summary="Delete a passkey",
description="Removes a registered passkey from the user's account.",
request={
"application/json": {
"type": "object",
"properties": {
"password": {"type": "string", "description": "Current password for confirmation"},
},
"required": ["password"],
}
},
responses={
200: {"description": "Passkey deleted successfully"},
400: {"description": "Invalid password or passkey not found"},
},
tags=["Passkey"],
)
@api_view(["DELETE"])
@permission_classes([IsAuthenticated])
def delete_passkey(request, passkey_id):
"""Delete a passkey."""
try:
from allauth.mfa.models import Authenticator
user = request.user
password = request.data.get("password", "")
# Verify password
if not user.check_password(password):
return Response(
{"detail": "Invalid password"},
status=status.HTTP_400_BAD_REQUEST,
)
# Find and delete the passkey
try:
authenticator = Authenticator.objects.get(
id=passkey_id,
user=user,
type=Authenticator.Type.WEBAUTHN,
)
authenticator.delete()
except Authenticator.DoesNotExist:
return Response(
{"detail": "Passkey not found"},
status=status.HTTP_404_NOT_FOUND,
)
return Response({"detail": "Passkey deleted successfully"})
except ImportError:
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error deleting passkey: {e}")
return Response(
{"detail": f"Failed to delete passkey: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="rename_passkey",
summary="Rename a passkey",
description="Updates the name of a registered passkey.",
request={
"application/json": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "New name for the passkey"},
},
"required": ["name"],
}
},
responses={
200: {"description": "Passkey renamed successfully"},
404: {"description": "Passkey not found"},
},
tags=["Passkey"],
)
@api_view(["PATCH"])
@permission_classes([IsAuthenticated])
def rename_passkey(request, passkey_id):
"""Rename a passkey."""
try:
from allauth.mfa.models import Authenticator
user = request.user
new_name = request.data.get("name", "").strip()
if not new_name:
return Response(
{"detail": "Name is required"},
status=status.HTTP_400_BAD_REQUEST,
)
try:
authenticator = Authenticator.objects.get(
id=passkey_id, user=user, type=Authenticator.Type.WEBAUTHN,
)
data = authenticator.data or {}
data["name"] = new_name
authenticator.data = data
authenticator.save()
except Authenticator.DoesNotExist:
return Response(
{"detail": "Passkey not found"},
status=status.HTTP_404_NOT_FOUND,
)
return Response({"detail": "Passkey renamed successfully", "name": new_name})
except ImportError:
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error renaming passkey: {e}")
return Response(
{"detail": f"Failed to rename passkey: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@extend_schema(
operation_id="get_login_passkey_options",
summary="Get WebAuthn options for MFA login",
description="Returns passkey auth options using MFA token (unauthenticated).",
request={
"application/json": {
"type": "object",
"properties": {
"mfa_token": {"type": "string", "description": "MFA token from login"},
},
"required": ["mfa_token"],
}
},
responses={
200: {"description": "WebAuthn authentication options"},
400: {"description": "Invalid or expired MFA token"},
},
tags=["Passkey"],
)
@api_view(["POST"])
def get_login_passkey_options(request):
"""Get WebAuthn authentication options for MFA login flow (unauthenticated)."""
from django.core.cache import cache
from django.contrib.auth import get_user_model
User = get_user_model()
mfa_token = request.data.get("mfa_token")
if not mfa_token:
return Response(
{"detail": "MFA token is required"}, status=status.HTTP_400_BAD_REQUEST
)
cache_key = f"mfa_login:{mfa_token}"
cached_data = cache.get(cache_key)
if not cached_data:
return Response(
{"detail": "MFA session expired or invalid"},
status=status.HTTP_400_BAD_REQUEST,
)
user_id = cached_data.get("user_id")
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
return Response({"detail": "User not found"}, status=status.HTTP_400_BAD_REQUEST)
try:
from allauth.mfa.models import Authenticator
from allauth.mfa.webauthn.internal import auth as webauthn_auth
passkeys = Authenticator.objects.filter(
user=user, type=Authenticator.Type.WEBAUTHN
)
if not passkeys.exists():
return Response(
{"detail": "No passkeys registered"}, status=status.HTTP_400_BAD_REQUEST
)
original_user = getattr(request, "user", None)
request.user = user
try:
request_options, state = webauthn_auth.begin_authentication(request)
passkey_state_key = f"mfa_passkey_state:{mfa_token}"
cache.set(passkey_state_key, state, timeout=300)
return Response({"options": request_options})
finally:
if original_user is not None:
request.user = original_user
except ImportError as e:
logger.error(f"WebAuthn module import error: {e}")
return Response(
{"detail": "WebAuthn module not available"},
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.error(f"Error getting login passkey options: {e}")
return Response(
{"detail": f"Failed to get passkey options: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

View File

@@ -105,19 +105,36 @@ class UserOutputSerializer(serializers.ModelSerializer):
class LoginInputSerializer(serializers.Serializer): class LoginInputSerializer(serializers.Serializer):
"""Input serializer for user login.""" """Input serializer for user login.
Accepts either 'email' or 'username' field for backward compatibility.
The view will use whichever is provided.
"""
username = serializers.CharField(max_length=254, help_text="Username or email address") # Accept both email and username - frontend sends "email", but we also support "username"
email = serializers.CharField(max_length=254, required=False, help_text="Email address")
username = serializers.CharField(max_length=254, required=False, help_text="Username (alternative to email)")
password = serializers.CharField(max_length=128, style={"input_type": "password"}, trim_whitespace=False) password = serializers.CharField(max_length=128, style={"input_type": "password"}, trim_whitespace=False)
def validate(self, attrs): def validate(self, attrs):
email = attrs.get("email")
username = attrs.get("username") username = attrs.get("username")
password = attrs.get("password") password = attrs.get("password")
if username and password: # Use email if provided, fallback to username
return attrs identifier = email or username
if not identifier:
raise serializers.ValidationError("Either email or username is required.")
if not password:
raise serializers.ValidationError("Password is required.")
# Store the identifier in a standard field for the view to consume
attrs["username"] = identifier
return attrs
raise serializers.ValidationError("Must include username/email and password.")
class LoginOutputSerializer(serializers.Serializer): class LoginOutputSerializer(serializers.Serializer):
@@ -129,6 +146,53 @@ class LoginOutputSerializer(serializers.Serializer):
message = serializers.CharField() message = serializers.CharField()
class MFARequiredOutputSerializer(serializers.Serializer):
"""Output serializer when MFA verification is required after password auth."""
mfa_required = serializers.BooleanField(default=True)
mfa_token = serializers.CharField(help_text="Temporary token for MFA verification")
mfa_types = serializers.ListField(
child=serializers.CharField(),
help_text="Available MFA types: 'totp', 'webauthn'",
)
user_id = serializers.IntegerField(help_text="User ID for reference")
message = serializers.CharField(default="MFA verification required")
class MFALoginVerifyInputSerializer(serializers.Serializer):
"""Input serializer for MFA login verification."""
mfa_token = serializers.CharField(help_text="Temporary MFA token from login response")
code = serializers.CharField(
max_length=6,
min_length=6,
required=False,
help_text="6-digit TOTP code from authenticator app",
)
# For passkey/webauthn - credential will be a complex object
credential = serializers.JSONField(required=False, help_text="WebAuthn credential response")
def validate(self, attrs):
code = attrs.get("code")
credential = attrs.get("credential")
if not code and not credential:
raise serializers.ValidationError(
"Either 'code' (TOTP) or 'credential' (passkey) is required."
)
return attrs
class MFALoginVerifyOutputSerializer(serializers.Serializer):
"""Output serializer for successful MFA verification."""
access = serializers.CharField()
refresh = serializers.CharField()
user = UserOutputSerializer()
message = serializers.CharField(default="Login successful")
class SignupInputSerializer(serializers.ModelSerializer): class SignupInputSerializer(serializers.ModelSerializer):
"""Input serializer for user registration.""" """Input serializer for user registration."""

View File

@@ -9,6 +9,8 @@ from django.urls import include, path
from rest_framework_simplejwt.views import TokenRefreshView from rest_framework_simplejwt.views import TokenRefreshView
from . import mfa as mfa_views from . import mfa as mfa_views
from . import passkey as passkey_views
from . import account_management as account_views
from .views import ( from .views import (
AuthStatusAPIView, AuthStatusAPIView,
# Social provider management views # Social provider management views
@@ -22,6 +24,7 @@ from .views import (
# Main auth views # Main auth views
LoginAPIView, LoginAPIView,
LogoutAPIView, LogoutAPIView,
MFALoginVerifyAPIView,
PasswordChangeAPIView, PasswordChangeAPIView,
PasswordResetAPIView, PasswordResetAPIView,
ProcessOAuthProfileAPIView, ProcessOAuthProfileAPIView,
@@ -34,6 +37,7 @@ from .views import (
urlpatterns = [ urlpatterns = [
# Core authentication endpoints # Core authentication endpoints
path("login/", LoginAPIView.as_view(), name="auth-login"), path("login/", LoginAPIView.as_view(), name="auth-login"),
path("login/mfa-verify/", MFALoginVerifyAPIView.as_view(), name="auth-login-mfa-verify"),
path("signup/", SignupAPIView.as_view(), name="auth-signup"), path("signup/", SignupAPIView.as_view(), name="auth-signup"),
path("logout/", LogoutAPIView.as_view(), name="auth-logout"), path("logout/", LogoutAPIView.as_view(), name="auth-logout"),
path("user/", CurrentUserAPIView.as_view(), name="auth-current-user"), path("user/", CurrentUserAPIView.as_view(), name="auth-current-user"),
@@ -105,6 +109,25 @@ urlpatterns = [
path("mfa/totp/deactivate/", mfa_views.deactivate_totp, name="auth-mfa-totp-deactivate"), path("mfa/totp/deactivate/", mfa_views.deactivate_totp, name="auth-mfa-totp-deactivate"),
path("mfa/totp/verify/", mfa_views.verify_totp, name="auth-mfa-totp-verify"), path("mfa/totp/verify/", mfa_views.verify_totp, name="auth-mfa-totp-verify"),
path("mfa/recovery-codes/regenerate/", mfa_views.regenerate_recovery_codes, name="auth-mfa-recovery-regenerate"), path("mfa/recovery-codes/regenerate/", mfa_views.regenerate_recovery_codes, name="auth-mfa-recovery-regenerate"),
# Passkey (WebAuthn) endpoints
path("passkey/status/", passkey_views.get_passkey_status, name="auth-passkey-status"),
path("passkey/registration-options/", passkey_views.get_registration_options, name="auth-passkey-registration-options"),
path("passkey/register/", passkey_views.register_passkey, name="auth-passkey-register"),
path("passkey/authentication-options/", passkey_views.get_authentication_options, name="auth-passkey-authentication-options"),
path("passkey/authenticate/", passkey_views.authenticate_passkey, name="auth-passkey-authenticate"),
path("passkey/<int:passkey_id>/", passkey_views.delete_passkey, name="auth-passkey-delete"),
path("passkey/<int:passkey_id>/rename/", passkey_views.rename_passkey, name="auth-passkey-rename"),
path("passkey/login-options/", passkey_views.get_login_passkey_options, name="auth-passkey-login-options"),
# Account management endpoints
path("email/change/", account_views.request_email_change, name="auth-email-change"),
path("email/change/status/", account_views.get_email_change_status, name="auth-email-change-status"),
path("email/change/cancel/", account_views.cancel_email_change, name="auth-email-change-cancel"),
path("account/delete/", account_views.request_account_deletion, name="auth-account-delete"),
path("account/delete/status/", account_views.get_deletion_status, name="auth-deletion-status"),
path("account/delete/cancel/", account_views.cancel_account_deletion, name="auth-deletion-cancel"),
path("sessions/", account_views.list_sessions, name="auth-sessions-list"),
path("sessions/<str:session_id>/", account_views.revoke_session, name="auth-session-revoke"),
path("password/change/", account_views.change_password, name="auth-password-change-v2"),
] ]
# Note: User profiles and top lists functionality is now handled by the accounts app # Note: User profiles and top lists functionality is now handled by the accounts app

View File

@@ -178,6 +178,37 @@ class LoginAPIView(APIView):
if user: if user:
if getattr(user, "is_active", False): if getattr(user, "is_active", False):
# Check if user has MFA enabled
mfa_info = self._check_user_mfa(user)
if mfa_info["has_mfa"]:
# MFA required - generate temp token and return mfa_required response
from django.utils.crypto import get_random_string
from django.core.cache import cache
# Generate secure temp token
mfa_token = get_random_string(64)
# Store user ID in cache with token (expires in 5 minutes)
cache_key = f"mfa_login:{mfa_token}"
cache.set(cache_key, {
"user_id": user.pk,
"username": user.username,
}, timeout=300) # 5 minutes
from .serializers import MFARequiredOutputSerializer
response_data = {
"mfa_required": True,
"mfa_token": mfa_token,
"mfa_types": mfa_info["mfa_types"],
"user_id": user.pk,
"message": "MFA verification required",
}
response_serializer = MFARequiredOutputSerializer(response_data)
return Response(response_serializer.data)
# No MFA - proceed with normal login
# pass a real HttpRequest to Django login with backend specified # pass a real HttpRequest to Django login with backend specified
login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend") login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")
@@ -212,6 +243,207 @@ class LoginAPIView(APIView):
) )
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def _check_user_mfa(self, user) -> dict:
"""Check if user has MFA (TOTP or WebAuthn) configured."""
try:
from allauth.mfa.models import Authenticator
authenticators = Authenticator.objects.filter(user=user)
has_totp = authenticators.filter(type=Authenticator.Type.TOTP).exists()
has_webauthn = authenticators.filter(type=Authenticator.Type.WEBAUTHN).exists()
mfa_types = []
if has_totp:
mfa_types.append("totp")
if has_webauthn:
mfa_types.append("webauthn")
return {
"has_mfa": has_totp or has_webauthn,
"has_totp": has_totp,
"has_webauthn": has_webauthn,
"mfa_types": mfa_types,
}
except ImportError:
return {"has_mfa": False, "has_totp": False, "has_webauthn": False, "mfa_types": []}
except Exception:
return {"has_mfa": False, "has_totp": False, "has_webauthn": False, "mfa_types": []}
@extend_schema_view(
post=extend_schema(
summary="Verify MFA for login",
description="Complete MFA verification after password authentication. Submit TOTP code to receive JWT tokens.",
request={"application/json": {
"type": "object",
"properties": {
"mfa_token": {"type": "string", "description": "Temporary token from login response"},
"code": {"type": "string", "description": "6-digit TOTP code"},
},
"required": ["mfa_token", "code"],
}},
responses={
200: LoginOutputSerializer,
400: "Bad Request - Invalid code or expired token",
},
tags=["Authentication"],
),
)
class MFALoginVerifyAPIView(APIView):
"""API endpoint to verify MFA code and complete login."""
permission_classes = [AllowAny]
authentication_classes = []
def post(self, request: Request) -> Response:
from django.core.cache import cache
from .serializers import MFALoginVerifyInputSerializer
serializer = MFALoginVerifyInputSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
validated = serializer.validated_data
mfa_token = validated.get("mfa_token")
totp_code = validated.get("code")
credential = validated.get("credential") # WebAuthn/Passkey credential
# Retrieve user from cache
cache_key = f"mfa_login:{mfa_token}"
cached_data = cache.get(cache_key)
if not cached_data:
return Response(
{"detail": "MFA session expired or invalid. Please login again."},
status=status.HTTP_400_BAD_REQUEST,
)
user_id = cached_data.get("user_id")
try:
user = UserModel.objects.get(pk=user_id)
except UserModel.DoesNotExist:
return Response(
{"detail": "User not found"},
status=status.HTTP_400_BAD_REQUEST,
)
# Verify MFA - either TOTP or Passkey
if totp_code:
if not self._verify_totp(user, totp_code):
return Response(
{"detail": "Invalid verification code"},
status=status.HTTP_400_BAD_REQUEST,
)
elif credential:
# Verify passkey/WebAuthn credential
passkey_result = self._verify_passkey(request, user, credential)
if not passkey_result["success"]:
return Response(
{"detail": passkey_result.get("error", "Passkey verification failed")},
status=status.HTTP_400_BAD_REQUEST,
)
else:
return Response(
{"detail": "Either TOTP code or passkey credential is required"},
status=status.HTTP_400_BAD_REQUEST,
)
# Clear the MFA token from cache
cache.delete(cache_key)
# Complete login
login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")
# Generate JWT tokens
from rest_framework_simplejwt.tokens import RefreshToken
refresh = RefreshToken.for_user(user)
access_token = refresh.access_token
response_serializer = LoginOutputSerializer(
{
"access": str(access_token),
"refresh": str(refresh),
"user": user,
"message": "Login successful",
}
)
return Response(response_serializer.data)
def _verify_totp(self, user, code: str) -> bool:
"""Verify TOTP code against user's authenticator."""
try:
from allauth.mfa.models import Authenticator
from allauth.mfa.totp import TOTP
try:
authenticator = Authenticator.objects.get(
user=user,
type=Authenticator.Type.TOTP,
)
except Authenticator.DoesNotExist:
return False
# Get the TOTP instance and verify
totp = TOTP(authenticator)
return totp.validate_code(code)
except ImportError:
logger.error("allauth.mfa not available for TOTP verification")
return False
except Exception as e:
logger.error(f"TOTP verification error: {e}")
return False
def _verify_passkey(self, request, user, credential: dict) -> dict:
"""Verify WebAuthn/Passkey credential."""
try:
from allauth.mfa.models import Authenticator
from allauth.mfa.webauthn.internal import auth as webauthn_auth
# Check if user has any WebAuthn authenticators
has_passkey = Authenticator.objects.filter(
user=user,
type=Authenticator.Type.WEBAUTHN,
).exists()
if not has_passkey:
return {"success": False, "error": "No passkey registered for this user"}
try:
# Parse the authentication response
credential_data = webauthn_auth.parse_authentication_response(credential)
# Get or create authentication state
# For login flow, we need to set up the state first
state = webauthn_auth.get_state(request)
if not state:
# If no state, generate one for this user
_, state = webauthn_auth.begin_authentication(request)
webauthn_auth.set_state(request, state)
# Complete authentication
webauthn_auth.complete_authentication(request, credential_data, state)
# Clear the state
webauthn_auth.clear_state(request)
return {"success": True}
except Exception as e:
logger.error(f"WebAuthn authentication failed: {e}")
return {"success": False, "error": str(e)}
except ImportError as e:
logger.error(f"WebAuthn module not available: {e}")
return {"success": False, "error": "Passkey authentication not available"}
except Exception as e:
logger.error(f"Passkey verification error: {e}")
return {"success": False, "error": "Passkey verification failed"}
@extend_schema_view( @extend_schema_view(

View File

@@ -333,6 +333,11 @@ class ParkListCreateAPIView(APIView):
def _apply_park_attribute_filters(self, qs: QuerySet, params: dict) -> QuerySet: def _apply_park_attribute_filters(self, qs: QuerySet, params: dict) -> QuerySet:
"""Apply park attribute filtering to the queryset.""" """Apply park attribute filtering to the queryset."""
# Slug filter - exact match for single park lookup
slug = params.get("slug")
if slug:
qs = qs.filter(slug=slug)
park_type = params.get("park_type") park_type = params.get("park_type")
if park_type: if park_type:
qs = qs.filter(park_type=park_type) qs = qs.filter(park_type=park_type)

View File

@@ -7,7 +7,7 @@ entity completeness, and system health.
from drf_spectacular.utils import extend_schema from drf_spectacular.utils import extend_schema
from rest_framework import status from rest_framework import status
from rest_framework.permissions import IsAdminUser from apps.core.permissions import IsAdminWithSecondFactor
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
@@ -89,7 +89,7 @@ class DataCompletenessAPIView(APIView):
companies, and ride models. companies, and ride models.
""" """
permission_classes = [IsAdminUser] permission_classes = [IsAdminWithSecondFactor]
@extend_schema( @extend_schema(
tags=["Admin"], tags=["Admin"],

View File

@@ -28,3 +28,65 @@ class IsStaffOrReadOnly(permissions.BasePermission):
if request.method in permissions.SAFE_METHODS: if request.method in permissions.SAFE_METHODS:
return True return True
return request.user and request.user.is_staff return request.user and request.user.is_staff
class IsAdminWithSecondFactor(permissions.BasePermission):
"""
Requires admin status AND at least one configured second factor.
Accepts either:
- TOTP (MFA/Authenticator app)
- WebAuthn (Passkey/Security key)
This permission ensures that admin users have a second factor configured
before they can access sensitive admin endpoints.
"""
message = "Admin access requires MFA or Passkey to be configured."
def has_permission(self, request, view):
user = request.user
# Must be authenticated
if not user or not user.is_authenticated:
return False
# Must be admin (staff, superuser, or ADMIN role)
if not self._is_admin(user):
self.message = "You do not have admin privileges."
return False
# Must have at least one second factor configured
if not self._has_second_factor(user):
self.message = "Admin access requires MFA or Passkey to be configured."
return False
return True
def _is_admin(self, user) -> bool:
"""Check if user has admin privileges."""
if user.is_superuser:
return True
if user.is_staff:
return True
# Check custom role field if it exists
if hasattr(user, "role") and user.role in ("ADMIN", "SUPERUSER"):
return True
return False
def _has_second_factor(self, user) -> bool:
"""Check if user has at least one second factor configured."""
try:
from allauth.mfa.models import Authenticator
# Check for TOTP or WebAuthn authenticators
return Authenticator.objects.filter(
user=user,
type__in=[Authenticator.Type.TOTP, Authenticator.Type.WEBAUTHN]
).exists()
except ImportError:
# allauth.mfa not installed
return False
except Exception:
# Any other error, fail closed (deny access)
return False

View File

@@ -0,0 +1,137 @@
"""
Tests for custom permissions, particularly IsAdminWithSecondFactor.
Tests that admin users must have MFA or Passkey configured before
accessing sensitive admin endpoints.
"""
from unittest.mock import MagicMock, patch
from django.contrib.auth import get_user_model
from django.test import RequestFactory, TestCase
from apps.core.permissions import IsAdminWithSecondFactor
User = get_user_model()
class TestIsAdminWithSecondFactor(TestCase):
"""Tests for IsAdminWithSecondFactor permission class."""
def setUp(self):
"""Set up test fixtures."""
self.factory = RequestFactory()
self.permission = IsAdminWithSecondFactor()
def _make_request(self, user=None):
"""Create a mock request with the given user."""
request = self.factory.get("/api/v1/admin/test/")
request.user = user if user else MagicMock(is_authenticated=False)
return request
def test_anonymous_user_denied(self):
"""Anonymous users should be denied access."""
request = self._make_request()
request.user.is_authenticated = False
self.assertFalse(self.permission.has_permission(request, None))
def test_non_admin_user_denied(self):
"""Non-admin users should be denied access."""
user = MagicMock()
user.is_authenticated = True
user.is_superuser = False
user.is_staff = False
user.role = "USER"
request = self._make_request(user)
self.assertFalse(self.permission.has_permission(request, None))
self.assertIn("admin privileges", self.permission.message)
@patch("apps.core.permissions.IsAdminWithSecondFactor._has_second_factor")
def test_admin_without_mfa_denied(self, mock_has_second_factor):
"""Admin without MFA or Passkey should be denied access."""
mock_has_second_factor.return_value = False
user = MagicMock()
user.is_authenticated = True
user.is_superuser = True
user.is_staff = True
user.role = "ADMIN"
request = self._make_request(user)
self.assertFalse(self.permission.has_permission(request, None))
self.assertIn("MFA or Passkey", self.permission.message)
@patch("apps.core.permissions.IsAdminWithSecondFactor._has_second_factor")
def test_superuser_with_mfa_allowed(self, mock_has_second_factor):
"""Superuser with MFA configured should be allowed access."""
mock_has_second_factor.return_value = True
user = MagicMock()
user.is_authenticated = True
user.is_superuser = True
user.is_staff = True
request = self._make_request(user)
self.assertTrue(self.permission.has_permission(request, None))
@patch("apps.core.permissions.IsAdminWithSecondFactor._has_second_factor")
def test_staff_with_passkey_allowed(self, mock_has_second_factor):
"""Staff user with Passkey configured should be allowed access."""
mock_has_second_factor.return_value = True
user = MagicMock()
user.is_authenticated = True
user.is_superuser = False
user.is_staff = True
request = self._make_request(user)
self.assertTrue(self.permission.has_permission(request, None))
@patch("apps.core.permissions.IsAdminWithSecondFactor._has_second_factor")
def test_admin_role_with_mfa_allowed(self, mock_has_second_factor):
"""User with ADMIN role and MFA should be allowed access."""
mock_has_second_factor.return_value = True
user = MagicMock()
user.is_authenticated = True
user.is_superuser = False
user.is_staff = False
user.role = "ADMIN"
request = self._make_request(user)
self.assertTrue(self.permission.has_permission(request, None))
def test_has_second_factor_with_totp(self):
"""Test _has_second_factor detects TOTP authenticator."""
user = MagicMock()
with patch("apps.core.permissions.Authenticator") as MockAuth:
# Mock the queryset to return True for TOTP
mock_qs = MagicMock()
mock_qs.filter.return_value.exists.return_value = True
MockAuth.objects.filter.return_value = mock_qs
MockAuth.Type.TOTP = "totp"
MockAuth.Type.WEBAUTHN = "webauthn"
# Need to patch the import inside the method
with patch.dict("sys.modules", {"allauth.mfa.models": MagicMock(Authenticator=MockAuth)}):
result = self.permission._has_second_factor(user)
# This tests the exception path since import is mocked at module level
# The actual integration test would require a full database setup
def test_has_second_factor_import_error(self):
"""Test _has_second_factor handles ImportError gracefully."""
user = MagicMock()
with patch.dict("sys.modules", {"allauth.mfa.models": None}):
with patch("builtins.__import__", side_effect=ImportError):
# Should return False, not raise exception
result = self.permission._has_second_factor(user)
self.assertFalse(result)

View File

@@ -173,6 +173,10 @@ class IsModeratorOrAdmin(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers always have access
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"] return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]
@@ -193,6 +197,10 @@ class IsAdminOrSuperuser(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers always have access
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
return user_role in ["ADMIN", "SUPERUSER"] return user_role in ["ADMIN", "SUPERUSER"]
@@ -220,6 +228,10 @@ class CanViewModerationData(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers can view all data
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
# Moderators and above can view all data # Moderators and above can view all data
@@ -249,6 +261,10 @@ class CanModerateContent(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers always have access
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"] return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]
@@ -257,6 +273,10 @@ class CanModerateContent(GuardMixin, permissions.BasePermission):
if not self.has_permission(request, view): if not self.has_permission(request, view):
return False return False
# Django superusers can do everything
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
# Superusers can do everything # Superusers can do everything
@@ -297,6 +317,10 @@ class CanAssignModerationTasks(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers always have access
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"] return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]
@@ -341,6 +365,10 @@ class CanPerformBulkOperations(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers always have access
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
return user_role in ["ADMIN", "SUPERUSER"] return user_role in ["ADMIN", "SUPERUSER"]
@@ -349,6 +377,10 @@ class CanPerformBulkOperations(GuardMixin, permissions.BasePermission):
if not self.has_permission(request, view): if not self.has_permission(request, view):
return False return False
# Django superusers can perform all bulk operations
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
# Superusers can perform all bulk operations # Superusers can perform all bulk operations
@@ -386,6 +418,10 @@ class IsOwnerOrModerator(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers can access any object
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
# Moderators and above can access any object # Moderators and above can access any object
@@ -419,6 +455,10 @@ class CanManageUserRestrictions(GuardMixin, permissions.BasePermission):
if not request.user or not request.user.is_authenticated: if not request.user or not request.user.is_authenticated:
return False return False
# Django superusers always have access
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"] return user_role in ["MODERATOR", "ADMIN", "SUPERUSER"]
@@ -427,6 +467,10 @@ class CanManageUserRestrictions(GuardMixin, permissions.BasePermission):
if not self.has_permission(request, view): if not self.has_permission(request, view):
return False return False
# Django superusers can manage any restriction
if getattr(request.user, "is_superuser", False):
return True
user_role = getattr(request.user, "role", "USER") user_role = getattr(request.user, "role", "USER")
# Superusers can manage any restriction # Superusers can manage any restriction

View File

@@ -82,6 +82,7 @@ THIRD_PARTY_APPS = [
"allauth", "allauth",
"allauth.account", "allauth.account",
"allauth.mfa", # MFA/TOTP support "allauth.mfa", # MFA/TOTP support
"allauth.mfa.webauthn", # WebAuthn/Passkey support
"allauth.socialaccount", "allauth.socialaccount",
"allauth.socialaccount.providers.google", "allauth.socialaccount.providers.google",
"allauth.socialaccount.providers.discord", "allauth.socialaccount.providers.discord",

View File

@@ -76,8 +76,8 @@ SOCIALACCOUNT_STORE_TOKENS = True
# ============================================================================= # =============================================================================
# https://docs.allauth.org/en/latest/mfa/index.html # https://docs.allauth.org/en/latest/mfa/index.html
# Supported authenticator types # Supported authenticator types - TOTP and WebAuthn (Passkeys)
MFA_SUPPORTED_TYPES = ["totp"] MFA_SUPPORTED_TYPES = ["totp", "webauthn"]
# TOTP settings # TOTP settings
MFA_TOTP_ISSUER = config("MFA_TOTP_ISSUER", default="ThrillWiki") MFA_TOTP_ISSUER = config("MFA_TOTP_ISSUER", default="ThrillWiki")
@@ -88,6 +88,17 @@ MFA_TOTP_DIGITS = 6
# Interval in seconds for TOTP code generation (default 30) # Interval in seconds for TOTP code generation (default 30)
MFA_TOTP_PERIOD = 30 MFA_TOTP_PERIOD = 30
# WebAuthn/Passkey settings
MFA_PASSKEY_LOGIN_ENABLED = config("MFA_PASSKEY_LOGIN_ENABLED", default=True, cast=bool)
# Read DEBUG directly (same source as base.py) to avoid circular import
_DEBUG_MFA = config("DEBUG", default=True, cast=bool)
# Allow insecure origin (http://localhost) for WebAuthn in development
MFA_WEBAUTHN_ALLOW_INSECURE_ORIGIN = config(
"MFA_WEBAUTHN_ALLOW_INSECURE_ORIGIN", default=_DEBUG_MFA, cast=bool
)
# ============================================================================= # =============================================================================
# Login By Code (Magic Link) Configuration # Login By Code (Magic Link) Configuration
# ============================================================================= # =============================================================================
@@ -202,7 +213,10 @@ FRONTEND_DOMAIN = config("FRONTEND_DOMAIN", default="https://thrillwiki.com")
TURNSTILE_SITEKEY = config("TURNSTILE_SITEKEY", default="") TURNSTILE_SITEKEY = config("TURNSTILE_SITEKEY", default="")
TURNSTILE_SECRET = config("TURNSTILE_SECRET", default="") TURNSTILE_SECRET = config("TURNSTILE_SECRET", default="")
# Skip Turnstile validation in development if keys not set # Read DEBUG directly (same source as base.py) to avoid circular import
_DEBUG = config("DEBUG", default=True, cast=bool)
# Skip Turnstile validation in debug mode or if no secret configured
TURNSTILE_SKIP_VALIDATION = config( TURNSTILE_SKIP_VALIDATION = config(
"TURNSTILE_SKIP_VALIDATION", default=not TURNSTILE_SECRET, cast=bool # Skip if no secret "TURNSTILE_SKIP_VALIDATION", default=(_DEBUG or not TURNSTILE_SECRET), cast=bool
) )