mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2026-02-05 16:55:17 -05:00
feat: add passkey authentication and enhance user preferences - Add passkey login security event type with fingerprint icon - Include request and site context in email confirmation for backend - Add user_id exact match filter to prevent incorrect user lookups - Enable PATCH method for updating user preferences via API - Add moderation_preferences support to user settings - Optimize ticket queries with select_related and prefetch_related This commit introduces passkey authentication tracking, improves user profile filtering accuracy, and extends the preferences API to support updates. Query optimizations reduce database hits for ticket listings.
1820 lines
68 KiB
Python
1820 lines
68 KiB
Python
"""
|
|
Auth domain views for ThrillWiki API v1.
|
|
|
|
This module contains all authentication-related API endpoints including
|
|
login, signup, logout, password management, social authentication,
|
|
user profiles, and top lists.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from typing import cast # added 'cast'
|
|
|
|
from django.contrib.auth import authenticate, get_user_model, login, logout
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.core.exceptions import ValidationError
|
|
from django.db.models import Q
|
|
from django.http import HttpRequest # new import
|
|
from drf_spectacular.utils import extend_schema, extend_schema_view
|
|
from rest_framework import status
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
|
|
from apps.accounts.services.social_provider_service import SocialProviderService
|
|
from apps.core.utils import capture_and_log
|
|
|
|
# Import directly from the auth serializers.py file (not the serializers package)
|
|
from .serializers import (
|
|
AuthStatusOutputSerializer,
|
|
# Authentication serializers
|
|
LoginInputSerializer,
|
|
LoginOutputSerializer,
|
|
LogoutOutputSerializer,
|
|
PasswordChangeInputSerializer,
|
|
PasswordChangeOutputSerializer,
|
|
PasswordResetInputSerializer,
|
|
PasswordResetOutputSerializer,
|
|
SignupInputSerializer,
|
|
SignupOutputSerializer,
|
|
SocialProviderOutputSerializer,
|
|
UserOutputSerializer,
|
|
)
|
|
from .serializers_package.social import (
|
|
AvailableProviderSerializer,
|
|
ConnectedProviderSerializer,
|
|
ConnectProviderInputSerializer,
|
|
ConnectProviderOutputSerializer,
|
|
DisconnectProviderOutputSerializer,
|
|
SocialAuthStatusSerializer,
|
|
SocialProviderErrorSerializer,
|
|
)
|
|
|
|
# Handle optional dependencies with fallback classes
|
|
|
|
|
|
class FallbackTurnstileMixin:
|
|
"""Fallback mixin if TurnstileMixin is not available."""
|
|
|
|
def validate_turnstile(self, request):
|
|
pass
|
|
|
|
|
|
# Try to import the real class, use fallback if not available and ensure it's a class/type
|
|
try:
|
|
from apps.accounts.mixins import TurnstileMixin as _ImportedTurnstileMixin
|
|
|
|
# Ensure the imported object is a class/type that can be used as a base class.
|
|
# If it's not a type for any reason, fall back to the safe mixin.
|
|
TurnstileMixin = _ImportedTurnstileMixin if isinstance(_ImportedTurnstileMixin, type) else FallbackTurnstileMixin
|
|
except Exception:
|
|
# Catch any import errors or unexpected exceptions and use the fallback mixin.
|
|
TurnstileMixin = FallbackTurnstileMixin
|
|
|
|
UserModel = get_user_model()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Helper: safely obtain underlying HttpRequest (used by Django auth)
|
|
|
|
|
|
def _get_underlying_request(request: Request) -> HttpRequest:
|
|
"""
|
|
Return a django HttpRequest for use with Django auth and site utilities.
|
|
|
|
DRF's Request wraps the underlying HttpRequest in ._request; cast() tells the
|
|
typechecker that the returned object is indeed an HttpRequest.
|
|
"""
|
|
return cast(HttpRequest, getattr(request, "_request", request))
|
|
|
|
|
|
# Helper: encapsulate user lookup + authenticate to reduce complexity in view
|
|
def _authenticate_user_by_lookup(email_or_username: str, password: str, request: Request) -> UserModel | None:
|
|
"""
|
|
Try a single optimized query to find a user by email OR username then authenticate.
|
|
Returns authenticated user or None.
|
|
"""
|
|
try:
|
|
# Single query to find user by email OR username
|
|
if "@" in (email_or_username or ""):
|
|
user_obj = (
|
|
UserModel.objects.select_related()
|
|
.filter(Q(email=email_or_username) | Q(username=email_or_username))
|
|
.first()
|
|
)
|
|
else:
|
|
user_obj = (
|
|
UserModel.objects.select_related()
|
|
.filter(Q(username=email_or_username) | Q(email=email_or_username))
|
|
.first()
|
|
)
|
|
|
|
if user_obj:
|
|
username_val = getattr(user_obj, "username", None)
|
|
return authenticate(
|
|
# type: ignore[arg-type]
|
|
_get_underlying_request(request),
|
|
username=username_val,
|
|
password=password,
|
|
)
|
|
except Exception:
|
|
# Fallback to authenticate directly with provided identifier
|
|
return authenticate(
|
|
# type: ignore[arg-type]
|
|
_get_underlying_request(request),
|
|
username=email_or_username,
|
|
password=password,
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
# === AUTHENTICATION API VIEWS ===
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="User login",
|
|
description="Authenticate user with username/email and password.",
|
|
request=LoginInputSerializer,
|
|
responses={
|
|
200: LoginOutputSerializer,
|
|
400: "Bad Request",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class LoginAPIView(APIView):
|
|
"""API endpoint for user login."""
|
|
|
|
permission_classes = [AllowAny]
|
|
authentication_classes = []
|
|
serializer_class = LoginInputSerializer
|
|
|
|
def post(self, request: Request) -> Response:
|
|
try:
|
|
# instantiate mixin before calling to avoid type-mismatch in static analysis
|
|
TurnstileMixin().validate_turnstile(request)
|
|
except ValidationError as e:
|
|
return Response({"detail": str(e)}, status=status.HTTP_400_BAD_REQUEST)
|
|
except Exception:
|
|
# If mixin doesn't do anything, continue
|
|
pass
|
|
|
|
serializer = LoginInputSerializer(data=request.data)
|
|
if serializer.is_valid():
|
|
validated = serializer.validated_data
|
|
# Use .get to satisfy static analyzers
|
|
email_or_username = validated.get("username") # type: ignore[assignment]
|
|
password = validated.get("password") # type: ignore[assignment]
|
|
|
|
if not email_or_username or not password:
|
|
return Response(
|
|
{"detail": "username and password are required"},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
user = _authenticate_user_by_lookup(email_or_username, password, request)
|
|
|
|
if user:
|
|
if getattr(user, "is_active", False):
|
|
# Check if user has MFA enabled
|
|
mfa_info = self._check_user_mfa(user)
|
|
|
|
if mfa_info["has_mfa"]:
|
|
# MFA required - generate temp token and return mfa_required response
|
|
from django.utils.crypto import get_random_string
|
|
from django.core.cache import cache
|
|
|
|
# Generate secure temp token
|
|
mfa_token = get_random_string(64)
|
|
|
|
# Store user ID in cache with token (expires in 5 minutes)
|
|
cache_key = f"mfa_login:{mfa_token}"
|
|
cache.set(cache_key, {
|
|
"user_id": user.pk,
|
|
"username": user.username,
|
|
}, timeout=300) # 5 minutes
|
|
|
|
from .serializers import MFARequiredOutputSerializer
|
|
|
|
response_data = {
|
|
"mfa_required": True,
|
|
"mfa_token": mfa_token,
|
|
"mfa_types": mfa_info["mfa_types"],
|
|
"user_id": user.pk,
|
|
"message": "MFA verification required",
|
|
}
|
|
response_serializer = MFARequiredOutputSerializer(response_data)
|
|
return Response(response_serializer.data)
|
|
|
|
# No MFA - proceed with normal login
|
|
# pass a real HttpRequest to Django login with backend specified
|
|
login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")
|
|
|
|
# Generate JWT tokens with auth method claims
|
|
from .jwt import create_tokens_for_user
|
|
|
|
tokens = create_tokens_for_user(
|
|
user,
|
|
auth_method="password",
|
|
mfa_verified=False,
|
|
provider_mfa=False,
|
|
)
|
|
|
|
# Log successful login
|
|
from apps.accounts.services.security_service import log_security_event
|
|
log_security_event(
|
|
"login_success",
|
|
request,
|
|
user=user,
|
|
metadata={"auth_method": "password", "mfa_required": False},
|
|
)
|
|
|
|
response_serializer = LoginOutputSerializer(
|
|
{
|
|
"access": tokens["access"],
|
|
"refresh": tokens["refresh"],
|
|
"user": user,
|
|
"message": "Login successful",
|
|
}
|
|
)
|
|
return Response(response_serializer.data)
|
|
else:
|
|
return Response(
|
|
{
|
|
"detail": "Please verify your email address before logging in. Check your email for a verification link.",
|
|
"code": "EMAIL_VERIFICATION_REQUIRED",
|
|
"email_verification_required": True,
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
else:
|
|
# Log failed login attempt
|
|
from apps.accounts.services.security_service import log_security_event
|
|
log_security_event(
|
|
"login_failed",
|
|
request,
|
|
user=None,
|
|
metadata={"username_attempted": email_or_username},
|
|
)
|
|
return Response(
|
|
{"detail": "Invalid credentials"},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
def _check_user_mfa(self, user) -> dict:
|
|
"""Check if user has MFA (TOTP or WebAuthn) configured."""
|
|
try:
|
|
from allauth.mfa.models import Authenticator
|
|
|
|
authenticators = Authenticator.objects.filter(user=user)
|
|
|
|
has_totp = authenticators.filter(type=Authenticator.Type.TOTP).exists()
|
|
has_webauthn = authenticators.filter(type=Authenticator.Type.WEBAUTHN).exists()
|
|
|
|
mfa_types = []
|
|
if has_totp:
|
|
mfa_types.append("totp")
|
|
if has_webauthn:
|
|
mfa_types.append("webauthn")
|
|
|
|
return {
|
|
"has_mfa": has_totp or has_webauthn,
|
|
"has_totp": has_totp,
|
|
"has_webauthn": has_webauthn,
|
|
"mfa_types": mfa_types,
|
|
}
|
|
except ImportError:
|
|
return {"has_mfa": False, "has_totp": False, "has_webauthn": False, "mfa_types": []}
|
|
except Exception:
|
|
return {"has_mfa": False, "has_totp": False, "has_webauthn": False, "mfa_types": []}
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Verify MFA for login",
|
|
description="Complete MFA verification after password authentication. Submit TOTP code to receive JWT tokens.",
|
|
request={"application/json": {
|
|
"type": "object",
|
|
"properties": {
|
|
"mfa_token": {"type": "string", "description": "Temporary token from login response"},
|
|
"code": {"type": "string", "description": "6-digit TOTP code"},
|
|
},
|
|
"required": ["mfa_token", "code"],
|
|
}},
|
|
responses={
|
|
200: LoginOutputSerializer,
|
|
400: "Bad Request - Invalid code or expired token",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class MFALoginVerifyAPIView(APIView):
|
|
"""API endpoint to verify MFA code and complete login."""
|
|
|
|
permission_classes = [AllowAny]
|
|
authentication_classes = []
|
|
|
|
def post(self, request: Request) -> Response:
|
|
from django.core.cache import cache
|
|
from .serializers import MFALoginVerifyInputSerializer
|
|
|
|
serializer = MFALoginVerifyInputSerializer(data=request.data)
|
|
if not serializer.is_valid():
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
validated = serializer.validated_data
|
|
mfa_token = validated.get("mfa_token")
|
|
totp_code = validated.get("code")
|
|
credential = validated.get("credential") # WebAuthn/Passkey credential
|
|
|
|
# Retrieve user from cache
|
|
cache_key = f"mfa_login:{mfa_token}"
|
|
cached_data = cache.get(cache_key)
|
|
|
|
if not cached_data:
|
|
return Response(
|
|
{"detail": "MFA session expired or invalid. Please login again."},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
user_id = cached_data.get("user_id")
|
|
|
|
try:
|
|
user = UserModel.objects.get(pk=user_id)
|
|
except UserModel.DoesNotExist:
|
|
return Response(
|
|
{"detail": "User not found"},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
# Verify MFA - either TOTP or Passkey
|
|
from apps.accounts.services.security_service import log_security_event
|
|
|
|
if totp_code:
|
|
if not self._verify_totp(user, totp_code):
|
|
# Log failed MFA attempt
|
|
log_security_event(
|
|
"mfa_challenge_failed",
|
|
request,
|
|
user=user,
|
|
metadata={"method": "totp"},
|
|
)
|
|
return Response(
|
|
{"detail": "Invalid verification code"},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
elif credential:
|
|
# Verify passkey/WebAuthn credential
|
|
passkey_result = self._verify_passkey(request, user, credential)
|
|
if not passkey_result["success"]:
|
|
# Log failed MFA attempt
|
|
log_security_event(
|
|
"mfa_challenge_failed",
|
|
request,
|
|
user=user,
|
|
metadata={"method": "passkey", "error": passkey_result.get("error")},
|
|
)
|
|
return Response(
|
|
{"detail": passkey_result.get("error", "Passkey verification failed")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
else:
|
|
return Response(
|
|
{"detail": "Either TOTP code or passkey credential is required"},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
# Clear the MFA token from cache
|
|
cache.delete(cache_key)
|
|
|
|
# Complete login
|
|
login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")
|
|
|
|
# Determine auth method based on what was verified
|
|
from .jwt import create_tokens_for_user
|
|
|
|
if credential:
|
|
# Passkey verification - inherently MFA
|
|
auth_method = "passkey"
|
|
else:
|
|
# TOTP verification
|
|
auth_method = "totp"
|
|
|
|
# Log successful MFA challenge and login
|
|
log_security_event(
|
|
"mfa_challenge_success",
|
|
request,
|
|
user=user,
|
|
metadata={"method": auth_method},
|
|
)
|
|
log_security_event(
|
|
"login_success",
|
|
request,
|
|
user=user,
|
|
metadata={"auth_method": auth_method, "mfa_verified": True},
|
|
)
|
|
|
|
tokens = create_tokens_for_user(
|
|
user,
|
|
auth_method=auth_method,
|
|
mfa_verified=True,
|
|
provider_mfa=False,
|
|
)
|
|
|
|
response_serializer = LoginOutputSerializer(
|
|
{
|
|
"access": tokens["access"],
|
|
"refresh": tokens["refresh"],
|
|
"user": user,
|
|
"message": "Login successful",
|
|
}
|
|
)
|
|
return Response(response_serializer.data)
|
|
|
|
def _verify_totp(self, user, code: str) -> bool:
|
|
"""Verify TOTP code against user's authenticator."""
|
|
try:
|
|
from allauth.mfa.models import Authenticator
|
|
from allauth.mfa.totp.internal import auth as totp_auth
|
|
|
|
try:
|
|
authenticator = Authenticator.objects.get(
|
|
user=user,
|
|
type=Authenticator.Type.TOTP,
|
|
)
|
|
except Authenticator.DoesNotExist:
|
|
return False
|
|
|
|
# Get the secret from authenticator data and verify
|
|
secret = authenticator.data.get("secret")
|
|
if not secret:
|
|
return False
|
|
|
|
return totp_auth.validate_totp_code(secret, code)
|
|
|
|
except ImportError:
|
|
logger.error("allauth.mfa not available for TOTP verification")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"TOTP verification error: {e}")
|
|
return False
|
|
|
|
def _verify_passkey(self, request, user, credential: dict) -> dict:
|
|
"""Verify WebAuthn/Passkey credential."""
|
|
try:
|
|
from allauth.mfa.models import Authenticator
|
|
from allauth.mfa.webauthn.internal import auth as webauthn_auth
|
|
|
|
# Check if user has any WebAuthn authenticators
|
|
has_passkey = Authenticator.objects.filter(
|
|
user=user,
|
|
type=Authenticator.Type.WEBAUTHN,
|
|
).exists()
|
|
|
|
if not has_passkey:
|
|
return {"success": False, "error": "No passkey registered for this user"}
|
|
|
|
try:
|
|
# For MFA login flow, we need to set up state first if not present
|
|
# Note: allauth's begin_authentication stores state internally
|
|
state = webauthn_auth.get_state()
|
|
|
|
if not state:
|
|
# Need to temporarily set request.user for allauth context
|
|
original_user = getattr(request, "user", None)
|
|
request.user = user
|
|
try:
|
|
webauthn_auth.begin_authentication(user)
|
|
finally:
|
|
if original_user is not None:
|
|
request.user = original_user
|
|
|
|
# Complete authentication - takes user and credential dict
|
|
# State is managed internally by allauth
|
|
webauthn_auth.complete_authentication(user, credential)
|
|
|
|
return {"success": True}
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebAuthn authentication failed: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
except ImportError as e:
|
|
logger.error(f"WebAuthn module not available: {e}")
|
|
return {"success": False, "error": "Passkey authentication not available"}
|
|
except Exception as e:
|
|
logger.error(f"Passkey verification error: {e}")
|
|
return {"success": False, "error": "Passkey verification failed"}
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Exchange session for JWT tokens",
|
|
description="Exchange allauth session_token (from passkey login) for JWT tokens.",
|
|
responses={
|
|
200: LoginOutputSerializer,
|
|
401: "Not authenticated",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class SessionToTokenAPIView(APIView):
|
|
"""
|
|
API endpoint to exchange allauth session_token for JWT tokens.
|
|
|
|
Used after allauth headless passkey login to get JWT tokens for the frontend.
|
|
The allauth passkey login returns a session_token, and this endpoint
|
|
validates it and exchanges it for JWT tokens.
|
|
"""
|
|
|
|
# Allow unauthenticated - we validate the allauth session_token ourselves
|
|
permission_classes = [AllowAny]
|
|
authentication_classes = []
|
|
|
|
def post(self, request: Request) -> Response:
|
|
# Get the allauth session_token from header or body
|
|
session_token = request.headers.get('X-Session-Token') or request.data.get('session_token')
|
|
|
|
if not session_token:
|
|
return Response(
|
|
{"detail": "Session token required. Provide X-Session-Token header or session_token in body."},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
# Validate the session_token with allauth's session store
|
|
try:
|
|
from allauth.headless.tokens.strategies.sessions import SessionTokenStrategy
|
|
|
|
strategy = SessionTokenStrategy()
|
|
session_data = strategy.lookup_session(session_token)
|
|
|
|
if not session_data:
|
|
return Response(
|
|
{"detail": "Invalid or expired session token."},
|
|
status=status.HTTP_401_UNAUTHORIZED,
|
|
)
|
|
|
|
# Get user from the session
|
|
user_id = session_data.get('_auth_user_id')
|
|
if not user_id:
|
|
return Response(
|
|
{"detail": "No user found in session."},
|
|
status=status.HTTP_401_UNAUTHORIZED,
|
|
)
|
|
|
|
user = UserModel.objects.get(pk=user_id)
|
|
|
|
except (ImportError, Exception) as e:
|
|
logger.error(f"Failed to validate allauth session token: {e}")
|
|
return Response(
|
|
{"detail": "Failed to validate session token."},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
)
|
|
|
|
# Generate JWT tokens with passkey auth method
|
|
from .jwt import create_tokens_for_user
|
|
|
|
tokens = create_tokens_for_user(
|
|
user,
|
|
auth_method="passkey",
|
|
mfa_verified=True, # Passkey is considered MFA
|
|
provider_mfa=False,
|
|
)
|
|
|
|
# Log successful session-to-token exchange
|
|
from apps.accounts.services.security_service import log_security_event
|
|
log_security_event(
|
|
"session_to_token",
|
|
request,
|
|
user=user,
|
|
metadata={"auth_method": "passkey"},
|
|
)
|
|
|
|
response_serializer = LoginOutputSerializer(
|
|
{
|
|
"access": tokens["access"],
|
|
"refresh": tokens["refresh"],
|
|
"user": user,
|
|
"message": "Token exchange successful",
|
|
}
|
|
)
|
|
return Response(response_serializer.data)
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="User registration",
|
|
description="Register a new user account. Email verification required.",
|
|
request=SignupInputSerializer,
|
|
responses={
|
|
201: SignupOutputSerializer,
|
|
400: "Bad Request",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class SignupAPIView(APIView):
|
|
"""API endpoint for user registration."""
|
|
|
|
permission_classes = [AllowAny]
|
|
authentication_classes = []
|
|
serializer_class = SignupInputSerializer
|
|
|
|
def post(self, request: Request) -> Response:
|
|
try:
|
|
# instantiate mixin before calling to avoid type-mismatch in static analysis
|
|
TurnstileMixin().validate_turnstile(request)
|
|
except ValidationError as e:
|
|
return Response({"detail": str(e)}, status=status.HTTP_400_BAD_REQUEST)
|
|
except Exception:
|
|
# If mixin doesn't do anything, continue
|
|
pass
|
|
|
|
serializer = SignupInputSerializer(data=request.data, context={"request": request})
|
|
if serializer.is_valid():
|
|
user = serializer.save()
|
|
|
|
# Don't log in the user immediately - they need to verify their email first
|
|
response_serializer = SignupOutputSerializer(
|
|
{
|
|
"access": None,
|
|
"refresh": None,
|
|
"user": user,
|
|
"detail": "Registration successful. Please check your email to verify your account.",
|
|
"email_verification_required": True,
|
|
}
|
|
)
|
|
return Response(response_serializer.data, status=status.HTTP_201_CREATED)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="User logout",
|
|
description="Logout the current user and blacklist their refresh token.",
|
|
responses={
|
|
200: LogoutOutputSerializer,
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class LogoutAPIView(APIView):
|
|
"""API endpoint for user logout."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = LogoutOutputSerializer
|
|
|
|
def post(self, request: Request) -> Response:
|
|
try:
|
|
user = request.user
|
|
|
|
# Get refresh token from request data with proper type handling
|
|
refresh_token = None
|
|
if hasattr(request, "data") and request.data is not None:
|
|
data = getattr(request, "data", {})
|
|
if hasattr(data, "get"):
|
|
refresh_token = data.get("refresh")
|
|
|
|
if refresh_token and isinstance(refresh_token, str):
|
|
# Blacklist the refresh token
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
|
|
try:
|
|
# Create RefreshToken from string and blacklist it
|
|
refresh_token_obj = RefreshToken(refresh_token) # type: ignore[arg-type]
|
|
refresh_token_obj.blacklist()
|
|
except Exception:
|
|
# Token might be invalid or already blacklisted
|
|
pass
|
|
|
|
# Also delete the old token for backward compatibility
|
|
if hasattr(request.user, "auth_token"):
|
|
request.user.auth_token.delete()
|
|
|
|
# Log security event
|
|
from apps.accounts.services.security_service import log_security_event
|
|
log_security_event(
|
|
"logout",
|
|
request,
|
|
user=user,
|
|
metadata={},
|
|
)
|
|
|
|
# Logout from session using the underlying HttpRequest
|
|
logout(_get_underlying_request(request))
|
|
|
|
response_serializer = LogoutOutputSerializer({"detail": "Logout successful"})
|
|
return Response(response_serializer.data)
|
|
except Exception:
|
|
return Response({"detail": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@extend_schema_view(
|
|
get=extend_schema(
|
|
summary="Get current user",
|
|
description="Retrieve information about the currently authenticated user.",
|
|
responses={
|
|
200: UserOutputSerializer,
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class CurrentUserAPIView(APIView):
|
|
"""API endpoint to get current user information."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = UserOutputSerializer
|
|
|
|
def get(self, request: Request) -> Response:
|
|
serializer = UserOutputSerializer(request.user)
|
|
return Response(serializer.data)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Request password reset",
|
|
description="Send a password reset email to the user.",
|
|
request=PasswordResetInputSerializer,
|
|
responses={
|
|
200: PasswordResetOutputSerializer,
|
|
400: "Bad Request",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class PasswordResetAPIView(APIView):
|
|
"""API endpoint to request password reset."""
|
|
|
|
permission_classes = [AllowAny]
|
|
serializer_class = PasswordResetInputSerializer
|
|
|
|
def post(self, request: Request) -> Response:
|
|
serializer = PasswordResetInputSerializer(data=request.data, context={"request": request})
|
|
if serializer.is_valid():
|
|
serializer.save()
|
|
|
|
response_serializer = PasswordResetOutputSerializer({"detail": "Password reset email sent"})
|
|
return Response(response_serializer.data)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Change password",
|
|
description="Change the current user's password.",
|
|
request=PasswordChangeInputSerializer,
|
|
responses={
|
|
200: PasswordChangeOutputSerializer,
|
|
400: "Bad Request",
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class PasswordChangeAPIView(APIView):
|
|
"""API endpoint to change password."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = PasswordChangeInputSerializer
|
|
|
|
def post(self, request: Request) -> Response:
|
|
serializer = PasswordChangeInputSerializer(data=request.data, context={"request": request})
|
|
if serializer.is_valid():
|
|
serializer.save()
|
|
|
|
response_serializer = PasswordChangeOutputSerializer({"detail": "Password changed successfully"})
|
|
return Response(response_serializer.data)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
@extend_schema_view(
|
|
get=extend_schema(
|
|
summary="Get social providers",
|
|
description="Retrieve available social authentication providers.",
|
|
responses={200: "List of social providers"},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class SocialProvidersAPIView(APIView):
|
|
"""API endpoint to get available social authentication providers."""
|
|
|
|
permission_classes = [AllowAny]
|
|
serializer_class = SocialProviderOutputSerializer
|
|
|
|
def get(self, request: Request) -> Response:
|
|
from django.core.cache import cache
|
|
|
|
# get_current_site expects a django HttpRequest; _get_underlying_request now returns HttpRequest
|
|
site = get_current_site(_get_underlying_request(request))
|
|
|
|
# Cache key based on site and request host - use getattr to avoid attribute errors
|
|
site_id = getattr(site, "id", getattr(site, "pk", None))
|
|
cache_key = f"social_providers:{site_id}:{request.get_host()}"
|
|
|
|
# Try to get from cache first (cache for 15 minutes)
|
|
cached_providers = cache.get(cache_key)
|
|
if cached_providers is not None:
|
|
return Response(cached_providers)
|
|
|
|
providers_list = []
|
|
|
|
# Optimized query: filter by site and order by provider name
|
|
from allauth.socialaccount.models import SocialApp
|
|
|
|
social_apps = SocialApp.objects.filter(sites=site).order_by("provider")
|
|
|
|
for social_app in social_apps:
|
|
try:
|
|
provider_name = social_app.name or getattr(social_app, "provider", "").title()
|
|
|
|
auth_url = request.build_absolute_uri(f"/accounts/{social_app.provider}/login/")
|
|
|
|
providers_list.append(
|
|
{
|
|
"id": social_app.provider,
|
|
"name": provider_name,
|
|
"authUrl": auth_url,
|
|
}
|
|
)
|
|
|
|
except Exception:
|
|
continue
|
|
|
|
serializer = SocialProviderOutputSerializer(providers_list, many=True)
|
|
response_data = serializer.data
|
|
|
|
cache.set(cache_key, response_data, 900)
|
|
|
|
return Response(response_data)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Check authentication status",
|
|
description="Check if user is authenticated and return user data.",
|
|
responses={200: AuthStatusOutputSerializer},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class AuthStatusAPIView(APIView):
|
|
"""API endpoint to check authentication status."""
|
|
|
|
permission_classes = [AllowAny]
|
|
serializer_class = AuthStatusOutputSerializer
|
|
|
|
def post(self, request: Request) -> Response:
|
|
if request.user.is_authenticated:
|
|
response_data = {
|
|
"authenticated": True,
|
|
"user": request.user,
|
|
}
|
|
else:
|
|
response_data = {
|
|
"authenticated": False,
|
|
"user": None,
|
|
}
|
|
|
|
serializer = AuthStatusOutputSerializer(response_data)
|
|
return Response(serializer.data)
|
|
|
|
|
|
# === SOCIAL PROVIDER MANAGEMENT API VIEWS ===
|
|
|
|
|
|
@extend_schema_view(
|
|
get=extend_schema(
|
|
summary="Get available social providers",
|
|
description="Retrieve list of available social authentication providers.",
|
|
responses={
|
|
200: AvailableProviderSerializer(many=True),
|
|
},
|
|
tags=["Social Authentication"],
|
|
),
|
|
)
|
|
class AvailableProvidersAPIView(APIView):
|
|
"""API endpoint to get available social providers."""
|
|
|
|
permission_classes = [AllowAny]
|
|
serializer_class = AvailableProviderSerializer
|
|
|
|
def get(self, request: Request) -> Response:
|
|
providers = [
|
|
{
|
|
"provider": "google",
|
|
"name": "Google",
|
|
"login_url": "/auth/social/google/",
|
|
"connect_url": "/auth/social/connect/google/",
|
|
},
|
|
{
|
|
"provider": "discord",
|
|
"name": "Discord",
|
|
"login_url": "/auth/social/discord/",
|
|
"connect_url": "/auth/social/connect/discord/",
|
|
},
|
|
]
|
|
|
|
serializer = AvailableProviderSerializer(providers, many=True)
|
|
return Response(serializer.data)
|
|
|
|
|
|
@extend_schema_view(
|
|
get=extend_schema(
|
|
summary="Get connected social providers",
|
|
description="Retrieve list of social providers connected to the user's account.",
|
|
responses={
|
|
200: ConnectedProviderSerializer(many=True),
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Social Authentication"],
|
|
),
|
|
)
|
|
class ConnectedProvidersAPIView(APIView):
|
|
"""API endpoint to get user's connected social providers."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = ConnectedProviderSerializer
|
|
|
|
def get(self, request: Request) -> Response:
|
|
service = SocialProviderService()
|
|
providers = service.get_connected_providers(request.user)
|
|
|
|
serializer = ConnectedProviderSerializer(providers, many=True)
|
|
return Response(serializer.data)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Connect social provider",
|
|
description="Connect a social authentication provider to the user's account.",
|
|
request=ConnectProviderInputSerializer,
|
|
responses={
|
|
200: ConnectProviderOutputSerializer,
|
|
400: SocialProviderErrorSerializer,
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Social Authentication"],
|
|
),
|
|
)
|
|
class ConnectProviderAPIView(APIView):
|
|
"""API endpoint to connect a social provider."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = ConnectProviderInputSerializer
|
|
|
|
def post(self, request: Request, provider: str) -> Response:
|
|
from apps.accounts.services.security_service import (
|
|
log_security_event,
|
|
send_security_notification,
|
|
)
|
|
|
|
# Validate provider
|
|
if provider not in ["google", "discord"]:
|
|
return Response(
|
|
{
|
|
"detail": f"Provider '{provider}' is not supported",
|
|
"code": "INVALID_PROVIDER",
|
|
"suggestions": ["Use 'google' or 'discord'"],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
# Check if user's email is verified before allowing social account linking
|
|
# This prevents attackers from linking a social account to an unverified email
|
|
user = request.user
|
|
|
|
# Check allauth email verification status
|
|
try:
|
|
from allauth.account.models import EmailAddress
|
|
primary_email = EmailAddress.objects.filter(user=user, primary=True).first()
|
|
if primary_email and not primary_email.verified:
|
|
return Response(
|
|
{
|
|
"detail": "Please verify your email address before connecting social accounts",
|
|
"code": "EMAIL_NOT_VERIFIED",
|
|
"suggestions": [
|
|
"Check your email for a verification link",
|
|
"Request a new verification email from your account settings",
|
|
],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
except ImportError:
|
|
# If allauth.account is not available, skip check
|
|
pass
|
|
|
|
serializer = ConnectProviderInputSerializer(data=request.data)
|
|
if not serializer.is_valid():
|
|
return Response(
|
|
{
|
|
"detail": "Invalid request data",
|
|
"code": "VALIDATION_ERROR",
|
|
"details": serializer.errors,
|
|
"suggestions": ["Provide a valid access_token"],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
access_token = serializer.validated_data["access_token"]
|
|
|
|
try:
|
|
service = SocialProviderService()
|
|
result = service.connect_provider(request.user, provider, access_token)
|
|
|
|
# Log security event
|
|
log_security_event(
|
|
"social_linked",
|
|
request,
|
|
user=request.user,
|
|
metadata={"provider": provider},
|
|
)
|
|
|
|
# Send security notification
|
|
send_security_notification(request.user, "social_linked", {"provider": provider.title()})
|
|
|
|
response_serializer = ConnectProviderOutputSerializer(result)
|
|
return Response(response_serializer.data)
|
|
|
|
except Exception as e:
|
|
return Response(
|
|
{
|
|
"success": False,
|
|
"detail": "CONNECTION_FAILED",
|
|
"message": str(e),
|
|
"suggestions": [
|
|
"Verify the access token is valid",
|
|
"Ensure the provider account is not already connected to another user",
|
|
],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Disconnect social provider",
|
|
description="Disconnect a social authentication provider from the user's account.",
|
|
responses={
|
|
200: DisconnectProviderOutputSerializer,
|
|
400: SocialProviderErrorSerializer,
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Social Authentication"],
|
|
),
|
|
)
|
|
class DisconnectProviderAPIView(APIView):
|
|
"""API endpoint to disconnect a social provider."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = DisconnectProviderOutputSerializer
|
|
|
|
def post(self, request: Request, provider: str) -> Response:
|
|
# Validate provider
|
|
if provider not in ["google", "discord"]:
|
|
return Response(
|
|
{
|
|
"detail": f"Provider '{provider}' is not supported",
|
|
"code": "INVALID_PROVIDER",
|
|
"suggestions": ["Use 'google' or 'discord'"],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
try:
|
|
from apps.accounts.services.security_service import (
|
|
log_security_event,
|
|
send_security_notification,
|
|
)
|
|
|
|
service = SocialProviderService()
|
|
|
|
# Check if disconnection is safe
|
|
can_disconnect, reason = service.can_disconnect_provider(request.user, provider)
|
|
if not can_disconnect:
|
|
return Response(
|
|
{
|
|
"success": False,
|
|
"detail": "UNSAFE_DISCONNECTION",
|
|
"message": reason,
|
|
"suggestions": [
|
|
"Set up email/password authentication before disconnecting",
|
|
"Connect another social provider before disconnecting this one",
|
|
],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
# Perform disconnection
|
|
result = service.disconnect_provider(request.user, provider)
|
|
|
|
# Log security event
|
|
log_security_event(
|
|
"social_unlinked",
|
|
request,
|
|
user=request.user,
|
|
metadata={"provider": provider},
|
|
)
|
|
|
|
# Send security notification
|
|
send_security_notification(request.user, "social_unlinked", {"provider": provider.title()})
|
|
|
|
response_serializer = DisconnectProviderOutputSerializer(result)
|
|
return Response(response_serializer.data)
|
|
|
|
except Exception as e:
|
|
return Response(
|
|
{
|
|
"success": False,
|
|
"detail": "DISCONNECTION_FAILED",
|
|
"message": str(e),
|
|
"suggestions": [
|
|
"Verify the provider is currently connected",
|
|
"Ensure you have alternative authentication methods",
|
|
],
|
|
},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
|
|
@extend_schema_view(
|
|
get=extend_schema(
|
|
summary="Get social authentication status",
|
|
description="Get comprehensive social authentication status for the user.",
|
|
responses={
|
|
200: SocialAuthStatusSerializer,
|
|
401: "Unauthorized",
|
|
},
|
|
tags=["Social Authentication"],
|
|
),
|
|
)
|
|
class SocialAuthStatusAPIView(APIView):
|
|
"""API endpoint to get social authentication status."""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
serializer_class = SocialAuthStatusSerializer
|
|
|
|
def get(self, request: Request) -> Response:
|
|
service = SocialProviderService()
|
|
auth_status = service.get_auth_status(request.user)
|
|
|
|
serializer = SocialAuthStatusSerializer(auth_status)
|
|
return Response(serializer.data)
|
|
|
|
|
|
# === EMAIL VERIFICATION API VIEWS ===
|
|
|
|
|
|
@extend_schema_view(
|
|
get=extend_schema(
|
|
summary="Verify email address",
|
|
description="Verify user's email address using verification token.",
|
|
responses={
|
|
200: {"type": "object", "properties": {"message": {"type": "string"}}},
|
|
400: "Bad Request",
|
|
404: "Token not found",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class EmailVerificationAPIView(APIView):
|
|
"""API endpoint for email verification."""
|
|
|
|
permission_classes = [AllowAny]
|
|
authentication_classes = []
|
|
|
|
def get(self, request: Request, token: str) -> Response:
|
|
from apps.accounts.models import EmailVerification
|
|
|
|
try:
|
|
verification = EmailVerification.objects.select_related("user").get(token=token)
|
|
user = verification.user
|
|
|
|
# Activate the user
|
|
user.is_active = True
|
|
user.save()
|
|
|
|
# Delete the verification record
|
|
verification.delete()
|
|
|
|
return Response({"detail": "Email verified successfully. You can now log in.", "success": True})
|
|
|
|
except EmailVerification.DoesNotExist:
|
|
return Response({"detail": "Invalid or expired verification token"}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Resend verification email",
|
|
description="Resend email verification to user's email address.",
|
|
request={"type": "object", "properties": {"email": {"type": "string", "format": "email"}}},
|
|
responses={
|
|
200: {"type": "object", "properties": {"message": {"type": "string"}}},
|
|
400: "Bad Request",
|
|
404: "User not found",
|
|
},
|
|
tags=["Authentication"],
|
|
),
|
|
)
|
|
class ResendVerificationAPIView(APIView):
|
|
"""API endpoint to resend email verification."""
|
|
|
|
permission_classes = [AllowAny]
|
|
authentication_classes = []
|
|
|
|
def post(self, request: Request) -> Response:
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.utils.crypto import get_random_string
|
|
from django_forwardemail.services import EmailService
|
|
|
|
from apps.accounts.models import EmailVerification
|
|
|
|
email = request.data.get("email")
|
|
if not email:
|
|
return Response({"detail": "Email address is required"}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
try:
|
|
user = UserModel.objects.get(email__iexact=email.strip().lower())
|
|
|
|
# Don't resend if user is already active
|
|
if user.is_active:
|
|
return Response({"detail": "Email is already verified"}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
# Create or update verification record
|
|
verification, created = EmailVerification.objects.get_or_create(
|
|
user=user, defaults={"token": get_random_string(64)}
|
|
)
|
|
|
|
if not created:
|
|
# Update existing token and timestamp
|
|
verification.token = get_random_string(64)
|
|
verification.save()
|
|
|
|
# Send verification email
|
|
site = get_current_site(_get_underlying_request(request))
|
|
verification_url = request.build_absolute_uri(f"/api/v1/auth/verify-email/{verification.token}/")
|
|
|
|
try:
|
|
EmailService.send_email(
|
|
to=user.email,
|
|
subject="Verify your ThrillWiki account",
|
|
text=f"""
|
|
Welcome to ThrillWiki!
|
|
|
|
Please verify your email address by clicking the link below:
|
|
{verification_url}
|
|
|
|
If you didn't create an account, you can safely ignore this email.
|
|
|
|
Thanks,
|
|
The ThrillWiki Team
|
|
""".strip(),
|
|
site=site,
|
|
)
|
|
|
|
return Response({"detail": "Verification email sent successfully", "success": True})
|
|
|
|
except Exception as e:
|
|
capture_and_log(e, 'Send verification email', source='api')
|
|
|
|
return Response(
|
|
{"detail": "Failed to send verification email"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
except UserModel.DoesNotExist:
|
|
# Don't reveal whether email exists
|
|
return Response({"detail": "If the email exists, a verification email has been sent", "success": True})
|
|
|
|
# Note: User Profile, Top List, and Top List Item ViewSets are now handled
|
|
# by the dedicated accounts app at backend/apps/api/v1/accounts/views.py
|
|
# to avoid duplication and maintain clean separation of concerns.
|
|
|
|
|
|
@extend_schema_view(
|
|
post=extend_schema(
|
|
summary="Process OAuth profile",
|
|
description="Process OAuth profile data during social authentication flow.",
|
|
request={
|
|
"type": "object",
|
|
"properties": {
|
|
"provider": {"type": "string", "description": "OAuth provider (e.g., google, discord)"},
|
|
"profile": {
|
|
"type": "object",
|
|
"description": "Profile data from OAuth provider",
|
|
"properties": {
|
|
"id": {"type": "string"},
|
|
"email": {"type": "string", "format": "email"},
|
|
"name": {"type": "string"},
|
|
"avatar_url": {"type": "string", "format": "uri"},
|
|
},
|
|
},
|
|
"access_token": {"type": "string", "description": "OAuth access token"},
|
|
},
|
|
"required": ["provider", "profile"],
|
|
},
|
|
responses={
|
|
200: {
|
|
"type": "object",
|
|
"properties": {
|
|
"success": {"type": "boolean"},
|
|
"action": {"type": "string", "enum": ["created", "updated", "linked"]},
|
|
"user": {"type": "object"},
|
|
"profile_synced": {"type": "boolean"},
|
|
},
|
|
},
|
|
400: "Bad Request",
|
|
401: "Unauthorized",
|
|
403: "Account suspended",
|
|
},
|
|
tags=["Social Authentication"],
|
|
),
|
|
)
|
|
class ProcessOAuthProfileAPIView(APIView):
|
|
"""
|
|
API endpoint to process OAuth profile data.
|
|
|
|
This endpoint is called AFTER the OAuth flow is complete to:
|
|
1. Check if user is banned (SECURITY CRITICAL)
|
|
2. Extract avatar from OAuth provider
|
|
3. Download and upload avatar to Cloudflare Images
|
|
4. Sync display name from OAuth provider
|
|
5. Update username if it's a generic UUID-based username
|
|
|
|
Called with an empty body - uses the authenticated session.
|
|
|
|
Full parity with Supabase Edge Function: process-oauth-profile
|
|
|
|
BULLETPROOFED: Comprehensive validation, sanitization, and error handling.
|
|
"""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
|
|
# Security constants
|
|
MAX_AVATAR_SIZE = 10 * 1024 * 1024 # 10MB
|
|
AVATAR_DOWNLOAD_TIMEOUT = 10.0 # seconds
|
|
AVATAR_UPLOAD_TIMEOUT = 30.0 # seconds
|
|
MAX_USERNAME_LENGTH = 150
|
|
MIN_USERNAME_LENGTH = 3
|
|
ALLOWED_USERNAME_CHARS = set("abcdefghijklmnopqrstuvwxyz0123456789_")
|
|
|
|
# Rate limiting for avatar uploads (prevent abuse)
|
|
AVATAR_UPLOAD_COOLDOWN = 60 # seconds between uploads
|
|
|
|
def post(self, request: Request) -> Response:
|
|
import re
|
|
import httpx
|
|
from django.db import transaction
|
|
from django.core.cache import cache
|
|
|
|
try:
|
|
user = request.user
|
|
|
|
# ================================================================
|
|
# STEP 0: Validate user object exists and is valid
|
|
# ================================================================
|
|
if not user or not hasattr(user, 'user_id'):
|
|
logger.error("ProcessOAuthProfile called with invalid user object")
|
|
return Response({
|
|
"success": False,
|
|
"error": "Invalid user session",
|
|
}, status=status.HTTP_401_UNAUTHORIZED)
|
|
|
|
user_id_str = str(user.user_id)
|
|
|
|
# ================================================================
|
|
# STEP 1: CRITICAL - Check ban status FIRST
|
|
# ================================================================
|
|
is_banned = getattr(user, 'is_banned', False)
|
|
|
|
# Also check via profile if applicable
|
|
if not is_banned:
|
|
try:
|
|
from apps.accounts.models import UserProfile
|
|
profile_check = UserProfile.objects.filter(user=user).first()
|
|
if profile_check and getattr(profile_check, 'is_banned', False):
|
|
is_banned = True
|
|
except Exception:
|
|
pass
|
|
|
|
if is_banned:
|
|
ban_reason = getattr(user, 'ban_reason', None) or "Policy violation"
|
|
# Sanitize ban reason for response
|
|
safe_ban_reason = str(ban_reason)[:200] if ban_reason else None
|
|
|
|
logger.warning(
|
|
f"Banned user attempted OAuth profile update",
|
|
extra={"user_id": user_id_str, "ban_reason": safe_ban_reason}
|
|
)
|
|
|
|
return Response({
|
|
"error": "Account suspended",
|
|
"message": (
|
|
f"Your account has been suspended. Reason: {safe_ban_reason}"
|
|
if safe_ban_reason
|
|
else "Your account has been suspended. Contact support for assistance."
|
|
),
|
|
"ban_reason": safe_ban_reason,
|
|
}, status=status.HTTP_403_FORBIDDEN)
|
|
|
|
# ================================================================
|
|
# STEP 2: Check rate limiting for avatar uploads
|
|
# ================================================================
|
|
rate_limit_key = f"oauth_profile:avatar:{user_id_str}"
|
|
if cache.get(rate_limit_key):
|
|
return Response({
|
|
"success": True,
|
|
"action": "rate_limited",
|
|
"message": "Please wait before updating your profile again",
|
|
"avatar_uploaded": False,
|
|
"profile_updated": False,
|
|
})
|
|
|
|
# ================================================================
|
|
# STEP 3: Get OAuth provider info from social accounts
|
|
# ================================================================
|
|
try:
|
|
from allauth.socialaccount.models import SocialAccount
|
|
except ImportError:
|
|
logger.error("django-allauth not installed")
|
|
return Response({
|
|
"success": False,
|
|
"error": "Social authentication not configured",
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
social_accounts = SocialAccount.objects.filter(user=user)
|
|
|
|
if not social_accounts.exists():
|
|
return Response({
|
|
"success": True,
|
|
"action": "skipped",
|
|
"message": "No OAuth accounts linked",
|
|
})
|
|
|
|
# Get the most recent social account
|
|
social_account = social_accounts.order_by("-date_joined").first()
|
|
if not social_account:
|
|
return Response({
|
|
"success": True,
|
|
"action": "skipped",
|
|
"message": "No valid OAuth account found",
|
|
})
|
|
|
|
provider = social_account.provider or "unknown"
|
|
extra_data = social_account.extra_data or {}
|
|
|
|
# Validate extra_data is a dict
|
|
if not isinstance(extra_data, dict):
|
|
logger.warning(f"Invalid extra_data type for user {user_id_str}: {type(extra_data)}")
|
|
extra_data = {}
|
|
|
|
# ================================================================
|
|
# STEP 4: Extract profile data based on provider (with sanitization)
|
|
# ================================================================
|
|
avatar_url = None
|
|
display_name = None
|
|
username_base = None
|
|
|
|
if provider == "google":
|
|
avatar_url = self._sanitize_url(extra_data.get("picture"))
|
|
display_name = self._sanitize_display_name(extra_data.get("name"))
|
|
email = extra_data.get("email", "")
|
|
if email and isinstance(email, str):
|
|
username_base = self._sanitize_username(email.split("@")[0])
|
|
|
|
elif provider == "discord":
|
|
discord_data = extra_data
|
|
discord_id = discord_data.get("id") or discord_data.get("sub")
|
|
|
|
display_name = self._sanitize_display_name(
|
|
discord_data.get("global_name")
|
|
or discord_data.get("full_name")
|
|
or discord_data.get("name")
|
|
)
|
|
|
|
# Discord avatar URL construction with validation
|
|
avatar_hash = discord_data.get("avatar")
|
|
if discord_id and avatar_hash and isinstance(discord_id, str) and isinstance(avatar_hash, str):
|
|
# Validate discord_id is numeric
|
|
if discord_id.isdigit():
|
|
# Validate avatar_hash is alphanumeric
|
|
if re.match(r'^[a-zA-Z0-9_]+$', avatar_hash):
|
|
avatar_url = f"https://cdn.discordapp.com/avatars/{discord_id}/{avatar_hash}.png?size=256"
|
|
|
|
if not avatar_url:
|
|
avatar_url = self._sanitize_url(
|
|
discord_data.get("avatar_url") or discord_data.get("picture")
|
|
)
|
|
|
|
raw_username = discord_data.get("username") or discord_data.get("name", "")
|
|
if raw_username and isinstance(raw_username, str):
|
|
username_base = self._sanitize_username(raw_username.split("#")[0])
|
|
if not username_base and discord_id:
|
|
username_base = f"discord_{str(discord_id)[:8]}"
|
|
|
|
else:
|
|
# Generic provider handling
|
|
avatar_url = self._sanitize_url(
|
|
extra_data.get("picture")
|
|
or extra_data.get("avatar_url")
|
|
or extra_data.get("avatar")
|
|
)
|
|
display_name = self._sanitize_display_name(
|
|
extra_data.get("name") or extra_data.get("display_name")
|
|
)
|
|
|
|
# ================================================================
|
|
# STEP 5: Get or create user profile (with transaction)
|
|
# ================================================================
|
|
from apps.accounts.models import UserProfile
|
|
|
|
with transaction.atomic():
|
|
profile, profile_created = UserProfile.objects.select_for_update().get_or_create(
|
|
user=user
|
|
)
|
|
|
|
# Check if profile already has an avatar
|
|
if profile.avatar_id:
|
|
return Response({
|
|
"success": True,
|
|
"action": "skipped",
|
|
"message": "Avatar already exists",
|
|
"avatar_uploaded": False,
|
|
"profile_updated": False,
|
|
})
|
|
|
|
# ================================================================
|
|
# STEP 6: Download and upload avatar to Cloudflare (outside transaction)
|
|
# ================================================================
|
|
avatar_uploaded = False
|
|
|
|
if avatar_url:
|
|
try:
|
|
# Validate URL scheme
|
|
if not avatar_url.startswith(('https://', 'http://')):
|
|
logger.warning(f"Invalid avatar URL scheme: {avatar_url[:50]}")
|
|
else:
|
|
# Download avatar from provider
|
|
download_response = httpx.get(
|
|
avatar_url,
|
|
timeout=self.AVATAR_DOWNLOAD_TIMEOUT,
|
|
follow_redirects=True,
|
|
headers={
|
|
"User-Agent": "ThrillWiki/1.0",
|
|
"Accept": "image/*",
|
|
},
|
|
)
|
|
|
|
if download_response.status_code == 200:
|
|
image_data = download_response.content
|
|
content_type = download_response.headers.get("content-type", "")
|
|
|
|
# Validate content type
|
|
if not content_type.startswith("image/"):
|
|
logger.warning(f"Invalid content type for avatar: {content_type}")
|
|
# Validate file size
|
|
elif len(image_data) > self.MAX_AVATAR_SIZE:
|
|
logger.warning(
|
|
f"Avatar too large for user {user_id_str}: {len(image_data)} bytes"
|
|
)
|
|
# Validate minimum size (avoid empty images)
|
|
elif len(image_data) < 100:
|
|
logger.warning(f"Avatar too small for user {user_id_str}")
|
|
else:
|
|
avatar_uploaded = self._upload_to_cloudflare(
|
|
image_data, user_id_str, provider, profile
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Avatar download failed: {download_response.status_code}",
|
|
extra={"user_id": user_id_str, "provider": provider}
|
|
)
|
|
|
|
except httpx.TimeoutException:
|
|
logger.warning(f"Avatar download timeout for user {user_id_str}")
|
|
except httpx.HTTPError as download_error:
|
|
logger.warning(f"Failed to download avatar: {download_error}")
|
|
except Exception as e:
|
|
logger.warning(f"Unexpected avatar error: {e}")
|
|
|
|
# Set rate limit after successful processing
|
|
if avatar_uploaded:
|
|
cache.set(rate_limit_key, True, self.AVATAR_UPLOAD_COOLDOWN)
|
|
|
|
# ================================================================
|
|
# STEP 7: Update display name if not set (with validation)
|
|
# ================================================================
|
|
profile_updated = False
|
|
|
|
if display_name and not getattr(user, "display_name", None):
|
|
try:
|
|
user.display_name = display_name
|
|
user.save(update_fields=["display_name"])
|
|
profile_updated = True
|
|
except Exception as e:
|
|
logger.warning(f"Failed to update display name: {e}")
|
|
|
|
# ================================================================
|
|
# STEP 8: Update username if it's a generic UUID-based username
|
|
# ================================================================
|
|
current_username = getattr(user, "username", "") or ""
|
|
if username_base and current_username.startswith("user_"):
|
|
try:
|
|
new_username = self._ensure_unique_username(username_base, user.user_id)
|
|
if new_username and new_username != current_username:
|
|
user.username = new_username
|
|
user.save(update_fields=["username"])
|
|
profile_updated = True
|
|
logger.info(
|
|
f"Username updated from {current_username} to {new_username}",
|
|
extra={"user_id": user_id_str}
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to update username: {e}")
|
|
|
|
return Response({
|
|
"success": True,
|
|
"action": "processed",
|
|
"provider": provider,
|
|
"avatar_uploaded": avatar_uploaded,
|
|
"profile_updated": profile_updated,
|
|
"message": "OAuth profile processed successfully",
|
|
})
|
|
|
|
except Exception as e:
|
|
capture_and_log(e, "Process OAuth profile", source="api", request=request)
|
|
return Response({
|
|
"success": False,
|
|
"error": "Failed to process OAuth profile",
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
def _sanitize_url(self, url) -> str | None:
|
|
"""Sanitize and validate URL."""
|
|
if not url or not isinstance(url, str):
|
|
return None
|
|
|
|
url = url.strip()[:2000] # Limit length
|
|
|
|
# Basic URL validation
|
|
if not url.startswith(('https://', 'http://')):
|
|
return None
|
|
|
|
# Block obviously malicious patterns
|
|
dangerous_patterns = ['javascript:', 'data:', 'file:', '<script', 'onclick']
|
|
for pattern in dangerous_patterns:
|
|
if pattern.lower() in url.lower():
|
|
return None
|
|
|
|
return url
|
|
|
|
def _sanitize_display_name(self, name) -> str | None:
|
|
"""Sanitize display name."""
|
|
if not name or not isinstance(name, str):
|
|
return None
|
|
|
|
import re
|
|
|
|
# Strip and limit length
|
|
name = name.strip()[:100]
|
|
|
|
# Remove control characters
|
|
name = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', name)
|
|
|
|
# Remove excessive whitespace
|
|
name = ' '.join(name.split())
|
|
|
|
# Must have at least 1 character
|
|
if len(name) < 1:
|
|
return None
|
|
|
|
return name
|
|
|
|
def _sanitize_username(self, username) -> str | None:
|
|
"""Sanitize username for use."""
|
|
if not username or not isinstance(username, str):
|
|
return None
|
|
|
|
import re
|
|
|
|
# Lowercase and remove non-allowed characters
|
|
username = username.lower().strip()
|
|
username = re.sub(r'[^a-z0-9_]', '', username)
|
|
|
|
# Enforce length limits
|
|
if len(username) < self.MIN_USERNAME_LENGTH:
|
|
return None
|
|
|
|
username = username[:self.MAX_USERNAME_LENGTH]
|
|
|
|
return username
|
|
|
|
def _upload_to_cloudflare(self, image_data: bytes, user_id: str, provider: str, profile) -> bool:
|
|
"""Upload image to Cloudflare Images with error handling."""
|
|
import httpx
|
|
from django.db import transaction
|
|
|
|
try:
|
|
from django_cloudflareimages_toolkit.models import CloudflareImage
|
|
from django_cloudflareimages_toolkit.services import CloudflareImagesService
|
|
|
|
cf_service = CloudflareImagesService()
|
|
|
|
# Request direct upload URL
|
|
upload_result = cf_service.get_direct_upload_url(
|
|
metadata={
|
|
"type": "avatar",
|
|
"user_id": user_id,
|
|
"provider": provider,
|
|
}
|
|
)
|
|
|
|
if not upload_result or "upload_url" not in upload_result:
|
|
logger.warning("Failed to get Cloudflare upload URL")
|
|
return False
|
|
|
|
upload_url = upload_result["upload_url"]
|
|
cloudflare_id = upload_result.get("id") or upload_result.get("cloudflare_id")
|
|
|
|
if not cloudflare_id:
|
|
logger.warning("No Cloudflare ID in upload result")
|
|
return False
|
|
|
|
# Upload image to Cloudflare
|
|
files = {"file": ("avatar.png", image_data, "image/png")}
|
|
upload_response = httpx.post(
|
|
upload_url,
|
|
files=files,
|
|
timeout=self.AVATAR_UPLOAD_TIMEOUT,
|
|
)
|
|
|
|
if upload_response.status_code not in [200, 201]:
|
|
logger.warning(f"Cloudflare upload failed: {upload_response.status_code}")
|
|
return False
|
|
|
|
# Create CloudflareImage record and link to profile
|
|
with transaction.atomic():
|
|
cf_image = CloudflareImage.objects.create(
|
|
cloudflare_id=cloudflare_id,
|
|
is_uploaded=True,
|
|
metadata={
|
|
"type": "avatar",
|
|
"user_id": user_id,
|
|
"provider": provider,
|
|
}
|
|
)
|
|
|
|
profile.avatar = cf_image
|
|
profile.save(update_fields=["avatar"])
|
|
|
|
logger.info(
|
|
f"Avatar uploaded successfully",
|
|
extra={"user_id": user_id, "provider": provider, "cloudflare_id": cloudflare_id}
|
|
)
|
|
return True
|
|
|
|
except ImportError:
|
|
logger.warning("django-cloudflareimages-toolkit not available")
|
|
return False
|
|
except Exception as cf_error:
|
|
logger.warning(f"Cloudflare upload error: {cf_error}")
|
|
return False
|
|
|
|
def _ensure_unique_username(self, base_username: str, user_id: str, max_attempts: int = 10) -> str | None:
|
|
"""
|
|
Ensure username is unique by appending numbers if needed.
|
|
|
|
Returns None if no valid username can be generated.
|
|
"""
|
|
if not base_username:
|
|
return None
|
|
|
|
username = base_username.lower()[:self.MAX_USERNAME_LENGTH]
|
|
|
|
# Validate characters
|
|
if not all(c in self.ALLOWED_USERNAME_CHARS for c in username):
|
|
return None
|
|
|
|
attempt = 0
|
|
|
|
while attempt < max_attempts:
|
|
try:
|
|
existing = UserModel.objects.filter(username=username).exclude(user_id=user_id).exists()
|
|
if not existing:
|
|
return username
|
|
except Exception:
|
|
break
|
|
|
|
attempt += 1
|
|
# Ensure we don't exceed max length with suffix
|
|
suffix = f"_{attempt}"
|
|
max_base = self.MAX_USERNAME_LENGTH - len(suffix)
|
|
username = f"{base_username.lower()[:max_base]}{suffix}"
|
|
|
|
# Fallback to UUID-based username
|
|
return f"user_{str(user_id)[:8]}"
|