Files
thrillwiki_django_no_react/backend/apps/api/v1/auth/views.py

1598 lines
60 KiB
Python

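"""
Auth domain views for ThrillWiki API v1.
This module contains the authentication-related API endpoints: login
(including MFA verification), signup, logout, password management, social
authentication, email verification, and OAuth profile processing.
User profile and top list endpoints are handled by the accounts app
(backend/apps/api/v1/accounts/views.py).
"""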
import logging
from typing import cast # added 'cast'
from django.contrib.auth import authenticate, get_user_model, login, logout
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.http import HttpRequest # new import
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import status
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.accounts.services.social_provider_service import SocialProviderService
from apps.core.utils import capture_and_log
# Import directly from the auth serializers.py file (not the serializers package)
from .serializers import (
AuthStatusOutputSerializer,
# Authentication serializers
LoginInputSerializer,
LoginOutputSerializer,
LogoutOutputSerializer,
PasswordChangeInputSerializer,
PasswordChangeOutputSerializer,
PasswordResetInputSerializer,
PasswordResetOutputSerializer,
SignupInputSerializer,
SignupOutputSerializer,
SocialProviderOutputSerializer,
UserOutputSerializer,
)
from .serializers_package.social import (
AvailableProviderSerializer,
ConnectedProviderSerializer,
ConnectProviderInputSerializer,
ConnectProviderOutputSerializer,
DisconnectProviderOutputSerializer,
SocialAuthStatusSerializer,
SocialProviderErrorSerializer,
)
# Handle optional dependencies with fallback classes
class FallbackTurnstileMixin:
    """No-op stand-in used when the real ``TurnstileMixin`` is unavailable."""

    def validate_turnstile(self, request):
        """Accept every request; no Turnstile check is performed."""
        return None
# Prefer the project's real Turnstile mixin. Fall back to the no-op mixin when
# the import fails for any reason, or when the imported object is not a class
# and therefore cannot be instantiated or used as a base class.
try:
    from apps.accounts.mixins import TurnstileMixin as _ImportedTurnstileMixin
except Exception:
    # Any failure raised while importing (not only ImportError) selects the fallback.
    TurnstileMixin = FallbackTurnstileMixin
else:
    if isinstance(_ImportedTurnstileMixin, type):
        TurnstileMixin = _ImportedTurnstileMixin
    else:
        TurnstileMixin = FallbackTurnstileMixin

logger = logging.getLogger(__name__)
UserModel = get_user_model()
# Helper: unwrap a DRF request for Django APIs that expect an HttpRequest
def _get_underlying_request(request: Request) -> HttpRequest:
    """
    Return the Django ``HttpRequest`` carried by a DRF ``Request``.

    Reads the ``_request`` attribute and falls back to the argument itself when
    that attribute is absent. ``cast()`` only informs the type checker; no
    runtime conversion takes place.
    """
    underlying = getattr(request, "_request", request)
    return cast(HttpRequest, underlying)
# Helper: encapsulate user lookup + authenticate to reduce complexity in view
def _authenticate_user_by_lookup(email_or_username: str, password: str, request: Request) -> UserModel | None:
    """
    Authenticate a user identified by either username or email address.

    A single query resolves the identifier to a user row (matching username OR
    email); the resolved username is then handed to ``authenticate()``.

    Args:
        email_or_username: Identifier typed by the client (username or email).
        password: Plain-text password to check.
        request: DRF request; its underlying ``HttpRequest`` is passed to
            ``authenticate()``.

    Returns:
        The authenticated user, or ``None`` when the identifier/password is
        empty, no user matches, or the credentials are rejected.
    """
    # An empty/None identifier must never reach the query: Q(email=None) would
    # be translated to "email IS NULL" and could match an unrelated account.
    if not email_or_username or not password:
        return None

    django_request = _get_underlying_request(request)
    try:
        # Both identifier shapes use the same OR lookup, so no "@" branching is
        # needed. No select_related(): authenticate() re-fetches the user.
        user_obj = UserModel.objects.filter(
            Q(username=email_or_username) | Q(email=email_or_username)
        ).first()
        if user_obj is None:
            # Hash the supplied password once so an unknown identifier costs
            # roughly the same time as a wrong password (limits user
            # enumeration through response timing). The instance is discarded.
            UserModel().set_password(password)
            return None
        return authenticate(
            django_request,
            username=getattr(user_obj, "username", None),
            password=password,
        )
    except Exception:
        # Keep the best-effort fallback, but leave a trace of why it was needed.
        logger.exception("User lookup failed during login; falling back to direct authenticate()")
        return authenticate(
            django_request,
            username=email_or_username,
            password=password,
        )
# === AUTHENTICATION API VIEWS ===
@extend_schema_view(
    post=extend_schema(
        summary="User login",
        description="Authenticate user with username/email and password.",
        request=LoginInputSerializer,
        responses={
            200: LoginOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class LoginAPIView(APIView):
    """
    API endpoint for user login.

    Flow: Turnstile validation -> input validation -> credential check ->
    either an ``mfa_required`` response carrying a short-lived token (when the
    user has TOTP/WebAuthn configured) or a session login plus JWT pair.
    """

    permission_classes = [AllowAny]
    authentication_classes = []
    serializer_class = LoginInputSerializer

    def post(self, request: Request) -> Response:
        """Validate credentials and return JWT tokens or an MFA challenge."""
        try:
            # instantiate mixin before calling to avoid type-mismatch in static analysis
            TurnstileMixin().validate_turnstile(request)
        except ValidationError as e:
            return Response({"detail": str(e)}, status=status.HTTP_400_BAD_REQUEST)
        except Exception:
            # Unexpected Turnstile errors still do not block login, but they
            # are no longer swallowed silently.
            logger.warning("Unexpected error during Turnstile validation on login; continuing", exc_info=True)

        serializer = LoginInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        validated = serializer.validated_data
        email_or_username = validated.get("username")
        password = validated.get("password")
        if not email_or_username or not password:
            return Response(
                {"detail": "username and password are required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = _authenticate_user_by_lookup(email_or_username, password, request)
        if not user:
            # authenticate() returns None for inactive accounts under Django's
            # default backend, so the "verify your email" answer has to be
            # derived here or it would never be sent.
            if self._is_unverified_account(email_or_username, password):
                return self._email_verification_required_response()
            return Response(
                {"detail": "Invalid credentials"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Still needed for backends that do authenticate inactive users.
        if not getattr(user, "is_active", False):
            return self._email_verification_required_response()

        mfa_info = self._check_user_mfa(user)
        if mfa_info["has_mfa"]:
            return self._mfa_required_response(user, mfa_info["mfa_types"])

        # No MFA - proceed with normal login.
        # Pass a real HttpRequest to Django login with backend specified.
        login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")

        # Generate JWT tokens
        from rest_framework_simplejwt.tokens import RefreshToken

        refresh = RefreshToken.for_user(user)
        response_serializer = LoginOutputSerializer(
            {
                "access": str(refresh.access_token),
                "refresh": str(refresh),
                "user": user,
                "message": "Login successful",
            }
        )
        return Response(response_serializer.data)

    @staticmethod
    def _is_unverified_account(identifier: str, password: str) -> bool:
        """Return True when the identifier + password match an inactive account."""
        candidate = UserModel.objects.filter(Q(username=identifier) | Q(email=identifier)).first()
        if candidate is None or candidate.is_active:
            return False
        # Only a caller who knows the password learns that the account exists.
        return bool(candidate.check_password(password))

    @staticmethod
    def _email_verification_required_response() -> Response:
        """Build the 400 response sent when the account is not active yet."""
        return Response(
            {
                "detail": "Please verify your email address before logging in. Check your email for a verification link.",
                "code": "EMAIL_VERIFICATION_REQUIRED",
                "email_verification_required": True,
            },
            status=status.HTTP_400_BAD_REQUEST,
        )

    def _mfa_required_response(self, user, mfa_types: list) -> Response:
        """Cache a temporary MFA token for ``user`` and return the challenge payload."""
        from django.core.cache import cache
        from django.utils.crypto import get_random_string

        from .serializers import MFARequiredOutputSerializer

        # Generate secure temp token
        mfa_token = get_random_string(64)
        # Store user ID in cache with token (expires in 5 minutes)
        cache.set(
            f"mfa_login:{mfa_token}",
            {
                "user_id": user.pk,
                "username": user.username,
            },
            timeout=300,
        )
        response_serializer = MFARequiredOutputSerializer(
            {
                "mfa_required": True,
                "mfa_token": mfa_token,
                "mfa_types": mfa_types,
                "user_id": user.pk,
                "message": "MFA verification required",
            }
        )
        return Response(response_serializer.data)

    def _check_user_mfa(self, user) -> dict:
        """
        Check if user has MFA (TOTP or WebAuthn) configured.

        Returns:
            Dict with ``has_mfa``, ``has_totp``, ``has_webauthn`` and ``mfa_types``.

        Raises:
            Exception: Re-raised when the authenticator query itself fails.
                Reporting "no MFA" in that case would let a password-only
                login bypass the second factor, so the check fails closed.
        """
        try:
            from allauth.mfa.models import Authenticator
        except Exception:
            # MFA app cannot be loaded at all: nobody can have MFA configured.
            return {"has_mfa": False, "has_totp": False, "has_webauthn": False, "mfa_types": []}

        try:
            # One query for all configured authenticator types of this user.
            configured_types = set(Authenticator.objects.filter(user=user).values_list("type", flat=True))
        except Exception:
            logger.exception("Could not determine MFA status for user %s", getattr(user, "pk", None))
            raise

        has_totp = Authenticator.Type.TOTP in configured_types
        # Tolerate an Authenticator.Type without a WEBAUTHN member instead of
        # letting the AttributeError disable MFA for every user.
        webauthn_type = getattr(Authenticator.Type, "WEBAUTHN", None)
        has_webauthn = webauthn_type is not None and webauthn_type in configured_types

        mfa_types = []
        if has_totp:
            mfa_types.append("totp")
        if has_webauthn:
            mfa_types.append("webauthn")
        return {
            "has_mfa": has_totp or has_webauthn,
            "has_totp": has_totp,
            "has_webauthn": has_webauthn,
            "mfa_types": mfa_types,
        }
@extend_schema_view(
    post=extend_schema(
        summary="Verify MFA for login",
        description=(
            "Complete MFA verification after password authentication. "
            "Submit a TOTP code or a passkey credential to receive JWT tokens."
        ),
        request={"application/json": {
            "type": "object",
            "properties": {
                "mfa_token": {"type": "string", "description": "Temporary token from login response"},
                "code": {"type": "string", "description": "6-digit TOTP code"},
                "credential": {"type": "object", "description": "WebAuthn/Passkey credential"},
            },
            "required": ["mfa_token"],
        }},
        responses={
            200: LoginOutputSerializer,
            400: "Bad Request - Invalid code or expired token",
        },
        tags=["Authentication"],
    ),
)
class MFALoginVerifyAPIView(APIView):
    """
    API endpoint to verify MFA code and complete login.

    The client presents the temporary ``mfa_token`` issued by the login
    endpoint together with either a TOTP ``code`` or a passkey ``credential``.
    On success the token is consumed, a session is opened and a JWT pair is
    returned.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    # Failed verifications tolerated per MFA token before the token is revoked,
    # so a 6-digit code cannot be brute-forced within the token's lifetime.
    MAX_FAILED_ATTEMPTS = 5
    # Lifetime (seconds) of the failure counter; same value as the 5 minute
    # timeout used for the "mfa_login:" cache entry.
    FAILED_ATTEMPTS_TIMEOUT = 300

    def post(self, request: Request) -> Response:
        """Verify the second factor for a pending login and issue JWT tokens."""
        from django.core.cache import cache

        from .serializers import MFALoginVerifyInputSerializer

        serializer = MFALoginVerifyInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        validated = serializer.validated_data
        mfa_token = validated.get("mfa_token")
        totp_code = validated.get("code")
        credential = validated.get("credential")  # WebAuthn/Passkey credential

        # Retrieve user from cache
        cache_key = f"mfa_login:{mfa_token}"
        attempts_key = f"mfa_login_attempts:{mfa_token}"
        cached_data = cache.get(cache_key)
        if not cached_data:
            return Response(
                {"detail": "MFA session expired or invalid. Please login again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            user = UserModel.objects.get(pk=cached_data.get("user_id"))
        except UserModel.DoesNotExist:
            return Response(
                {"detail": "User not found"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Verify MFA - either TOTP or Passkey
        if totp_code:
            failure_detail = None if self._verify_totp(user, totp_code) else "Invalid verification code"
        elif credential:
            passkey_result = self._verify_passkey(request, user, credential)
            failure_detail = (
                None
                if passkey_result["success"]
                else passkey_result.get("error", "Passkey verification failed")
            )
        else:
            return Response(
                {"detail": "Either TOTP code or passkey credential is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        if failure_detail is not None:
            self._register_failed_attempt(cache, cache_key, attempts_key)
            return Response({"detail": failure_detail}, status=status.HTTP_400_BAD_REQUEST)

        # Clear the MFA token (and its failure counter) from cache
        cache.delete(cache_key)
        cache.delete(attempts_key)

        # Complete login
        login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")

        # Generate JWT tokens
        from rest_framework_simplejwt.tokens import RefreshToken

        refresh = RefreshToken.for_user(user)
        response_serializer = LoginOutputSerializer(
            {
                "access": str(refresh.access_token),
                "refresh": str(refresh),
                "user": user,
                "message": "Login successful",
            }
        )
        return Response(response_serializer.data)

    def _register_failed_attempt(self, cache, cache_key: str, attempts_key: str) -> None:
        """Count a failed verification and revoke the MFA token once the limit is hit."""
        # add() only creates the counter when it does not exist yet.
        cache.add(attempts_key, 0, timeout=self.FAILED_ATTEMPTS_TIMEOUT)
        try:
            attempts = cache.incr(attempts_key)
        except ValueError:
            # Counter vanished between add() and incr(); count this failure as the first.
            attempts = 1
        if attempts >= self.MAX_FAILED_ATTEMPTS:
            cache.delete(cache_key)
            cache.delete(attempts_key)

    def _verify_totp(self, user, code: str) -> bool:
        """
        Verify TOTP code against user's authenticator.

        Returns False (never raises) when the user has no TOTP authenticator,
        the stored secret is missing, allauth.mfa is unavailable, or any other
        error occurs.
        """
        try:
            from allauth.mfa.models import Authenticator
            from allauth.mfa.totp.internal import auth as totp_auth

            try:
                authenticator = Authenticator.objects.get(
                    user=user,
                    type=Authenticator.Type.TOTP,
                )
            except Authenticator.DoesNotExist:
                return False

            # Get the secret from authenticator data and verify.
            # NOTE(review): the stored secret is used as-is and a code is not
            # marked as used afterwards; confirm whether allauth expects the
            # secret to be decrypted and offers replay protection that should
            # be used here instead.
            secret = authenticator.data.get("secret")
            if not secret:
                return False
            return totp_auth.validate_totp_code(secret, code)
        except ImportError:
            logger.error("allauth.mfa not available for TOTP verification")
            return False
        except Exception:
            logger.exception("TOTP verification error")
            return False

    def _verify_passkey(self, request, user, credential: dict) -> dict:
        """
        Verify WebAuthn/Passkey credential.

        Returns:
            ``{"success": True}`` on success, otherwise
            ``{"success": False, "error": <client-safe message>}``.
        """
        try:
            from allauth.mfa.models import Authenticator
            from allauth.mfa.webauthn.internal import auth as webauthn_auth

            # Check if user has any WebAuthn authenticators
            has_passkey = Authenticator.objects.filter(
                user=user,
                type=Authenticator.Type.WEBAUTHN,
            ).exists()
            if not has_passkey:
                return {"success": False, "error": "No passkey registered for this user"}

            try:
                # NOTE(review): the argument lists of the webauthn_auth helpers
                # below are kept exactly as written; confirm them against the
                # installed allauth version, and confirm that the credential is
                # checked against this specific user's authenticators.
                credential_data = webauthn_auth.parse_authentication_response(credential)

                state = webauthn_auth.get_state(request)
                if not state:
                    # NOTE(review): a challenge generated here cannot be the one
                    # the client signed; presumably the state must be created
                    # before the browser ceremony - verify the intended flow.
                    _, state = webauthn_auth.begin_authentication(request)
                    webauthn_auth.set_state(request, state)

                # Complete authentication
                webauthn_auth.complete_authentication(request, credential_data, state)
                # Clear the state
                webauthn_auth.clear_state(request)
                return {"success": True}
            except Exception:
                # Log the details server-side; do not echo internal exception
                # text back to an unauthenticated client.
                logger.exception("WebAuthn authentication failed")
                return {"success": False, "error": "Passkey verification failed"}
        except ImportError as e:
            logger.error(f"WebAuthn module not available: {e}")
            return {"success": False, "error": "Passkey authentication not available"}
        except Exception:
            logger.exception("Passkey verification error")
            return {"success": False, "error": "Passkey verification failed"}
@extend_schema_view(
    post=extend_schema(
        summary="User registration",
        description="Register a new user account. Email verification required.",
        request=SignupInputSerializer,
        responses={
            201: SignupOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class SignupAPIView(APIView):
    """
    API endpoint for user registration.

    Creates the account through ``SignupInputSerializer`` and returns a 201
    response without tokens: the user has to verify their email before login.
    """

    permission_classes = [AllowAny]
    authentication_classes = []
    serializer_class = SignupInputSerializer

    def post(self, request: Request) -> Response:
        """Register a new user; returns 201 on success or 400 with validation errors."""
        try:
            # instantiate mixin before calling to avoid type-mismatch in static analysis
            TurnstileMixin().validate_turnstile(request)
        except ValidationError as e:
            return Response({"detail": str(e)}, status=status.HTTP_400_BAD_REQUEST)
        except Exception:
            # Unexpected Turnstile errors still do not block signup, but they
            # are no longer swallowed silently.
            logger.warning("Unexpected error during Turnstile validation on signup; continuing", exc_info=True)

        serializer = SignupInputSerializer(data=request.data, context={"request": request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        user = serializer.save()
        # Don't log in the user immediately - they need to verify their email first
        response_serializer = SignupOutputSerializer(
            {
                "access": None,
                "refresh": None,
                "user": user,
                "detail": "Registration successful. Please check your email to verify your account.",
                "email_verification_required": True,
            }
        )
        return Response(response_serializer.data, status=status.HTTP_201_CREATED)
@extend_schema_view(
    post=extend_schema(
        summary="User logout",
        description="Logout the current user and blacklist their refresh token.",
        responses={
            200: LogoutOutputSerializer,
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class LogoutAPIView(APIView):
    """
    API endpoint for user logout.

    Blacklists the supplied refresh token (best effort), deletes a legacy
    ``auth_token`` if the user has one, and ends the Django session.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = LogoutOutputSerializer

    def post(self, request: Request) -> Response:
        """Log the current user out; returns 500 with a generic message on failure."""
        try:
            # Only mapping-like payloads can carry a "refresh" entry.
            data = getattr(request, "data", None)
            refresh_token = data.get("refresh") if hasattr(data, "get") else None

            if refresh_token and isinstance(refresh_token, str):
                # Blacklist the refresh token
                from rest_framework_simplejwt.tokens import RefreshToken

                try:
                    RefreshToken(refresh_token).blacklist()
                except Exception:
                    # Token might be invalid or already blacklisted; logout
                    # proceeds regardless, but the reason is recorded.
                    logger.debug("Refresh token could not be blacklisted during logout", exc_info=True)

            # Also delete the old token for backward compatibility
            if hasattr(request.user, "auth_token"):
                request.user.auth_token.delete()

            # Logout from session using the underlying HttpRequest
            logout(_get_underlying_request(request))

            response_serializer = LogoutOutputSerializer({"detail": "Logout successful"})
            return Response(response_serializer.data)
        except Exception:
            # Keep the generic client message, but record the traceback.
            logger.exception("Logout failed")
            return Response({"detail": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@extend_schema_view(
    get=extend_schema(
        summary="Get current user",
        description="Retrieve information about the currently authenticated user.",
        responses={
            200: UserOutputSerializer,
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class CurrentUserAPIView(APIView):
    """Return the serialized representation of the authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = UserOutputSerializer

    def get(self, request: Request) -> Response:
        """Serialize ``request.user`` with ``UserOutputSerializer``."""
        return Response(UserOutputSerializer(request.user).data)
@extend_schema_view(
    post=extend_schema(
        summary="Request password reset",
        description="Send a password reset email to the user.",
        request=PasswordResetInputSerializer,
        responses={
            200: PasswordResetOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class PasswordResetAPIView(APIView):
    """Accept a password-reset request and delegate the work to the input serializer."""

    permission_classes = [AllowAny]
    serializer_class = PasswordResetInputSerializer

    def post(self, request: Request) -> Response:
        """Validate the payload, save the serializer, and confirm with a 200 response."""
        reset_input = PasswordResetInputSerializer(data=request.data, context={"request": request})
        if not reset_input.is_valid():
            return Response(reset_input.errors, status=status.HTTP_400_BAD_REQUEST)

        reset_input.save()
        confirmation = PasswordResetOutputSerializer({"detail": "Password reset email sent"})
        return Response(confirmation.data)
@extend_schema_view(
    post=extend_schema(
        summary="Change password",
        description="Change the current user's password.",
        request=PasswordChangeInputSerializer,
        responses={
            200: PasswordChangeOutputSerializer,
            400: "Bad Request",
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class PasswordChangeAPIView(APIView):
    """Change the authenticated user's password via the input serializer."""

    permission_classes = [IsAuthenticated]
    serializer_class = PasswordChangeInputSerializer

    def post(self, request: Request) -> Response:
        """Validate the payload, save the serializer, and confirm with a 200 response."""
        change_input = PasswordChangeInputSerializer(data=request.data, context={"request": request})
        if not change_input.is_valid():
            return Response(change_input.errors, status=status.HTTP_400_BAD_REQUEST)

        change_input.save()
        confirmation = PasswordChangeOutputSerializer({"detail": "Password changed successfully"})
        return Response(confirmation.data)
@extend_schema_view(
    get=extend_schema(
        summary="Get social providers",
        description="Retrieve available social authentication providers.",
        responses={200: "List of social providers"},
        tags=["Authentication"],
    ),
)
class SocialProvidersAPIView(APIView):
    """List the social login providers configured for the current site."""

    permission_classes = [AllowAny]
    serializer_class = SocialProviderOutputSerializer

    def get(self, request: Request) -> Response:
        """Return the provider list, served from cache for 15 minutes per site and host."""
        from django.core.cache import cache

        # get_current_site expects a Django HttpRequest, not the DRF wrapper.
        current_site = get_current_site(_get_underlying_request(request))
        site_pk = getattr(current_site, "id", getattr(current_site, "pk", None))
        providers_cache_key = f"social_providers:{site_pk}:{request.get_host()}"

        cached_payload = cache.get(providers_cache_key)
        if cached_payload is not None:
            return Response(cached_payload)

        from allauth.socialaccount.models import SocialApp

        entries = []
        for app in SocialApp.objects.filter(sites=current_site).order_by("provider"):
            try:
                display_name = app.name or getattr(app, "provider", "").title()
                login_url = request.build_absolute_uri(f"/accounts/{app.provider}/login/")
            except Exception:
                # A provider that cannot be rendered is left out of the list.
                continue
            entries.append(
                {
                    "id": app.provider,
                    "name": display_name,
                    "authUrl": login_url,
                }
            )

        payload = SocialProviderOutputSerializer(entries, many=True).data
        cache.set(providers_cache_key, payload, 900)
        return Response(payload)
@extend_schema_view(
    post=extend_schema(
        summary="Check authentication status",
        description="Check if user is authenticated and return user data.",
        responses={200: AuthStatusOutputSerializer},
        tags=["Authentication"],
    ),
)
class AuthStatusAPIView(APIView):
    """Report whether the caller is authenticated, including the user when they are."""

    permission_classes = [AllowAny]
    serializer_class = AuthStatusOutputSerializer

    def post(self, request: Request) -> Response:
        """Return ``authenticated`` plus the user (or ``None`` for anonymous callers)."""
        current_user = request.user if request.user.is_authenticated else None
        status_payload = {
            "authenticated": current_user is not None,
            "user": current_user,
        }
        return Response(AuthStatusOutputSerializer(status_payload).data)
# === SOCIAL PROVIDER MANAGEMENT API VIEWS ===
@extend_schema_view(
    get=extend_schema(
        summary="Get available social providers",
        description="Retrieve list of available social authentication providers.",
        responses={
            200: AvailableProviderSerializer(many=True),
        },
        tags=["Social Authentication"],
    ),
)
class AvailableProvidersAPIView(APIView):
    """Return the fixed list of social providers (Google and Discord) with their URLs."""

    permission_classes = [AllowAny]
    serializer_class = AvailableProviderSerializer

    def get(self, request: Request) -> Response:
        """Build one entry per provider with its login and connect URL."""
        known_providers = (("google", "Google"), ("discord", "Discord"))
        providers = [
            {
                "provider": slug,
                "name": label,
                "login_url": f"/auth/social/{slug}/",
                "connect_url": f"/auth/social/connect/{slug}/",
            }
            for slug, label in known_providers
        ]
        return Response(AvailableProviderSerializer(providers, many=True).data)
@extend_schema_view(
    get=extend_schema(
        summary="Get connected social providers",
        description="Retrieve list of social providers connected to the user's account.",
        responses={
            200: ConnectedProviderSerializer(many=True),
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class ConnectedProvidersAPIView(APIView):
    """Return the social providers connected to the authenticated user's account."""

    permission_classes = [IsAuthenticated]
    serializer_class = ConnectedProviderSerializer

    def get(self, request: Request) -> Response:
        """Fetch the connected providers from ``SocialProviderService`` and serialize them."""
        connected = SocialProviderService().get_connected_providers(request.user)
        return Response(ConnectedProviderSerializer(connected, many=True).data)
@extend_schema_view(
    post=extend_schema(
        summary="Connect social provider",
        description="Connect a social authentication provider to the user's account.",
        request=ConnectProviderInputSerializer,
        responses={
            200: ConnectProviderOutputSerializer,
            400: SocialProviderErrorSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class ConnectProviderAPIView(APIView):
    """
    API endpoint to connect a social provider.

    Validates the provider name and the request payload, then delegates the
    actual linking to ``SocialProviderService.connect_provider``.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = ConnectProviderInputSerializer

    # Provider identifiers accepted in the URL.
    SUPPORTED_PROVIDERS = ("google", "discord")

    def post(self, request: Request, provider: str) -> Response:
        """Connect ``provider`` to the current user; any failure yields a 400 payload."""
        # Validate provider
        if provider not in self.SUPPORTED_PROVIDERS:
            return Response(
                {
                    "detail": f"Provider '{provider}' is not supported",
                    "code": "INVALID_PROVIDER",
                    "suggestions": ["Use 'google' or 'discord'"],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        serializer = ConnectProviderInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "detail": "Invalid request data",
                    "code": "VALIDATION_ERROR",
                    "details": serializer.errors,
                    "suggestions": ["Provide a valid access_token"],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        access_token = serializer.validated_data["access_token"]
        try:
            result = SocialProviderService().connect_provider(request.user, provider, access_token)
            return Response(ConnectProviderOutputSerializer(result).data)
        except Exception as e:
            # Every failure is reported to the client as a 400; log it so that
            # unexpected errors are not hidden behind that response.
            logger.exception("Connecting social provider %s failed", provider)
            return Response(
                {
                    "success": False,
                    "detail": "CONNECTION_FAILED",
                    "message": str(e),
                    "suggestions": [
                        "Verify the access token is valid",
                        "Ensure the provider account is not already connected to another user",
                    ],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )
@extend_schema_view(
    post=extend_schema(
        summary="Disconnect social provider",
        description="Disconnect a social authentication provider from the user's account.",
        responses={
            200: DisconnectProviderOutputSerializer,
            400: SocialProviderErrorSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class DisconnectProviderAPIView(APIView):
    """
    API endpoint to disconnect a social provider.

    Validates the provider name, asks ``SocialProviderService`` whether the
    disconnection is safe, and performs it when it is.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = DisconnectProviderOutputSerializer

    # Provider identifiers accepted in the URL.
    SUPPORTED_PROVIDERS = ("google", "discord")

    def post(self, request: Request, provider: str) -> Response:
        """Disconnect ``provider`` from the current user; any failure yields a 400 payload."""
        # Validate provider
        if provider not in self.SUPPORTED_PROVIDERS:
            return Response(
                {
                    "detail": f"Provider '{provider}' is not supported",
                    "code": "INVALID_PROVIDER",
                    "suggestions": ["Use 'google' or 'discord'"],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            service = SocialProviderService()

            # Check if disconnection is safe
            can_disconnect, reason = service.can_disconnect_provider(request.user, provider)
            if not can_disconnect:
                return Response(
                    {
                        "success": False,
                        "detail": "UNSAFE_DISCONNECTION",
                        "message": reason,
                        "suggestions": [
                            "Set up email/password authentication before disconnecting",
                            "Connect another social provider before disconnecting this one",
                        ],
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Perform disconnection
            result = service.disconnect_provider(request.user, provider)
            return Response(DisconnectProviderOutputSerializer(result).data)
        except Exception as e:
            # Every failure is reported to the client as a 400; log it so that
            # unexpected errors are not hidden behind that response.
            logger.exception("Disconnecting social provider %s failed", provider)
            return Response(
                {
                    "success": False,
                    "detail": "DISCONNECTION_FAILED",
                    "message": str(e),
                    "suggestions": [
                        "Verify the provider is currently connected",
                        "Ensure you have alternative authentication methods",
                    ],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )
@extend_schema_view(
    get=extend_schema(
        summary="Get social authentication status",
        description="Get comprehensive social authentication status for the user.",
        responses={
            200: SocialAuthStatusSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class SocialAuthStatusAPIView(APIView):
    """Return the social authentication status of the authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = SocialAuthStatusSerializer

    def get(self, request: Request) -> Response:
        """Fetch the status from ``SocialProviderService`` and serialize it."""
        status_info = SocialProviderService().get_auth_status(request.user)
        return Response(SocialAuthStatusSerializer(status_info).data)
# === EMAIL VERIFICATION API VIEWS ===
@extend_schema_view(
    get=extend_schema(
        summary="Verify email address",
        description="Verify user's email address using verification token.",
        responses={
            200: {
                "type": "object",
                "properties": {
                    "detail": {"type": "string"},
                    "success": {"type": "boolean"},
                },
            },
            400: "Bad Request",
            404: "Token not found",
        },
        tags=["Authentication"],
    ),
)
class EmailVerificationAPIView(APIView):
    """
    API endpoint for email verification.

    Looks up the verification record by token, activates the associated user,
    and deletes the record so the token cannot be used again.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    def get(self, request: Request, token: str) -> Response:
        """Activate the user owning ``token``; 404 when the token is unknown."""
        from django.db import transaction

        from apps.accounts.models import EmailVerification

        try:
            # Activate and consume the token as one unit: if either write
            # fails, neither is kept, so a half-applied verification cannot
            # leave a reusable token behind.
            with transaction.atomic():
                verification = EmailVerification.objects.select_related("user").get(token=token)
                user = verification.user
                # Activate the user
                user.is_active = True
                user.save()
                # Delete the verification record
                verification.delete()
        except EmailVerification.DoesNotExist:
            return Response({"detail": "Invalid or expired verification token"}, status=status.HTTP_404_NOT_FOUND)

        return Response({"detail": "Email verified successfully. You can now log in.", "success": True})
@extend_schema_view(
    post=extend_schema(
        summary="Resend verification email",
        description="Resend email verification to user's email address.",
        request={"type": "object", "properties": {"email": {"type": "string", "format": "email"}}},
        responses={
            200: {
                "type": "object",
                "properties": {
                    "detail": {"type": "string"},
                    "success": {"type": "boolean"},
                },
            },
            400: "Bad Request",
            500: "Verification email could not be sent",
        },
        tags=["Authentication"],
    ),
)
class ResendVerificationAPIView(APIView):
    """
    API endpoint to resend email verification.

    Unknown addresses, already verified accounts and successful sends all
    receive the same 200 answer, so the endpoint does not reveal whether an
    address is registered.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    # Single answer for every non-error outcome (see class docstring).
    GENERIC_DETAIL = "If the email exists, a verification email has been sent"

    def post(self, request: Request) -> Response:
        """Issue a fresh verification token and email it to an unverified user."""
        from django.utils.crypto import get_random_string
        from django_forwardemail.services import EmailService

        from apps.accounts.models import EmailVerification

        email = request.data.get("email")
        # Reject missing, blank and non-string values up front instead of
        # failing later on .strip().
        if not isinstance(email, str) or not email.strip():
            return Response({"detail": "Email address is required"}, status=status.HTTP_400_BAD_REQUEST)

        # filter().first() tolerates duplicate addresses; iexact already
        # ignores case, so no manual lower-casing is needed.
        user = UserModel.objects.filter(email__iexact=email.strip()).first()
        if user is None or user.is_active:
            # Don't reveal whether email exists (or is already verified)
            return Response({"detail": self.GENERIC_DETAIL, "success": True})

        # Create or update verification record
        verification, created = EmailVerification.objects.get_or_create(
            user=user, defaults={"token": get_random_string(64)}
        )
        if not created:
            # Replace the existing token; save() persists the record again.
            verification.token = get_random_string(64)
            verification.save()

        # Send verification email
        site = get_current_site(_get_underlying_request(request))
        verification_url = request.build_absolute_uri(f"/api/v1/auth/verify-email/{verification.token}/")
        email_text = f"""
Welcome to ThrillWiki!
Please verify your email address by clicking the link below:
{verification_url}
If you didn't create an account, you can safely ignore this email.
Thanks,
The ThrillWiki Team
""".strip()

        try:
            EmailService.send_email(
                to=user.email,
                subject="Verify your ThrillWiki account",
                text=email_text,
                site=site,
            )
        except Exception as e:
            capture_and_log(e, 'Send verification email', source='api')
            return Response(
                {"detail": "Failed to send verification email"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

        return Response({"detail": self.GENERIC_DETAIL, "success": True})
# Note: User Profile, Top List, and Top List Item ViewSets are now handled
# by the dedicated accounts app at backend/apps/api/v1/accounts/views.py
# to avoid duplication and maintain clean separation of concerns.
@extend_schema_view(
post=extend_schema(
summary="Process OAuth profile",
description="Process OAuth profile data during social authentication flow.",
request={
"type": "object",
"properties": {
"provider": {"type": "string", "description": "OAuth provider (e.g., google, discord)"},
"profile": {
"type": "object",
"description": "Profile data from OAuth provider",
"properties": {
"id": {"type": "string"},
"email": {"type": "string", "format": "email"},
"name": {"type": "string"},
"avatar_url": {"type": "string", "format": "uri"},
},
},
"access_token": {"type": "string", "description": "OAuth access token"},
},
"required": ["provider", "profile"],
},
responses={
200: {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"action": {"type": "string", "enum": ["created", "updated", "linked"]},
"user": {"type": "object"},
"profile_synced": {"type": "boolean"},
},
},
400: "Bad Request",
401: "Unauthorized",
403: "Account suspended",
},
tags=["Social Authentication"],
),
)
class ProcessOAuthProfileAPIView(APIView):
"""
API endpoint to process OAuth profile data.
This endpoint is called AFTER the OAuth flow is complete to:
1. Check if user is banned (SECURITY CRITICAL)
2. Extract avatar from OAuth provider
3. Download and upload avatar to Cloudflare Images
4. Sync display name from OAuth provider
5. Update username if it's a generic UUID-based username
Called with an empty body - uses the authenticated session.
Full parity with Supabase Edge Function: process-oauth-profile
BULLETPROOFED: Comprehensive validation, sanitization, and error handling.
"""
permission_classes = [IsAuthenticated]
# Security constants
MAX_AVATAR_SIZE = 10 * 1024 * 1024 # 10MB
AVATAR_DOWNLOAD_TIMEOUT = 10.0 # seconds
AVATAR_UPLOAD_TIMEOUT = 30.0 # seconds
MAX_USERNAME_LENGTH = 150
MIN_USERNAME_LENGTH = 3
ALLOWED_USERNAME_CHARS = set("abcdefghijklmnopqrstuvwxyz0123456789_")
# Rate limiting for avatar uploads (prevent abuse)
AVATAR_UPLOAD_COOLDOWN = 60 # seconds between uploads
def post(self, request: Request) -> Response:
    """
    Sync the authenticated user's profile from their most recent OAuth account.

    The request body is not read; everything is derived from ``request.user``
    and its allauth ``SocialAccount`` rows.

    Steps, in order: validate the user object, refuse banned users, honour
    the per-user cooldown, pick the most recently joined social account,
    extract avatar / display name / username candidates from its
    ``extra_data``, upload the avatar to Cloudflare if the profile has none,
    then fill in a missing display name and replace a generic ``user_*``
    username.

    Returns:
        Response: HTTP 200 with ``action`` set to ``"processed"``,
        ``"skipped"`` or ``"rate_limited"``; 401 for an invalid user object;
        403 for a banned user; 500 when allauth is not installed or an
        unexpected error occurs (the error is reported via
        ``capture_and_log``).
    """
    from django.core.cache import cache
    from django.db import transaction

    try:
        user = request.user

        # ================================================================
        # STEP 0: Validate user object exists and is valid
        # ================================================================
        if not user or not hasattr(user, 'user_id'):
            logger.error("ProcessOAuthProfile called with invalid user object")
            return Response({
                "success": False,
                "error": "Invalid user session",
            }, status=status.HTTP_401_UNAUTHORIZED)

        user_id_str = str(user.user_id)

        # ================================================================
        # STEP 1: CRITICAL - Check ban status FIRST
        # ================================================================
        if self._oauth_user_is_banned(user, user_id_str):
            ban_reason = getattr(user, 'ban_reason', None) or "Policy violation"
            # Sanitize ban reason for response
            safe_ban_reason = str(ban_reason)[:200] if ban_reason else None
            logger.warning(
                "Banned user attempted OAuth profile update",
                extra={"user_id": user_id_str, "ban_reason": safe_ban_reason}
            )
            return Response({
                "error": "Account suspended",
                "message": (
                    f"Your account has been suspended. Reason: {safe_ban_reason}"
                    if safe_ban_reason
                    else "Your account has been suspended. Contact support for assistance."
                ),
                "ban_reason": safe_ban_reason,
            }, status=status.HTTP_403_FORBIDDEN)

        # ================================================================
        # STEP 2: Check rate limiting for avatar uploads
        # ================================================================
        rate_limit_key = f"oauth_profile:avatar:{user_id_str}"
        if cache.get(rate_limit_key):
            return Response({
                "success": True,
                "action": "rate_limited",
                "message": "Please wait before updating your profile again",
                "avatar_uploaded": False,
                "profile_updated": False,
            })

        # ================================================================
        # STEP 3: Get OAuth provider info from social accounts
        # ================================================================
        try:
            from allauth.socialaccount.models import SocialAccount
        except ImportError:
            logger.error("django-allauth not installed")
            return Response({
                "success": False,
                "error": "Social authentication not configured",
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        # One query: the most recently joined social account, or None.
        social_account = (
            SocialAccount.objects.filter(user=user).order_by("-date_joined").first()
        )
        if not social_account:
            return Response({
                "success": True,
                "action": "skipped",
                "message": "No OAuth accounts linked",
            })

        provider = social_account.provider or "unknown"
        extra_data = social_account.extra_data or {}
        # Validate extra_data is a dict
        if not isinstance(extra_data, dict):
            logger.warning(f"Invalid extra_data type for user {user_id_str}: {type(extra_data)}")
            extra_data = {}

        # ================================================================
        # STEP 4: Extract profile data based on provider (with sanitization)
        # ================================================================
        avatar_url, display_name, username_base = self._extract_oauth_profile_fields(
            provider, extra_data
        )

        # ================================================================
        # STEP 5: Get or create user profile (with transaction)
        # ================================================================
        from apps.accounts.models import UserProfile

        # NOTE(review): the row lock is only held for the duration of this
        # atomic block (unless an outer transaction is open), so it does not
        # by itself serialise two concurrent uploads - confirm if that matters.
        with transaction.atomic():
            profile, _ = UserProfile.objects.select_for_update().get_or_create(
                user=user
            )
            # Check if profile already has an avatar
            if profile.avatar_id:
                return Response({
                    "success": True,
                    "action": "skipped",
                    "message": "Avatar already exists",
                    "avatar_uploaded": False,
                    "profile_updated": False,
                })

        # ================================================================
        # STEP 6: Download and upload avatar to Cloudflare (outside transaction)
        # ================================================================
        avatar_uploaded = False
        if avatar_url:
            # Avatar handling is best-effort: any failure is logged and the
            # display name / username steps still run.
            try:
                image_data = self._download_oauth_avatar(avatar_url, user_id_str, provider)
                if image_data is not None:
                    avatar_uploaded = self._upload_to_cloudflare(
                        image_data, user_id_str, provider, profile
                    )
            except Exception as e:
                logger.warning(f"Unexpected avatar error: {e}")

        # Set rate limit after successful processing
        if avatar_uploaded:
            cache.set(rate_limit_key, True, self.AVATAR_UPLOAD_COOLDOWN)

        # ================================================================
        # STEP 7: Update display name if not set (with validation)
        # ================================================================
        profile_updated = False
        if display_name and not getattr(user, "display_name", None):
            try:
                user.display_name = display_name
                user.save(update_fields=["display_name"])
                profile_updated = True
            except Exception as e:
                logger.warning(f"Failed to update display name: {e}")

        # ================================================================
        # STEP 8: Update username if it's a generic UUID-based username
        # ================================================================
        current_username = getattr(user, "username", "") or ""
        if username_base and current_username.startswith("user_"):
            try:
                new_username = self._ensure_unique_username(username_base, user.user_id)
                if new_username and new_username != current_username:
                    user.username = new_username
                    user.save(update_fields=["username"])
                    profile_updated = True
                    logger.info(
                        f"Username updated from {current_username} to {new_username}",
                        extra={"user_id": user_id_str}
                    )
            except Exception as e:
                logger.warning(f"Failed to update username: {e}")

        return Response({
            "success": True,
            "action": "processed",
            "provider": provider,
            "avatar_uploaded": avatar_uploaded,
            "profile_updated": profile_updated,
            "message": "OAuth profile processed successfully",
        })
    except Exception as e:
        capture_and_log(e, "Process OAuth profile", source="api", request=request)
        return Response({
            "success": False,
            "error": "Failed to process OAuth profile",
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

def _oauth_user_is_banned(self, user, user_id_str: str) -> bool:
    """Return True if the user, or their UserProfile row, is flagged as banned."""
    if getattr(user, 'is_banned', False):
        return True

    # Also check via profile if applicable
    try:
        from apps.accounts.models import UserProfile

        profile_check = UserProfile.objects.filter(user=user).first()
        return bool(profile_check and getattr(profile_check, 'is_banned', False))
    except Exception:
        # The lookup stays best-effort (the request continues as before),
        # but a failure in a security-relevant check must leave a trace.
        logger.warning(
            "Ban status lookup via profile failed; treating user as not banned",
            exc_info=True,
            extra={"user_id": user_id_str},
        )
        return False

def _extract_oauth_profile_fields(
    self, provider: str, extra_data: dict
) -> tuple[str | None, str | None, str | None]:
    """Return sanitized ``(avatar_url, display_name, username_base)`` from provider data."""
    import re

    avatar_url = None
    display_name = None
    username_base = None

    if provider == "google":
        avatar_url = self._sanitize_url(extra_data.get("picture"))
        display_name = self._sanitize_display_name(extra_data.get("name"))
        email = extra_data.get("email", "")
        if email and isinstance(email, str):
            # Local part of the e-mail address is the username candidate.
            username_base = self._sanitize_username(email.split("@")[0])
    elif provider == "discord":
        discord_id = extra_data.get("id") or extra_data.get("sub")
        display_name = self._sanitize_display_name(
            extra_data.get("global_name")
            or extra_data.get("full_name")
            or extra_data.get("name")
        )
        # Discord avatar URL construction with validation. fullmatch (not
        # match + "$") so a trailing newline cannot slip into the URL, and
        # isascii() so only plain 0-9 digits are accepted for the id.
        avatar_hash = extra_data.get("avatar")
        if (
            isinstance(discord_id, str)
            and isinstance(avatar_hash, str)
            and discord_id.isascii()
            and discord_id.isdigit()
            and re.fullmatch(r'[a-zA-Z0-9_]+', avatar_hash)
        ):
            avatar_url = f"https://cdn.discordapp.com/avatars/{discord_id}/{avatar_hash}.png?size=256"
        if not avatar_url:
            avatar_url = self._sanitize_url(
                extra_data.get("avatar_url") or extra_data.get("picture")
            )
        raw_username = extra_data.get("username") or extra_data.get("name", "")
        if raw_username and isinstance(raw_username, str):
            # Drop a legacy "#1234" discriminator if present.
            username_base = self._sanitize_username(raw_username.split("#")[0])
        if not username_base and discord_id:
            username_base = f"discord_{str(discord_id)[:8]}"
    else:
        # Generic provider handling
        avatar_url = self._sanitize_url(
            extra_data.get("picture")
            or extra_data.get("avatar_url")
            or extra_data.get("avatar")
        )
        display_name = self._sanitize_display_name(
            extra_data.get("name") or extra_data.get("display_name")
        )

    return avatar_url, display_name, username_base

def _download_oauth_avatar(self, avatar_url: str, user_id_str: str, provider: str) -> bytes | None:
    """Stream the avatar and return its bytes, or None if it fails any validation."""
    import httpx

    # Validate URL scheme
    if not avatar_url.startswith(('https://', 'http://')):
        logger.warning(f"Invalid avatar URL scheme: {avatar_url[:50]}")
        return None

    chunks: list[bytes] = []
    received = 0
    try:
        # Stream instead of buffering the whole body so an oversized response
        # is abandoned as soon as it crosses MAX_AVATAR_SIZE.
        # NOTE(review): redirects are followed and plain http:// is allowed,
        # so the final host is not validated here - consider restricting.
        with httpx.stream(
            "GET",
            avatar_url,
            timeout=self.AVATAR_DOWNLOAD_TIMEOUT,
            follow_redirects=True,
            headers={
                "User-Agent": "ThrillWiki/1.0",
                "Accept": "image/*",
            },
        ) as download_response:
            if download_response.status_code != 200:
                logger.warning(
                    f"Avatar download failed: {download_response.status_code}",
                    extra={"user_id": user_id_str, "provider": provider}
                )
                return None

            # Validate content type
            content_type = download_response.headers.get("content-type", "")
            if not content_type.startswith("image/"):
                logger.warning(f"Invalid content type for avatar: {content_type}")
                return None

            # Validate file size while reading
            for chunk in download_response.iter_bytes():
                received += len(chunk)
                if received > self.MAX_AVATAR_SIZE:
                    logger.warning(
                        f"Avatar too large for user {user_id_str}: "
                        f"exceeds {self.MAX_AVATAR_SIZE} bytes"
                    )
                    return None
                chunks.append(chunk)
    except httpx.TimeoutException:
        logger.warning(f"Avatar download timeout for user {user_id_str}")
        return None
    except httpx.HTTPError as download_error:
        logger.warning(f"Failed to download avatar: {download_error}")
        return None

    # Validate minimum size (avoid empty images)
    if received < 100:
        logger.warning(f"Avatar too small for user {user_id_str}")
        return None

    return b"".join(chunks)
def _sanitize_url(self, url) -> str | None:
"""Sanitize and validate URL."""
if not url or not isinstance(url, str):
return None
url = url.strip()[:2000] # Limit length
# Basic URL validation
if not url.startswith(('https://', 'http://')):
return None
# Block obviously malicious patterns
dangerous_patterns = ['javascript:', 'data:', 'file:', '<script', 'onclick']
for pattern in dangerous_patterns:
if pattern.lower() in url.lower():
return None
return url
def _sanitize_display_name(self, name) -> str | None:
"""Sanitize display name."""
if not name or not isinstance(name, str):
return None
import re
# Strip and limit length
name = name.strip()[:100]
# Remove control characters
name = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', name)
# Remove excessive whitespace
name = ' '.join(name.split())
# Must have at least 1 character
if len(name) < 1:
return None
return name
def _sanitize_username(self, username) -> str | None:
"""Sanitize username for use."""
if not username or not isinstance(username, str):
return None
import re
# Lowercase and remove non-allowed characters
username = username.lower().strip()
username = re.sub(r'[^a-z0-9_]', '', username)
# Enforce length limits
if len(username) < self.MIN_USERNAME_LENGTH:
return None
username = username[:self.MAX_USERNAME_LENGTH]
return username
def _upload_to_cloudflare(self, image_data: bytes, user_id: str, provider: str, profile) -> bool:
    """
    Push avatar bytes to Cloudflare Images and attach the result to *profile*.

    Requests a direct-upload URL from ``CloudflareImagesService``, posts the
    image to it, then creates a ``CloudflareImage`` row and stores it as
    ``profile.avatar``. Every failure (missing toolkit, bad service reply,
    non-200/201 upload status, any exception) is logged as a warning.

    Returns:
        bool: ``True`` only when the image was uploaded and linked.
    """
    import httpx
    from django.db import transaction

    def build_metadata() -> dict:
        # A fresh dict per use, matching the two separate literals the
        # service call and the model row each received.
        return {
            "type": "avatar",
            "user_id": user_id,
            "provider": provider,
        }

    try:
        from django_cloudflareimages_toolkit.models import CloudflareImage
        from django_cloudflareimages_toolkit.services import CloudflareImagesService

        # Request direct upload URL
        direct_upload = CloudflareImagesService().get_direct_upload_url(
            metadata=build_metadata()
        )
        if not direct_upload or "upload_url" not in direct_upload:
            logger.warning("Failed to get Cloudflare upload URL")
            return False

        target_url = direct_upload["upload_url"]
        image_id = direct_upload.get("id") or direct_upload.get("cloudflare_id")
        if not image_id:
            logger.warning("No Cloudflare ID in upload result")
            return False

        # Upload image to Cloudflare
        response = httpx.post(
            target_url,
            files={"file": ("avatar.png", image_data, "image/png")},
            timeout=self.AVATAR_UPLOAD_TIMEOUT,
        )
        if response.status_code not in (200, 201):
            logger.warning(f"Cloudflare upload failed: {response.status_code}")
            return False

        # Create CloudflareImage record and link to profile
        with transaction.atomic():
            profile.avatar = CloudflareImage.objects.create(
                cloudflare_id=image_id,
                is_uploaded=True,
                metadata=build_metadata(),
            )
            profile.save(update_fields=["avatar"])

        logger.info(
            "Avatar uploaded successfully",
            extra={"user_id": user_id, "provider": provider, "cloudflare_id": image_id}
        )
        return True
    except ImportError:
        logger.warning("django-cloudflareimages-toolkit not available")
        return False
    except Exception as cf_error:
        logger.warning(f"Cloudflare upload error: {cf_error}")
        return False
def _ensure_unique_username(self, base_username: str, user_id: str, max_attempts: int = 10) -> str | None:
"""
Ensure username is unique by appending numbers if needed.
Returns None if no valid username can be generated.
"""
if not base_username:
return None
username = base_username.lower()[:self.MAX_USERNAME_LENGTH]
# Validate characters
if not all(c in self.ALLOWED_USERNAME_CHARS for c in username):
return None
attempt = 0
while attempt < max_attempts:
try:
existing = UserModel.objects.filter(username=username).exclude(user_id=user_id).exists()
if not existing:
return username
except Exception:
break
attempt += 1
# Ensure we don't exceed max length with suffix
suffix = f"_{attempt}"
max_base = self.MAX_USERNAME_LENGTH - len(suffix)
username = f"{base_username.lower()[:max_base]}{suffix}"
# Fallback to UUID-based username
return f"user_{str(user_id)[:8]}"