mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2026-02-05 02:35:18 -05:00
feat: add security event taxonomy and optimize park queryset - Add comprehensive security_event_types ChoiceGroup with categories for authentication, MFA, password, account, session, and API key events - Include severity levels, icons, and CSS classes for each event type - Fix park queryset optimization by using select_related for OneToOne location relationship - Remove location property fields (latitude/longitude) from values() call as they are not actual DB columns - Add proper location fields (city, state, country) to values() for map display This change enhances security event tracking capabilities and resolves a queryset optimization issue where property decorators were incorrectly used in values() queries.
605 lines
21 KiB
Python
605 lines
21 KiB
Python
"""
|
|
Passkey (WebAuthn) API Views
|
|
|
|
Provides REST API endpoints for WebAuthn/Passkey operations using django-allauth's
|
|
mfa.webauthn module. Supports passkey registration, authentication, and management.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from drf_spectacular.utils import extend_schema
|
|
from rest_framework import status
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@extend_schema(
    operation_id="get_passkey_status",
    summary="Get passkey status for current user",
    description="Returns whether passkeys are enabled and lists registered passkeys.",
    responses={
        200: {
            "description": "Passkey status",
            "example": {
                "passkey_enabled": True,
                "passkeys": [
                    {"id": "abc123", "name": "MacBook Pro", "created_at": "2026-01-06T12:00:00Z"}
                ],
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_passkey_status(request):
    """Report whether the current user has passkeys and list them.

    Responds with ``passkey_enabled``, ``passkey_count`` and ``passkeys``
    (one ``{"id", "name", "created_at"}`` entry per WebAuthn authenticator
    owned by ``request.user``).

    If the allauth MFA models cannot be imported, the same shape is returned
    with empty values plus an ``error`` key. Any other failure is logged with
    its traceback and answered with a generic 500.
    """
    try:
        from allauth.mfa.models import Authenticator

        passkeys = Authenticator.objects.filter(
            user=request.user, type=Authenticator.Type.WEBAUTHN
        )

        # Iterate the queryset once and derive the flag and the count from the
        # resulting list, so the three response fields always agree and no
        # extra EXISTS/COUNT queries are issued.
        passkey_list = []
        for pk in passkeys:
            passkey_data = pk.data or {}
            created_at = getattr(pk, "created_at", None)
            passkey_list.append({
                "id": str(pk.id),
                "name": passkey_data.get("name", "Passkey"),
                "created_at": created_at.isoformat() if created_at is not None else None,
            })

        return Response({
            "passkey_enabled": bool(passkey_list),
            "passkey_count": len(passkey_list),
            "passkeys": passkey_list,
        })
    except ImportError:
        return Response({
            "passkey_enabled": False,
            "passkey_count": 0,
            "passkeys": [],
            "error": "WebAuthn module not available",
        })
    except Exception:
        # logger.exception records the traceback; the client only gets a
        # generic message.
        logger.exception("Error getting passkey status")
        return Response(
            {"detail": "Failed to get passkey status"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="get_registration_options",
    summary="Get WebAuthn registration options",
    description="Returns options for registering a new passkey. Start the registration flow.",
    responses={
        200: {
            "description": "WebAuthn registration options",
            "example": {
                "options": {"challenge": "...", "rp": {"name": "ThrillWiki"}},
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_registration_options(request):
    """Start a passkey registration and return the WebAuthn creation options.

    Calls allauth's ``begin_registration`` for ``request.user`` with
    ``passwordless=False`` and records an expiry timestamp under the session
    key ``pending_passkey_expires``.

    Responds with ``{"options": ..., "expires_in_seconds": 300}`` on success,
    400 if the WebAuthn module cannot be imported, and a generic 500 for any
    other failure (details go to the server log only).
    """
    # Lifetime of a pending registration, in seconds (5 minutes).
    registration_ttl_seconds = 300

    try:
        from django.utils import timezone
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        # Original author's note: begin_registration stores its state
        # internally, so only the options need to be returned here.
        creation_options = webauthn_auth.begin_registration(request.user, passwordless=False)

        # Remember when this registration attempt stops being valid.
        request.session["pending_passkey_expires"] = (
            timezone.now().timestamp() + registration_ttl_seconds
        )

        # Diagnostics stay at DEBUG: the payload is per-request data and
        # should not be written to INFO logs on every call.
        logger.debug("WebAuthn registration options type: %s", type(creation_options))
        logger.debug(
            "WebAuthn registration options keys: %s",
            creation_options.keys() if isinstance(creation_options, dict) else "not a dict",
        )
        logger.debug("WebAuthn registration options: %s", creation_options)

        return Response({
            "options": creation_options,
            "expires_in_seconds": registration_ttl_seconds,
        })
    except ImportError as e:
        logger.error("WebAuthn module import error: %s", e)
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body.
        logger.exception("Error getting registration options")
        return Response(
            {"detail": "Failed to get registration options"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="register_passkey",
    summary="Complete passkey registration",
    description="Verifies the WebAuthn response and registers the new passkey.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "credential": {"type": "object", "description": "WebAuthn credential response"},
                "name": {"type": "string", "description": "Name for this passkey"},
            },
            "required": ["credential"],
        }
    },
    responses={
        200: {"description": "Passkey registered successfully"},
        400: {"description": "Invalid credential or registration failed"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def register_passkey(request):
    """Finish a passkey registration started by ``get_registration_options``.

    Request body: ``credential`` (required WebAuthn response) and ``name``
    (optional label; anything that is not a non-blank string falls back to
    ``"Passkey"``).

    Responds 400 when the credential is missing, the pending registration has
    expired or is absent, or WebAuthn verification fails; 200 with the new
    passkey's ``name`` and ``id`` on success; a generic 500 for unexpected
    errors.

    Once the authenticator has been stored, a failure in audit logging or the
    notification e-mail is logged but no longer reported as a failed
    registration.
    """
    try:
        from django.utils import timezone
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        from apps.accounts.services.security_service import (
            log_security_event,
            send_security_notification,
        )

        credential = request.data.get("credential")
        if not credential:
            return Response(
                {"detail": "Credential is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Normalise the label: non-strings and blank strings use the default.
        raw_name = request.data.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        name = name or "Passkey"

        # Reject attempts whose 5 minute window has elapsed.
        expires_at = request.session.get("pending_passkey_expires")
        if expires_at and timezone.now().timestamp() > expires_at:
            request.session.pop("pending_passkey_expires", None)
            return Response(
                {"detail": "Passkey registration session expired. Please start registration again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # A registration must have been started first.
        state = webauthn_auth.get_state()
        if not state:
            return Response(
                {"detail": "No pending registration. Please start registration again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Only the WebAuthn verification and storage belong in this try:
        # anything failing here genuinely means the passkey was not registered.
        try:
            from allauth.mfa.webauthn.internal.auth import WebAuthn

            # Parse, then validate against the pending registration.
            credential_data = webauthn_auth.parse_registration_response(credential)
            webauthn_auth.complete_registration(credential_data)

            # WebAuthn.add receives the raw credential dict, not the parsed data.
            webauthn_wrapper = WebAuthn.add(
                request.user,
                name,
                credential,
            )
            authenticator = webauthn_wrapper.instance
        except Exception as e:
            logger.exception("WebAuthn registration failed")
            # NOTE(review): the verification error text is still returned to
            # the client for backward compatibility; confirm it never carries
            # sensitive detail.
            return Response(
                {"detail": f"Registration failed: {str(e)}"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        passkey_id = str(authenticator.id) if authenticator else None

        # The attempt is complete; drop the now-stale expiry marker.
        request.session.pop("pending_passkey_expires", None)

        # Audit logging and notification are best-effort from here on: the
        # passkey already exists, so their failure must not turn the response
        # into an error.
        try:
            log_security_event(
                "passkey_registered",
                request,
                user=request.user,
                metadata={"passkey_name": name, "passkey_id": passkey_id},
            )
        except Exception:
            logger.exception("Passkey registered but security event logging failed")

        try:
            send_security_notification(request.user, "passkey_registered", {"passkey_name": name})
        except Exception:
            logger.exception("Passkey registered but security notification failed")

        return Response({
            "detail": "Passkey registered successfully",
            "name": name,
            "id": passkey_id,
        })
    except ImportError as e:
        logger.error("WebAuthn module import error: %s", e)
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body.
        logger.exception("Error registering passkey")
        return Response(
            {"detail": "Failed to register passkey"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="get_authentication_options",
    summary="Get WebAuthn authentication options",
    description="Returns options for authenticating with a passkey.",
    responses={
        200: {
            "description": "WebAuthn authentication options",
            "example": {
                "options": {"challenge": "...", "allowCredentials": []},
            },
        },
    },
    tags=["Passkey"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_authentication_options(request):
    """Start a passkey verification and return the WebAuthn request options.

    Calls allauth's ``begin_authentication`` for ``request.user`` and returns
    its result as ``{"options": ...}``. Responds 400 if the WebAuthn module
    cannot be imported and a generic 500 for any other failure (details go to
    the server log only).
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        # Original author's note: state is stored internally by allauth, so
        # only the options are returned.
        request_options = webauthn_auth.begin_authentication(request.user)

        return Response({
            "options": request_options,
        })
    except ImportError as e:
        logger.error("WebAuthn module import error: %s", e)
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body.
        logger.exception("Error getting authentication options")
        return Response(
            {"detail": "Failed to get authentication options"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="authenticate_passkey",
    summary="Authenticate with passkey",
    description="Verifies the WebAuthn response for authentication.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "credential": {"type": "object", "description": "WebAuthn credential response"},
            },
            "required": ["credential"],
        }
    },
    responses={
        200: {"description": "Authentication successful"},
        400: {"description": "Invalid credential or authentication failed"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def authenticate_passkey(request):
    """Verify a passkey assertion for the already signed-in user.

    Request body: ``credential`` (required WebAuthn response).

    Responds 400 when the credential is missing, when no authentication is
    pending, or when allauth's ``complete_authentication`` raises; 200 with
    ``{"success": True}`` otherwise; a generic 500 for unexpected errors.
    """
    try:
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        credential = request.data.get("credential")
        if not credential:
            return Response(
                {"detail": "Credential is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # An authentication must have been started first.
        state = webauthn_auth.get_state()
        if not state:
            return Response(
                {"detail": "No pending authentication. Please start authentication again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            webauthn_auth.complete_authentication(request.user, credential)
        except Exception as e:
            logger.exception("WebAuthn authentication failed")
            # NOTE(review): the verification error text is still returned to
            # the client for backward compatibility; confirm it never carries
            # sensitive detail.
            return Response(
                {"detail": f"Authentication failed: {str(e)}"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        return Response({"success": True})
    except ImportError as e:
        logger.error("WebAuthn module import error: %s", e)
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body.
        logger.exception("Error authenticating passkey")
        return Response(
            {"detail": "Failed to authenticate"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="delete_passkey",
    summary="Delete a passkey",
    description="Removes a registered passkey from the user's account.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "password": {"type": "string", "description": "Current password for confirmation"},
            },
            "required": ["password"],
        }
    },
    responses={
        200: {"description": "Passkey deleted successfully"},
        400: {"description": "Invalid password, or removal would leave no authentication method"},
        404: {"description": "Passkey not found"},
    },
    tags=["Passkey"],
)
@api_view(["DELETE"])
@permission_classes([IsAuthenticated])
def delete_passkey(request, passkey_id):
    """Delete one of the current user's passkeys after password confirmation.

    Args:
        request: DRF request; the body must carry the user's ``password``.
        passkey_id: Primary key of the WebAuthn authenticator to remove.

    Responds 400 for a wrong (or non-string) password or when this is the
    last passkey and no other authentication method exists; 404 when the
    passkey does not belong to the user; 200 on success; a generic 500 for
    unexpected errors.

    Once the authenticator has been deleted, a failure in audit logging or
    the notification e-mail is logged but no longer reported as a failed
    deletion.
    """
    try:
        from allauth.mfa.models import Authenticator

        from apps.accounts.services.security_service import (
            check_auth_method_availability,
            log_security_event,
            send_security_notification,
        )

        user = request.user
        password = request.data.get("password", "")

        # A non-string password can never be valid; reject it up front rather
        # than letting the hasher fail with a 500.
        # NOTE(review): accounts without a usable password cannot pass this
        # check and so cannot delete passkeys at all — confirm this is intended.
        if not isinstance(password, str) or not user.check_password(password):
            return Response(
                {"detail": "Invalid password"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Refuse to remove the only remaining way to sign in.
        auth_methods = check_auth_method_availability(user)
        has_fallback = (
            auth_methods["has_password"]
            or auth_methods["has_social"]
            or auth_methods["has_totp"]
        )
        if auth_methods["passkey_count"] == 1 and not has_fallback:
            return Response(
                {"detail": "Cannot remove last passkey: you must have at least one authentication method. Please set a password or connect a social account first."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Scope the lookup to this user so other users' passkeys look absent.
        try:
            authenticator = Authenticator.objects.get(
                id=passkey_id,
                user=user,
                type=Authenticator.Type.WEBAUTHN,
            )
        except Authenticator.DoesNotExist:
            return Response(
                {"detail": "Passkey not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        passkey_name = authenticator.data.get("name", "Passkey") if authenticator.data else "Passkey"
        authenticator.delete()

        # Audit logging and notification are best-effort from here on: the
        # passkey is already gone, so their failure must not turn the response
        # into an error.
        try:
            log_security_event(
                "passkey_removed",
                request,
                user=user,
                metadata={"passkey_name": passkey_name, "passkey_id": str(passkey_id)},
            )
        except Exception:
            logger.exception("Passkey deleted but security event logging failed")

        try:
            send_security_notification(user, "passkey_removed", {"passkey_name": passkey_name})
        except Exception:
            logger.exception("Passkey deleted but security notification failed")

        return Response({"detail": "Passkey deleted successfully"})
    except ImportError:
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body.
        logger.exception("Error deleting passkey")
        return Response(
            {"detail": "Failed to delete passkey"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="rename_passkey",
    summary="Rename a passkey",
    description="Updates the name of a registered passkey.",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "New name for the passkey"},
            },
            "required": ["name"],
        }
    },
    responses={
        200: {"description": "Passkey renamed successfully"},
        404: {"description": "Passkey not found"},
    },
    tags=["Passkey"],
)
@api_view(["PATCH"])
@permission_classes([IsAuthenticated])
def rename_passkey(request, passkey_id):
    """Change the display name stored with one of the user's passkeys.

    Args:
        request: DRF request; the body must carry a non-blank string ``name``.
        passkey_id: Primary key of the WebAuthn authenticator to rename.

    Responds 400 when ``name`` is missing, blank or not a string; 404 when
    the passkey does not belong to the user; 200 with the new name on
    success; a generic 500 for unexpected errors.
    """
    try:
        from allauth.mfa.models import Authenticator

        user = request.user

        # JSON may deliver null, a number or an object here; only strings can
        # be stripped, everything else is treated as a missing name (400)
        # instead of crashing with AttributeError (500).
        raw_name = request.data.get("name", "")
        new_name = raw_name.strip() if isinstance(raw_name, str) else ""

        if not new_name:
            return Response(
                {"detail": "Name is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Scope the lookup to this user so other users' passkeys look absent.
        try:
            authenticator = Authenticator.objects.get(
                id=passkey_id, user=user, type=Authenticator.Type.WEBAUTHN,
            )
        except Authenticator.DoesNotExist:
            return Response(
                {"detail": "Passkey not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        # The name is kept inside the authenticator's ``data`` mapping.
        data = authenticator.data or {}
        data["name"] = new_name
        authenticator.data = data
        authenticator.save()

        return Response({"detail": "Passkey renamed successfully", "name": new_name})
    except ImportError:
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body.
        logger.exception("Error renaming passkey")
        return Response(
            {"detail": "Failed to rename passkey"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|
|
|
|
|
|
@extend_schema(
    operation_id="get_login_passkey_options",
    summary="Get WebAuthn options for MFA login",
    description="Returns passkey auth options using MFA token (unauthenticated).",
    request={
        "application/json": {
            "type": "object",
            "properties": {
                "mfa_token": {"type": "string", "description": "MFA token from login"},
            },
            "required": ["mfa_token"],
        }
    },
    responses={
        200: {"description": "WebAuthn authentication options"},
        400: {"description": "Invalid or expired MFA token"},
    },
    tags=["Passkey"],
)
@api_view(["POST"])
def get_login_passkey_options(request):
    """Return WebAuthn request options for the passkey step of an MFA login.

    The caller is identified by ``mfa_token`` from the request body: the
    cache entry ``mfa_login:<token>`` supplies the ``user_id``. On success a
    marker is written to ``mfa_passkey_state:<token>`` for 300 seconds and
    ``{"options": ...}`` is returned.

    Responds 400 when the token is missing, the cache entry is absent, the
    user does not exist, the user has no passkeys, or the WebAuthn module
    cannot be imported; a generic 500 for any other failure.

    NOTE(review): no ``permission_classes`` is declared here, so access is
    governed by the project's default DRF permissions — confirm those allow
    the unauthenticated use this endpoint is described as supporting.
    """
    from django.core.cache import cache
    from django.contrib.auth import get_user_model

    User = get_user_model()
    mfa_token = request.data.get("mfa_token")

    if not mfa_token:
        return Response(
            {"detail": "MFA token is required"}, status=status.HTTP_400_BAD_REQUEST
        )

    cache_key = f"mfa_login:{mfa_token}"
    cached_data = cache.get(cache_key)

    if not cached_data:
        return Response(
            {"detail": "MFA session expired or invalid"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    user_id = cached_data.get("user_id")

    try:
        user = User.objects.get(pk=user_id)
    except User.DoesNotExist:
        return Response({"detail": "User not found"}, status=status.HTTP_400_BAD_REQUEST)

    try:
        from allauth.mfa.models import Authenticator
        from allauth.mfa.webauthn.internal import auth as webauthn_auth

        passkeys = Authenticator.objects.filter(
            user=user, type=Authenticator.Type.WEBAUTHN
        )

        if not passkeys.exists():
            return Response(
                {"detail": "No passkeys registered"}, status=status.HTTP_400_BAD_REQUEST
            )

        # Temporarily present the pending user as request.user while allauth
        # builds the options, then restore whatever was there before.
        original_user = getattr(request, "user", None)
        request.user = user

        try:
            request_options = webauthn_auth.begin_authentication(user)

            # Original author's note: the user is tracked separately because
            # they are not authenticated yet during the MFA login flow.
            passkey_state_key = f"mfa_passkey_state:{mfa_token}"
            cache.set(passkey_state_key, {"user_id": user_id}, timeout=300)

            return Response({"options": request_options})
        finally:
            if original_user is not None:
                request.user = original_user

    except ImportError as e:
        logger.error("WebAuthn module import error: %s", e)
        return Response(
            {"detail": "WebAuthn module not available"},
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception:
        # Keep internal exception text out of the response body; this
        # endpoint is described as reachable without authentication.
        logger.exception("Error getting login passkey options")
        return Response(
            {"detail": "Failed to get passkey options"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
|