"""
Passkey (WebAuthn) API Views

Provides REST API endpoints for WebAuthn/Passkey operations using
django-allauth's mfa.webauthn module.
Supports passkey registration, authentication, and management.

The allauth imports are performed inside each view so that a missing
WebAuthn module is reported to the client as a response instead of breaking
module import.
"""

import logging

from drf_spectacular.utils import extend_schema
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

logger = logging.getLogger(__name__)

# Name used when the client does not supply one (or supplies a blank one).
_DEFAULT_PASSKEY_NAME = "Passkey"

# Lifetime, in seconds, of the cache marker written for a pending MFA passkey login.
_MFA_PASSKEY_STATE_TIMEOUT = 300


def _bad_request(detail):
    """Build a 400 response carrying ``detail``."""
    return Response({"detail": detail}, status=status.HTTP_400_BAD_REQUEST)


def _server_error(detail):
    """Build a 500 response carrying a generic ``detail``.

    Exception text is deliberately not included: it is logged server-side
    instead, so internal details are not exposed to API clients.
    """
    return Response({"detail": detail}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


def _webauthn_unavailable():
    """Build the 400 response returned when allauth's WebAuthn code cannot be imported."""
    return _bad_request("WebAuthn module not available")


def _clean_passkey_name(raw_name):
    """Normalize a client-supplied passkey name.

    Returns the stripped string, ``""`` when the value is missing (``None``),
    or ``None`` when the value is not a string at all.
    """
    if raw_name is None:
        return ""
    if not isinstance(raw_name, str):
        return None
    return raw_name.strip()


def _aaguid_hex(authenticator_data):
    """Return the authenticator's AAGUID as a hex string, or ``None``.

    ``aaguid.hex`` is a method on ``bytes``-like objects but a property on
    ``uuid.UUID``; both shapes are handled so the value stored in the JSON
    field is always a plain string.
    """
    credential_data = authenticator_data.credential_data
    if not credential_data:
        return None
    hex_value = credential_data.aaguid.hex
    return hex_value() if callable(hex_value) else hex_value


def _find_user_passkey(user, passkey_id):
    """Return ``user``'s WebAuthn authenticator with ``passkey_id``, or ``None``.

    Raises ImportError when allauth's MFA models are unavailable; callers
    translate that into a response.
    """
    from allauth.mfa.models import Authenticator

    try:
        return Authenticator.objects.get(
            id=passkey_id,
            user=user,
            type=Authenticator.Type.WEBAUTHN,
        )
    except Authenticator.DoesNotExist:
        return None


@extend_schema(
    operation_id="get_passkey_status",
    summary="Get passkey status for current user",
    description="Returns whether passkeys are enabled and lists registered passkeys.",
    responses={
        200: {
            "description": "Passkey status",
            "example": {
                "passkey_enabled": True,
                "passkey_count": 1,
                "passkeys": [
                    {"id": "abc123", "name": "MacBook Pro", "created_at": "2026-01-06T12:00:00Z"}
                ],
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_passkey_status(request):
    """Get passkey status for current user.

    Returns a 200 response with ``passkey_enabled``, ``passkey_count`` and
    ``passkeys`` (id, name, created_at). When the allauth MFA models cannot
    be imported, a 200 response with empty values and an ``error`` field is
    returned; any other failure yields a generic 500.
    """
    try:
        from allauth.mfa.models import Authenticator

        passkeys = Authenticator.objects.filter(
            user=request.user, type=Authenticator.Type.WEBAUTHN
        )

        passkey_list = []
        for authenticator in passkeys:
            passkey_data = authenticator.data or {}
            # Tolerate both a missing attribute and a null timestamp.
            created_at = getattr(authenticator, "created_at", None)
            passkey_list.append(
                {
                    "id": str(authenticator.id),
                    "name": passkey_data.get("name", _DEFAULT_PASSKEY_NAME),
                    "created_at": created_at.isoformat() if created_at else None,
                }
            )

        return Response(
            {
                "passkey_enabled": bool(passkey_list),
                "passkey_count": len(passkey_list),
                "passkeys": passkey_list,
            }
        )
    except ImportError:
        return Response(
            {
                "passkey_enabled": False,
                "passkey_count": 0,
                "passkeys": [],
                "error": "WebAuthn module not available",
            }
        )
    except Exception:
        logger.exception("Error getting passkey status")
        return _server_error("Failed to get passkey status")


@extend_schema(
    operation_id="get_registration_options",
    summary="Get WebAuthn registration options",
    description="Returns options for registering a new passkey. Start the registration flow.",
    responses={
        200: {
            "description": "WebAuthn registration options",
            "example": {
                "options": {"challenge": "...", "rp": {"name": "ThrillWiki"}},
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_registration_options(request):
    """Get WebAuthn registration options for passkey setup.

    Returns ``{"options": ...}`` on success, 400 when the WebAuthn module is
    unavailable, and a generic 500 on any other failure.
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        # passwordless=False requests a standard (second-factor) passkey.
        # NOTE(review): the pending-registration state is expected to be
        # stored by begin_registration itself — confirm against the
        # installed allauth version.
        creation_options = webauthn_auth.begin_registration(request.user, passwordless=False)
        return Response({"options": creation_options})
    except ImportError as exc:
        logger.error("WebAuthn module import error: %s", exc)
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error getting registration options")
        return _server_error("Failed to get registration options")


@extend_schema(
    operation_id="register_passkey",
    summary="Complete passkey registration",
    description="Verifies the WebAuthn response and registers the new passkey.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "credential": {"type": "object", "description": "WebAuthn credential response"},
                "name": {"type": "string", "description": "Name for this passkey"},
            },
            "required": ["credential"],
        }
    },
    responses={
        200: {"description": "Passkey registered successfully"},
        400: {"description": "Invalid credential or registration failed"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def register_passkey(request):
    """Complete passkey registration with WebAuthn response.

    Expects ``credential`` (required) and ``name`` (optional, defaults to
    "Passkey") in the request body. Returns the new passkey's id and name on
    success; 400 for missing/invalid input, a missing pending registration,
    or a failed WebAuthn verification; a generic 500 otherwise.
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        credential = request.data.get("credential")
        if not credential:
            return _bad_request("Credential is required")

        name = _clean_passkey_name(request.data.get("name"))
        if name is None:
            return _bad_request("Name must be a string")
        name = name or _DEFAULT_PASSKEY_NAME

        # A registration must have been started via get_registration_options.
        if not webauthn_auth.get_state():
            return _bad_request("No pending registration. Please start registration again.")

        try:
            from allauth.mfa.models import Authenticator

            credential_data = webauthn_auth.parse_registration_response(credential)
            authenticator_data = webauthn_auth.complete_registration(credential_data)

            # NOTE(review): only the AAGUID is persisted under "credential".
            # Confirm this is sufficient for allauth to verify later
            # authentications with this record.
            authenticator = Authenticator.objects.create(
                user=request.user,
                type=Authenticator.Type.WEBAUTHN,
                data={
                    "name": name,
                    "credential": _aaguid_hex(authenticator_data),
                },
            )
        except Exception as exc:
            logger.error("WebAuthn registration failed: %s", exc)
            return _bad_request(f"Registration failed: {str(exc)}")

        return Response(
            {
                "detail": "Passkey registered successfully",
                "name": name,
                "id": str(authenticator.id),
            }
        )
    except ImportError as exc:
        logger.error("WebAuthn module import error: %s", exc)
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error registering passkey")
        return _server_error("Failed to register passkey")


@extend_schema(
    operation_id="get_authentication_options",
    summary="Get WebAuthn authentication options",
    description="Returns options for authenticating with a passkey.",
    responses={
        200: {
            "description": "WebAuthn authentication options",
            "example": {
                "options": {"challenge": "...", "allowCredentials": []},
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_authentication_options(request):
    """Get WebAuthn authentication options for passkey verification.

    Returns ``{"options": ...}`` on success, 400 when the WebAuthn module is
    unavailable, and a generic 500 on any other failure.
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        request_options = webauthn_auth.begin_authentication(request.user)
        return Response({"options": request_options})
    except ImportError as exc:
        logger.error("WebAuthn module import error: %s", exc)
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error getting authentication options")
        return _server_error("Failed to get authentication options")


@extend_schema(
    operation_id="authenticate_passkey",
    summary="Authenticate with passkey",
    description="Verifies the WebAuthn response for authentication.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "credential": {"type": "object", "description": "WebAuthn credential response"},
            },
            "required": ["credential"],
        }
    },
    responses={
        200: {"description": "Authentication successful"},
        400: {"description": "Invalid credential or authentication failed"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def authenticate_passkey(request):
    """Verify passkey authentication.

    Expects ``credential`` in the request body. Returns ``{"success": True}``
    when verification succeeds; 400 for a missing credential, a missing
    pending authentication, or a failed verification; a generic 500 otherwise.
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        credential = request.data.get("credential")
        if not credential:
            return _bad_request("Credential is required")

        # An authentication must have been started via get_authentication_options.
        if not webauthn_auth.get_state():
            return _bad_request("No pending authentication. Please start authentication again.")

        try:
            webauthn_auth.complete_authentication(request.user, credential)
        except Exception as exc:
            logger.error("WebAuthn authentication failed: %s", exc)
            return _bad_request(f"Authentication failed: {str(exc)}")

        return Response({"success": True})
    except ImportError as exc:
        logger.error("WebAuthn module import error: %s", exc)
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error authenticating passkey")
        return _server_error("Failed to authenticate")


@extend_schema(
    operation_id="delete_passkey",
    summary="Delete a passkey",
    description="Removes a registered passkey from the user's account.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "password": {"type": "string", "description": "Current password for confirmation"},
            },
            "required": ["password"],
        }
    },
    responses={
        200: {"description": "Passkey deleted successfully"},
        400: {"description": "Invalid password"},
        404: {"description": "Passkey not found"},
    },
    tags=["Passkey"],
)
@api_view(["DELETE"])
@permission_classes([IsAuthenticated])
def delete_passkey(request, passkey_id):
    """Delete a passkey.

    Requires the current password in the request body as confirmation.
    Returns 400 for a wrong password, 404 when the passkey does not belong to
    the user (or does not exist), and a generic 500 on unexpected failures.
    """
    try:
        user = request.user
        password = request.data.get("password", "")

        # A non-string password can never match; reject it without hashing.
        if not isinstance(password, str) or not user.check_password(password):
            return _bad_request("Invalid password")

        authenticator = _find_user_passkey(user, passkey_id)
        if authenticator is None:
            return Response(
                {"detail": "Passkey not found"},
                status=status.HTTP_404_NOT_FOUND,
            )
        authenticator.delete()

        return Response({"detail": "Passkey deleted successfully"})
    except ImportError:
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error deleting passkey")
        return _server_error("Failed to delete passkey")


@extend_schema(
    operation_id="rename_passkey",
    summary="Rename a passkey",
    description="Updates the name of a registered passkey.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "New name for the passkey"},
            },
            "required": ["name"],
        }
    },
    responses={
        200: {"description": "Passkey renamed successfully"},
        404: {"description": "Passkey not found"},
    },
    tags=["Passkey"],
)
@api_view(["PATCH"])
@permission_classes([IsAuthenticated])
def rename_passkey(request, passkey_id):
    """Rename a passkey.

    Expects a non-blank string ``name`` in the request body. Returns 400 when
    the name is missing, blank, or not a string; 404 when the passkey does
    not belong to the user; a generic 500 on unexpected failures.
    """
    try:
        # Missing, blank and non-string names all collapse to a falsy value.
        new_name = _clean_passkey_name(request.data.get("name"))
        if not new_name:
            return _bad_request("Name is required")

        authenticator = _find_user_passkey(request.user, passkey_id)
        if authenticator is None:
            return Response(
                {"detail": "Passkey not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        data = authenticator.data or {}
        data["name"] = new_name
        authenticator.data = data
        authenticator.save()

        return Response({"detail": "Passkey renamed successfully", "name": new_name})
    except ImportError:
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error renaming passkey")
        return _server_error("Failed to rename passkey")


@extend_schema(
    operation_id="get_login_passkey_options",
    summary="Get WebAuthn options for MFA login",
    description="Returns passkey auth options using MFA token (unauthenticated).",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "mfa_token": {"type": "string", "description": "MFA token from login"},
            },
            "required": ["mfa_token"],
        }
    },
    responses={
        200: {"description": "WebAuthn authentication options"},
        400: {"description": "Invalid or expired MFA token"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
def get_login_passkey_options(request):
    """Get WebAuthn authentication options for MFA login flow (unauthenticated).

    Looks up the user from the ``mfa_login:<token>`` cache entry, starts a
    WebAuthn authentication for that user and records a short-lived
    ``mfa_passkey_state:<token>`` cache marker. Returns 400 for a missing or
    expired token, an unknown user, or a user without passkeys; a generic 500
    on unexpected failures.

    NOTE(review): no permission class is declared here, so the project-wide
    DRF default applies — confirm that default allows anonymous access.
    """
    from django.contrib.auth import get_user_model
    from django.core.cache import cache

    User = get_user_model()

    mfa_token = request.data.get("mfa_token")
    if not mfa_token:
        return _bad_request("MFA token is required")

    cached_data = cache.get(f"mfa_login:{mfa_token}")
    # Anything other than a populated mapping is treated as an expired session.
    if not cached_data or not isinstance(cached_data, dict):
        return _bad_request("MFA session expired or invalid")

    user_id = cached_data.get("user_id")
    try:
        user = User.objects.get(pk=user_id)
    except User.DoesNotExist:
        return _bad_request("User not found")

    try:
        from allauth.mfa.models import Authenticator
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        has_passkeys = Authenticator.objects.filter(
            user=user, type=Authenticator.Type.WEBAUTHN
        ).exists()
        if not has_passkeys:
            return _bad_request("No passkeys registered")

        # Temporarily present the pending user on the request while the
        # options are generated, then restore whatever was there before.
        original_user = getattr(request, "user", None)
        request.user = user
        try:
            request_options = webauthn_auth.begin_authentication(user)

            # The user is not authenticated yet, so the pending passkey
            # login is tracked in the cache, keyed by the MFA token.
            cache.set(
                f"mfa_passkey_state:{mfa_token}",
                {"user_id": user_id},
                timeout=_MFA_PASSKEY_STATE_TIMEOUT,
            )
            return Response({"options": request_options})
        finally:
            if original_user is not None:
                request.user = original_user
    except ImportError as exc:
        logger.error("WebAuthn module import error: %s", exc)
        return _webauthn_unavailable()
    except Exception:
        logger.exception("Error getting login passkey options")
        return _server_error("Failed to get passkey options")