mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2026-02-05 02:35:18 -05:00
feat: add passkey authentication and enhance user preferences - Add passkey login security event type with fingerprint icon - Include request and site context in email confirmation for backend - Add user_id exact match filter to prevent incorrect user lookups - Enable PATCH method for updating user preferences via API - Add moderation_preferences support to user settings - Optimize ticket queries with select_related and prefetch_related This commit introduces passkey authentication tracking, improves user profile filtering accuracy, and extends the preferences API to support updates. Query optimizations reduce database hits for ticket listings.
606 lines
21 KiB
Python
606 lines
21 KiB
Python
"""
Passkey (WebAuthn) API Views

Provides REST API endpoints for WebAuthn/Passkey operations using django-allauth's
mfa.webauthn module. Supports passkey registration, authentication, and management.
"""
|
|
|
|
import logging
|
|
|
|
from drf_spectacular.utils import extend_schema
|
|
from rest_framework import status
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.response import Response
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@extend_schema(
    operation_id="get_passkey_status",
    summary="Get passkey status for current user",
    description="Returns whether passkeys are enabled and lists registered passkeys.",
    responses={
        200: {
            "description": "Passkey status",
            "example": {
                "passkey_enabled": True,
                "passkeys": [
                    {"id": "abc123", "name": "MacBook Pro", "created_at": "2026-01-06T12:00:00Z"}
                ],
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_passkey_status(request):
    """Report which WebAuthn authenticators the requesting user has registered.

    Looks up the user's ``Authenticator`` rows of type ``WEBAUTHN`` and returns
    ``passkey_enabled``, ``passkey_count`` and a ``passkeys`` list holding the
    id, display name and creation timestamp of each one.

    If the allauth MFA models cannot be imported, an "empty" status payload
    with an ``error`` field is returned (default 200 status). Any other
    failure is logged and answered with a generic HTTP 500.
    """

    def summarize(authenticator):
        # ``data`` may be empty/None; the display name falls back to "Passkey".
        stored = authenticator.data or {}
        if hasattr(authenticator, "created_at"):
            created = authenticator.created_at.isoformat()
        else:
            created = None
        return {
            "id": str(authenticator.id),
            "name": stored.get("name", "Passkey"),
            "created_at": created,
        }

    try:
        from allauth.mfa.models import Authenticator

        registered = Authenticator.objects.filter(
            user=request.user,
            type=Authenticator.Type.WEBAUTHN,
        )
        entries = [summarize(item) for item in registered]

        payload = {
            "passkey_enabled": registered.exists(),
            "passkey_count": registered.count(),
            "passkeys": entries,
        }
        return Response(payload)
    except ImportError:
        unavailable = {
            "passkey_enabled": False,
            "passkey_count": 0,
            "passkeys": [],
            "error": "WebAuthn module not available",
        }
        return Response(unavailable)
    except Exception as e:
        logger.error(f"Error getting passkey status: {e}")
        failure = {"detail": "Failed to get passkey status"}
        return Response(failure, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@extend_schema(
    operation_id="get_registration_options",
    summary="Get WebAuthn registration options",
    description="Returns options for registering a new passkey. Start the registration flow.",
    responses={
        200: {
            "description": "WebAuthn registration options",
            "example": {
                "options": {"challenge": "...", "rp": {"name": "ThrillWiki"}},
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_registration_options(request):
    """Start a passkey registration and return the WebAuthn creation options.

    Calls allauth's ``begin_registration`` for the authenticated user with
    ``passwordless=False`` and records an expiry timestamp (now + 5 minutes)
    in the session under ``pending_passkey_expires``.

    Returns:
        200 with ``{"options": ..., "expires_in_seconds": 300}`` on success,
        400 if the allauth WebAuthn module cannot be imported,
        500 with a generic message on any other failure (details are logged
        server-side only, never echoed to the client).
    """
    registration_ttl_seconds = 300  # 5 minutes

    try:
        from django.utils import timezone
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        # Per the original author's note, begin_registration keeps the pending
        # WebAuthn state itself, so only the options are returned here.
        creation_options = webauthn_auth.begin_registration(request.user, passwordless=False)

        # Remember when this registration attempt stops being valid.
        request.session["pending_passkey_expires"] = (
            timezone.now().timestamp() + registration_ttl_seconds
        )

        # The options embed the challenge, so they are only logged at DEBUG.
        logger.debug("WebAuthn registration options type: %s", type(creation_options))
        logger.debug(
            "WebAuthn registration options keys: %s",
            creation_options.keys() if isinstance(creation_options, dict) else "not a dict",
        )
        logger.debug("WebAuthn registration options: %s", creation_options)

        return Response({
            "options": creation_options,
            "expires_in_seconds": registration_ttl_seconds,
        })
    except ImportError:
        logger.exception("WebAuthn module import error")
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error getting registration options")
        return Response(
            {"detail": "Failed to get registration options"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="register_passkey",
    summary="Complete passkey registration",
    description="Verifies the WebAuthn response and registers the new passkey.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "credential": {"type": "object", "description": "WebAuthn credential response"},
                "name": {"type": "string", "description": "Name for this passkey"},
            },
            "required": ["credential"],
        }
    },
    responses={
        200: {"description": "Passkey registered successfully"},
        400: {"description": "Invalid credential or registration failed"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def register_passkey(request):
    """Complete a passkey registration from the browser's WebAuthn response.

    Request body:
        credential: WebAuthn credential response (required).
        name: display name for the passkey; blank or non-string values fall
            back to ``"Passkey"``.

    Flow:
        1. Reject the request if ``pending_passkey_expires`` in the session is
           in the past, and discard the pending WebAuthn state so the expired
           challenge cannot be reused by simply retrying.
        2. Require pending WebAuthn state (``get_state()``).
        3. Parse and complete the registration via allauth, then create the
           authenticator with ``WebAuthn.add``.
        4. Best-effort: write a ``passkey_registered`` security event and send
           the notification e-mail. Failures here are logged but do not turn a
           successful registration into an error response.

    Returns:
        200 with ``detail``, ``name`` and ``id`` on success,
        400 for missing credential, expired/missing registration state, a
        failed WebAuthn verification, or an unavailable WebAuthn module,
        500 with a generic message on unexpected errors.
    """
    try:
        from django.utils import timezone
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        from apps.accounts.services.security_service import (
            log_security_event,
            send_security_notification,
        )

        credential = request.data.get("credential")
        if not credential:
            return Response(
                {"detail": "Credential is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        raw_name = request.data.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        name = name or "Passkey"

        # Check if registration has expired (5 minute timeout).
        expires_at = request.session.get("pending_passkey_expires")
        if expires_at and timezone.now().timestamp() > expires_at:
            request.session.pop("pending_passkey_expires", None)
            # Drop the stale challenge as well: with the expiry marker gone, a
            # retry would otherwise pass this check and still find valid state.
            webauthn_auth.clear_state()
            return Response(
                {"detail": "Passkey registration session expired. Please start registration again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Pending state is read from allauth's request context.
        state = webauthn_auth.get_state()
        if not state:
            return Response(
                {"detail": "No pending registration. Please start registration again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            from allauth.mfa.webauthn.internal.auth import WebAuthn

            # Parse the credential response to validate it.
            credential_data = webauthn_auth.parse_registration_response(credential)

            # Complete registration to validate and clear state.
            webauthn_auth.complete_registration(credential_data)

            # WebAuthn.add() receives the raw credential dict (not the parsed
            # data) together with the display name.
            webauthn_wrapper = WebAuthn.add(request.user, name, credential)
            authenticator = webauthn_wrapper.instance
        except Exception as e:
            # Verification problems are reported to the client as a 400 with
            # the library's message so the UI can explain what went wrong.
            logger.error("WebAuthn registration failed: %s", e)
            return Response(
                {"detail": f"Registration failed: {str(e)}"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # The attempt is finished; the expiry marker is no longer needed.
        request.session.pop("pending_passkey_expires", None)

        passkey_id = str(authenticator.id) if authenticator else None

        # From here on the passkey exists. Audit/notification problems must
        # not be reported as a failed registration.
        try:
            log_security_event(
                "passkey_registered",
                request,
                user=request.user,
                metadata={"passkey_name": name, "passkey_id": passkey_id},
            )
        except Exception:
            logger.exception("Passkey registered but security event logging failed")

        try:
            send_security_notification(request.user, "passkey_registered", {"passkey_name": name})
        except Exception:
            logger.exception("Passkey registered but security notification failed")

        return Response({
            "detail": "Passkey registered successfully",
            "name": name,
            "id": passkey_id,
        })
    except ImportError:
        logger.exception("WebAuthn module import error")
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error registering passkey")
        return Response(
            {"detail": "Failed to register passkey"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="get_authentication_options",
    summary="Get WebAuthn authentication options",
    description="Returns options for authenticating with a passkey.",
    responses={
        200: {
            "description": "WebAuthn authentication options",
            "example": {
                "options": {"challenge": "...", "allowCredentials": []},
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_authentication_options(request):
    """Return WebAuthn request options for verifying the current user's passkey.

    Calls allauth's ``begin_authentication`` with the authenticated user and
    returns the resulting options under the ``options`` key.

    Returns:
        200 with ``{"options": ...}`` on success,
        400 if the allauth WebAuthn module cannot be imported,
        500 with a generic message on any other failure (details are logged
        server-side only).
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        # Only the options are returned; per the original author's note the
        # pending state is kept by allauth itself.
        request_options = webauthn_auth.begin_authentication(request.user)

        return Response({"options": request_options})
    except ImportError:
        logger.exception("WebAuthn module import error")
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error getting authentication options")
        return Response(
            {"detail": "Failed to get authentication options"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="authenticate_passkey",
    summary="Authenticate with passkey",
    description="Verifies the WebAuthn response for authentication.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "credential": {"type": "object", "description": "WebAuthn credential response"},
            },
            "required": ["credential"],
        }
    },
    responses={
        200: {"description": "Authentication successful"},
        400: {"description": "Invalid credential or authentication failed"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def authenticate_passkey(request):
    """Verify a WebAuthn assertion for the already-authenticated user.

    Request body:
        credential: WebAuthn credential response (required).

    Requires pending WebAuthn state (``get_state()``), then delegates the
    verification to allauth's ``complete_authentication``.

    Returns:
        200 with ``{"success": True}`` when verification succeeds,
        400 for a missing credential, missing pending state, a failed
        verification, or an unavailable WebAuthn module,
        500 with a generic message on unexpected errors.
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        credential = request.data.get("credential")
        if not credential:
            return Response(
                {"detail": "Credential is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Pending state is read from allauth's request context.
        state = webauthn_auth.get_state()
        if not state:
            return Response(
                {"detail": "No pending authentication. Please start authentication again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            # Takes the user and the raw credential response.
            webauthn_auth.complete_authentication(request.user, credential)
        except Exception as e:
            # Verification problems are reported as a 400 with the library's
            # message so the client can show why the passkey was rejected.
            logger.error("WebAuthn authentication failed: %s", e)
            return Response(
                {"detail": f"Authentication failed: {str(e)}"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        return Response({"success": True})
    except ImportError:
        logger.exception("WebAuthn module import error")
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error authenticating passkey")
        return Response(
            {"detail": "Failed to authenticate"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="delete_passkey",
    summary="Delete a passkey",
    description="Removes a registered passkey from the user's account.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "password": {"type": "string", "description": "Current password for confirmation"},
            },
            "required": ["password"],
        }
    },
    responses={
        200: {"description": "Passkey deleted successfully"},
        400: {"description": "Invalid password or passkey not found"},
    },
    tags=["Passkey"],
)
@api_view(["DELETE"])
@permission_classes([IsAuthenticated])
def delete_passkey(request, passkey_id):
    """Delete one of the authenticated user's passkeys after password confirmation.

    Args:
        request: DRF request; the body must carry the user's current ``password``.
        passkey_id: id of the ``Authenticator`` row to remove.

    Flow:
        1. Verify the supplied password.
        2. Refuse to remove the user's only passkey when
           ``check_auth_method_availability`` reports no password, social
           account or TOTP to fall back on.
        3. Look up the WebAuthn authenticator owned by the user and delete it.
        4. Best-effort: write a ``passkey_removed`` security event and send the
           notification e-mail. Failures here are logged but do not turn a
           completed deletion into an error response.

    Returns:
        200 on success, 400 for a wrong password / last-auth-method guard /
        unavailable WebAuthn module, 404 when the passkey is not found,
        500 with a generic message on unexpected errors.
    """
    try:
        from allauth.mfa.models import Authenticator

        from apps.accounts.services.security_service import (
            check_auth_method_availability,
            log_security_event,
            send_security_notification,
        )

        user = request.user
        password = request.data.get("password", "")

        # NOTE(review): this check is unconditional, yet the guard below
        # explicitly handles users with has_password == False. Such users
        # presumably can never pass check_password() and therefore cannot
        # delete a passkey at all - confirm whether that is intended.
        if not user.check_password(password):
            return Response(
                {"detail": "Invalid password"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # If this is the last passkey and the user has no other auth method,
        # block removal so the account cannot be locked out.
        auth_methods = check_auth_method_availability(user)
        if auth_methods["passkey_count"] == 1 and not (
            auth_methods["has_password"]
            or auth_methods["has_social"]
            or auth_methods["has_totp"]
        ):
            return Response(
                {"detail": "Cannot remove last passkey: you must have at least one authentication method. Please set a password or connect a social account first."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            authenticator = Authenticator.objects.get(
                id=passkey_id,
                user=user,
                type=Authenticator.Type.WEBAUTHN,
            )
        except Authenticator.DoesNotExist:
            return Response(
                {"detail": "Passkey not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        # Capture the display name before the row disappears.
        passkey_name = authenticator.data.get("name", "Passkey") if authenticator.data else "Passkey"
        authenticator.delete()

        # From here on the passkey is gone. Audit/notification problems must
        # not be reported as a failed deletion.
        try:
            log_security_event(
                "passkey_removed",
                request,
                user=user,
                metadata={"passkey_name": passkey_name, "passkey_id": str(passkey_id)},
            )
        except Exception:
            logger.exception("Passkey deleted but security event logging failed")

        try:
            send_security_notification(user, "passkey_removed", {"passkey_name": passkey_name})
        except Exception:
            logger.exception("Passkey deleted but security notification failed")

        return Response({"detail": "Passkey deleted successfully"})
    except ImportError:
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error deleting passkey")
        return Response(
            {"detail": "Failed to delete passkey"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="rename_passkey",
    summary="Rename a passkey",
    description="Updates the name of a registered passkey.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "New name for the passkey"},
            },
            "required": ["name"],
        }
    },
    responses={
        200: {"description": "Passkey renamed successfully"},
        404: {"description": "Passkey not found"},
    },
    tags=["Passkey"],
)
@api_view(["PATCH"])
@permission_classes([IsAuthenticated])
def rename_passkey(request, passkey_id):
    """Change the display name stored with one of the user's passkeys.

    Args:
        request: DRF request; the body must carry a non-blank string ``name``.
        passkey_id: id of the ``Authenticator`` row to rename.

    The name is written to the ``name`` key of the authenticator's ``data``
    dict and the row is saved.

    Returns:
        200 with ``detail`` and the new ``name`` on success,
        400 when the name is missing, blank or not a string, or when the
        WebAuthn module is unavailable,
        404 when the passkey is not found for this user,
        500 with a generic message on unexpected errors.
    """
    try:
        from allauth.mfa.models import Authenticator

        user = request.user

        # JSON null / numbers / objects would make .strip() raise; treat any
        # non-string value as a missing name (400) instead of failing with 500.
        raw_name = request.data.get("name", "")
        new_name = raw_name.strip() if isinstance(raw_name, str) else ""

        if not new_name:
            return Response(
                {"detail": "Name is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            authenticator = Authenticator.objects.get(
                id=passkey_id,
                user=user,
                type=Authenticator.Type.WEBAUTHN,
            )
        except Authenticator.DoesNotExist:
            return Response(
                {"detail": "Passkey not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        data = authenticator.data or {}
        data["name"] = new_name
        authenticator.data = data
        authenticator.save()

        return Response({"detail": "Passkey renamed successfully", "name": new_name})
    except ImportError:
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error renaming passkey")
        return Response(
            {"detail": "Failed to rename passkey"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="get_login_passkey_options",
    summary="Get WebAuthn options for MFA login",
    description="Returns passkey auth options using MFA token (unauthenticated).",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "mfa_token": {"type": "string", "description": "MFA token from login"},
            },
            "required": ["mfa_token"],
        }
    },
    responses={
        200: {"description": "WebAuthn authentication options"},
        400: {"description": "Invalid or expired MFA token"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([AllowAny])
def get_login_passkey_options(request):
    """Return WebAuthn request options for the MFA step of an unauthenticated login.

    Request body:
        mfa_token: token identifying the pending login (required).

    The token is resolved through the cache key ``mfa_login:<token>`` to a
    ``user_id``. If that user has at least one WebAuthn authenticator,
    ``begin_authentication`` is called for them and a marker is cached under
    ``mfa_passkey_state:<token>`` for 300 seconds.

    Returns:
        200 with ``{"options": ...}`` on success,
        400 for a missing/expired token, unknown user, user without passkeys,
        or an unavailable WebAuthn module,
        500 with a generic message on unexpected errors. This endpoint is open
        to anonymous callers, so exception details are logged server-side and
        never echoed back.
    """
    from django.core.cache import cache
    from django.contrib.auth import get_user_model

    User = get_user_model()
    mfa_token = request.data.get("mfa_token")

    if not mfa_token:
        return Response(
            {"detail": "MFA token is required"}, status=status.HTTP_400_BAD_REQUEST
        )

    cached_data = cache.get(f"mfa_login:{mfa_token}")
    if not cached_data:
        return Response(
            {"detail": "MFA session expired or invalid"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    user_id = cached_data.get("user_id")

    try:
        user = User.objects.get(pk=user_id)
    except User.DoesNotExist:
        return Response({"detail": "User not found"}, status=status.HTTP_400_BAD_REQUEST)

    try:
        from allauth.mfa.models import Authenticator
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        passkeys = Authenticator.objects.filter(
            user=user, type=Authenticator.Type.WEBAUTHN
        )
        if not passkeys.exists():
            return Response(
                {"detail": "No passkeys registered"}, status=status.HTTP_400_BAD_REQUEST
            )

        # Temporarily present the pending user as request.user while the
        # options are generated; the previous value is restored in ``finally``.
        original_user = getattr(request, "user", None)
        request.user = user

        try:
            request_options = webauthn_auth.begin_authentication(user)

            # The caller is not authenticated yet, so remember separately
            # (keyed by the MFA token) which user has a pending passkey check.
            cache.set(f"mfa_passkey_state:{mfa_token}", {"user_id": user_id}, timeout=300)

            return Response({"options": request_options})
        finally:
            if original_user is not None:
                request.user = original_user
    except ImportError:
        logger.exception("WebAuthn module import error")
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep the traceback in the logs; do not expose internals to the client.
        logger.exception("Error getting login passkey options")
        return Response(
            {"detail": "Failed to get passkey options"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|