mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2026-02-05 09:25:18 -05:00
1598 lines
60 KiB
Python
1598 lines
60 KiB
Python
"""
|
|
Auth domain views for ThrillWiki API v1.
|
|
|
|
This module contains all authentication-related API endpoints including
|
|
login, signup, logout, password management, social authentication,
|
|
user profiles, and top lists.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from typing import cast # added 'cast'
|
|
|
|
from django.contrib.auth import authenticate, get_user_model, login, logout
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.core.exceptions import ValidationError
|
|
from django.db.models import Q
|
|
from django.http import HttpRequest # new import
|
|
from drf_spectacular.utils import extend_schema, extend_schema_view
|
|
from rest_framework import status
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
|
|
from apps.accounts.services.social_provider_service import SocialProviderService
|
|
from apps.core.utils import capture_and_log
|
|
|
|
# Import directly from the auth serializers.py file (not the serializers package)
|
|
from .serializers import (
|
|
AuthStatusOutputSerializer,
|
|
# Authentication serializers
|
|
LoginInputSerializer,
|
|
LoginOutputSerializer,
|
|
LogoutOutputSerializer,
|
|
PasswordChangeInputSerializer,
|
|
PasswordChangeOutputSerializer,
|
|
PasswordResetInputSerializer,
|
|
PasswordResetOutputSerializer,
|
|
SignupInputSerializer,
|
|
SignupOutputSerializer,
|
|
SocialProviderOutputSerializer,
|
|
UserOutputSerializer,
|
|
)
|
|
from .serializers_package.social import (
|
|
AvailableProviderSerializer,
|
|
ConnectedProviderSerializer,
|
|
ConnectProviderInputSerializer,
|
|
ConnectProviderOutputSerializer,
|
|
DisconnectProviderOutputSerializer,
|
|
SocialAuthStatusSerializer,
|
|
SocialProviderErrorSerializer,
|
|
)
|
|
|
|
# Handle optional dependencies with fallback classes
|
|
|
|
|
|
class FallbackTurnstileMixin:
    """No-op stand-in used when the real ``TurnstileMixin`` is unavailable."""

    def validate_turnstile(self, request):
        """Accept every request without performing any check; always returns ``None``."""
        return None
|
|
|
|
|
|
# Try to import the real class, use fallback if not available and ensure it's a class/type
|
|
try:
    from apps.accounts.mixins import TurnstileMixin as _ImportedTurnstileMixin
except Exception:
    # Any failure while importing the real mixin (not only ImportError)
    # selects the no-op fallback so this module can still be imported.
    TurnstileMixin = FallbackTurnstileMixin
else:
    # The views instantiate TurnstileMixin, so it has to be a real class;
    # anything else is replaced by the fallback as well.
    if isinstance(_ImportedTurnstileMixin, type):
        TurnstileMixin = _ImportedTurnstileMixin
    else:
        TurnstileMixin = FallbackTurnstileMixin

# Active user model and module-level logger shared by the views below.
UserModel = get_user_model()
logger = logging.getLogger(__name__)
|
|
|
|
# Helper: safely obtain underlying HttpRequest (used by Django auth)
|
|
|
|
|
|
def _get_underlying_request(request: Request) -> HttpRequest:
    """
    Unwrap a DRF request into the object handed to Django's auth and site helpers.

    Returns ``request._request`` when that attribute exists; otherwise the
    argument itself is returned unchanged. ``cast()`` only informs the type
    checker and performs no conversion at runtime.
    """
    django_request = getattr(request, "_request", request)
    return cast(HttpRequest, django_request)
|
|
|
|
|
|
# Helper: encapsulate user lookup + authenticate to reduce complexity in view
|
|
def _authenticate_user_by_lookup(email_or_username: str, password: str, request: Request) -> UserModel | None:
    """
    Resolve a login identifier to an account and authenticate it.

    The identifier is matched against both ``username`` and ``email`` in a
    single query. When an account is found its ``username`` is passed to
    ``authenticate()``; otherwise the raw identifier is passed instead.

    ``authenticate()`` is called on every path, including for identifiers that
    match no account, so that the configured backends always run. Returning
    early for unknown identifiers would skip the backend work entirely and
    make "unknown account" distinguishable from "wrong password".

    Args:
        email_or_username: Identifier supplied by the client.
        password: Plain-text password supplied by the client.
        request: DRF request; its underlying HttpRequest is given to the backends.

    Returns:
        The authenticated user, or ``None`` when authentication fails.
    """
    django_request = _get_underlying_request(request)

    # Fall back to the raw identifier unless the lookup resolves an account.
    login_name = email_or_username
    try:
        # No select_related(): only ``username`` is read from the result, so
        # joining related tables would be wasted work.
        user_obj = UserModel.objects.filter(
            Q(username=email_or_username) | Q(email=email_or_username)
        ).first()
    except Exception:
        # Best effort: a failing lookup must not prevent the backends from
        # being consulted with the identifier as given.
        logger.exception("User lookup failed during login; authenticating with the raw identifier")
        user_obj = None

    if user_obj is not None:
        login_name = getattr(user_obj, "username", None)

    return authenticate(
        django_request,  # type: ignore[arg-type]
        username=login_name,
        password=password,
    )
|
|
|
|
|
|
# === AUTHENTICATION API VIEWS ===
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="User login",
        description="Authenticate user with username/email and password.",
        request=LoginInputSerializer,
        responses={
            200: LoginOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class LoginAPIView(APIView):
    """
    API endpoint for user login.

    On valid credentials the view either completes the login (session plus
    JWT pair) or, when the account has a TOTP/WebAuthn authenticator, returns
    a short-lived ``mfa_token`` that is stored in the cache under
    ``mfa_login:<token>``.
    """

    permission_classes = [AllowAny]
    authentication_classes = []
    serializer_class = LoginInputSerializer

    # Lifetime (seconds) of the temporary MFA token kept in the cache.
    MFA_TOKEN_TTL = 300

    def post(self, request: Request) -> Response:
        """Validate the submitted credentials and return tokens, an MFA challenge, or a 400 error."""
        try:
            # instantiate mixin before calling to avoid type-mismatch in static analysis
            TurnstileMixin().validate_turnstile(request)
        except ValidationError as e:
            # str(e) would render the Python list repr ("['msg']"); join the messages instead.
            return Response({"detail": "; ".join(e.messages)}, status=status.HTTP_400_BAD_REQUEST)
        except Exception:
            # Unexpected failures do not block the login (existing behavior),
            # but they are no longer invisible.
            logger.warning(
                "Turnstile validation raised an unexpected error during login; continuing",
                exc_info=True,
            )

        serializer = LoginInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        validated = serializer.validated_data
        # Use .get to satisfy static analyzers
        email_or_username = validated.get("username")  # type: ignore[assignment]
        password = validated.get("password")  # type: ignore[assignment]

        if not email_or_username or not password:
            return Response(
                {"detail": "username and password are required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = _authenticate_user_by_lookup(email_or_username, password, request)
        if not user:
            return Response(
                {"detail": "Invalid credentials"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # NOTE(review): Django's stock ModelBackend rejects inactive users, in
        # which case authenticate() returns None and this branch is only
        # reachable with a backend that allows inactive users — confirm
        # AUTHENTICATION_BACKENDS.
        if not getattr(user, "is_active", False):
            return Response(
                {
                    "detail": "Please verify your email address before logging in. Check your email for a verification link.",
                    "code": "EMAIL_VERIFICATION_REQUIRED",
                    "email_verification_required": True,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        mfa_info = self._check_user_mfa(user)
        if mfa_info["has_mfa"]:
            return self._mfa_challenge_response(user, mfa_info["mfa_types"])

        # No MFA - proceed with normal login.
        # Pass a real HttpRequest to Django login with backend specified.
        login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")
        return self._token_response(user)

    def _mfa_challenge_response(self, user, mfa_types: list) -> Response:
        """Store a temporary MFA token for ``user`` in the cache and return the ``mfa_required`` payload."""
        from django.core.cache import cache
        from django.utils.crypto import get_random_string

        from .serializers import MFARequiredOutputSerializer

        mfa_token = get_random_string(64)
        cache.set(
            f"mfa_login:{mfa_token}",
            {
                "user_id": user.pk,
                "username": user.username,
            },
            timeout=self.MFA_TOKEN_TTL,
        )

        response_data = {
            "mfa_required": True,
            "mfa_token": mfa_token,
            "mfa_types": mfa_types,
            "user_id": user.pk,
            "message": "MFA verification required",
        }
        return Response(MFARequiredOutputSerializer(response_data).data)

    def _token_response(self, user) -> Response:
        """Issue a JWT refresh/access pair for ``user`` and wrap it in the login payload."""
        from rest_framework_simplejwt.tokens import RefreshToken

        refresh = RefreshToken.for_user(user)
        access_token = refresh.access_token

        response_serializer = LoginOutputSerializer(
            {
                "access": str(access_token),
                "refresh": str(refresh),
                "user": user,
                "message": "Login successful",
            }
        )
        return Response(response_serializer.data)

    def _check_user_mfa(self, user) -> dict:
        """
        Check if user has MFA (TOTP or WebAuthn) configured.

        Returns a dict with ``has_mfa``, ``has_totp``, ``has_webauthn`` and
        ``mfa_types``. If ``allauth.mfa`` cannot be imported, MFA is reported
        as not configured. If the authenticator query itself fails, the error
        is logged and re-raised: reporting "no MFA" at that point would let a
        password-only login through for an account that may require a second
        factor.
        """
        no_mfa = {"has_mfa": False, "has_totp": False, "has_webauthn": False, "mfa_types": []}

        try:
            from allauth.mfa.models import Authenticator
        except Exception:
            # The MFA models cannot be loaded, so no account can have MFA set up.
            return no_mfa

        try:
            authenticators = Authenticator.objects.filter(user=user)
            has_totp = authenticators.filter(type=Authenticator.Type.TOTP).exists()

            # Tolerate an Authenticator.Type without a WEBAUTHN member instead
            # of failing the whole check.
            webauthn_type = getattr(Authenticator.Type, "WEBAUTHN", None)
            has_webauthn = webauthn_type is not None and authenticators.filter(type=webauthn_type).exists()
        except Exception:
            logger.exception("Could not determine MFA status for user %s; refusing to skip MFA", user.pk)
            raise

        mfa_types = []
        if has_totp:
            mfa_types.append("totp")
        if has_webauthn:
            mfa_types.append("webauthn")

        return {
            "has_mfa": has_totp or has_webauthn,
            "has_totp": has_totp,
            "has_webauthn": has_webauthn,
            "mfa_types": mfa_types,
        }
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Verify MFA for login",
        description="Complete MFA verification after password authentication. Submit TOTP code to receive JWT tokens.",
        request={"application/json": {
            "type": "object",
            "properties": {
                "mfa_token": {"type": "string", "description": "Temporary token from login response"},
                "code": {"type": "string", "description": "6-digit TOTP code"},
                "credential": {"type": "object", "description": "WebAuthn/Passkey credential (alternative to code)"},
            },
            # Either "code" or "credential" must accompany the token; the view enforces that.
            "required": ["mfa_token"],
        }},
        responses={
            200: LoginOutputSerializer,
            400: "Bad Request - Invalid code or expired token",
        },
        tags=["Authentication"],
    ),
)
class MFALoginVerifyAPIView(APIView):
    """
    API endpoint to verify MFA code and complete login.

    The client presents the ``mfa_token`` issued by the login endpoint plus
    either a TOTP ``code`` or a WebAuthn ``credential``. Failed verifications
    are counted per token; once ``MAX_MFA_ATTEMPTS`` is reached the token is
    discarded and the client has to log in again.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    # Failed verifications tolerated per mfa_token before it is invalidated.
    MAX_MFA_ATTEMPTS = 5
    # Lifetime (seconds) of the per-token failure counter. It starts at the
    # first failure and matches the 300 s token lifetime used at login, so the
    # counter never expires before the token does.
    MFA_ATTEMPT_WINDOW = 300

    def post(self, request: Request) -> Response:
        """Verify the second factor for a pending login and return JWT tokens on success."""
        from django.core.cache import cache

        from .serializers import MFALoginVerifyInputSerializer

        serializer = MFALoginVerifyInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        validated = serializer.validated_data
        mfa_token = validated.get("mfa_token")
        totp_code = validated.get("code")
        credential = validated.get("credential")  # WebAuthn/Passkey credential

        # Retrieve user from cache
        cache_key = f"mfa_login:{mfa_token}"
        attempts_key = f"mfa_login_attempts:{mfa_token}"
        cached_data = cache.get(cache_key)

        if not cached_data:
            return Response(
                {"detail": "MFA session expired or invalid. Please login again."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user_id = cached_data.get("user_id")

        try:
            user = UserModel.objects.get(pk=user_id)
        except UserModel.DoesNotExist:
            return Response(
                {"detail": "User not found"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Verify MFA - either TOTP or Passkey
        if totp_code:
            if not self._verify_totp(user, totp_code):
                self._register_failed_attempt(cache, cache_key, attempts_key)
                return Response(
                    {"detail": "Invalid verification code"},
                    status=status.HTTP_400_BAD_REQUEST,
                )
        elif credential:
            passkey_result = self._verify_passkey(request, user, credential)
            if not passkey_result["success"]:
                self._register_failed_attempt(cache, cache_key, attempts_key)
                return Response(
                    {"detail": passkey_result.get("error", "Passkey verification failed")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
        else:
            return Response(
                {"detail": "Either TOTP code or passkey credential is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Clear the MFA token (and its failure counter) so it cannot be reused.
        cache.delete(cache_key)
        cache.delete(attempts_key)

        # Complete login
        login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")

        # Generate JWT tokens
        from rest_framework_simplejwt.tokens import RefreshToken

        refresh = RefreshToken.for_user(user)
        access_token = refresh.access_token

        response_serializer = LoginOutputSerializer(
            {
                "access": str(access_token),
                "refresh": str(refresh),
                "user": user,
                "message": "Login successful",
            }
        )
        return Response(response_serializer.data)

    def _register_failed_attempt(self, cache, cache_key: str, attempts_key: str) -> None:
        """Count one failed verification and drop the MFA token once the limit is reached."""
        # add() only creates the counter when it is missing; incr() then bumps it.
        cache.add(attempts_key, 0, timeout=self.MFA_ATTEMPT_WINDOW)
        try:
            attempts = cache.incr(attempts_key)
        except ValueError:
            # incr() raises ValueError for a missing key (e.g. evicted between the two calls).
            cache.set(attempts_key, 1, timeout=self.MFA_ATTEMPT_WINDOW)
            attempts = 1

        if attempts >= self.MAX_MFA_ATTEMPTS:
            cache.delete(cache_key)
            cache.delete(attempts_key)

    def _verify_totp(self, user, code: str) -> bool:
        """
        Verify TOTP code against user's authenticator.

        Returns ``False`` when the user has no TOTP authenticator, when no
        secret is stored, when ``allauth.mfa`` is unavailable, or when any
        error occurs during verification.
        """
        try:
            from allauth.mfa.models import Authenticator
            from allauth.mfa.totp.internal import auth as totp_auth
        except ImportError:
            logger.error("allauth.mfa not available for TOTP verification")
            return False

        try:
            try:
                authenticator = Authenticator.objects.get(
                    user=user,
                    type=Authenticator.Type.TOTP,
                )
            except Authenticator.DoesNotExist:
                return False

            # NOTE(review): the stored value is used as the secret as-is;
            # confirm no encryption is applied to it when it is stored.
            secret = authenticator.data.get("secret")
            if not secret:
                return False

            return totp_auth.validate_totp_code(secret, code)
        except Exception:
            logger.exception("TOTP verification error")
            return False

    def _verify_passkey(self, request, user, credential: dict) -> dict:
        """
        Verify WebAuthn/Passkey credential.

        Returns ``{"success": True}`` on success, otherwise
        ``{"success": False, "error": <client-safe message>}``. Exception
        details are written to the log only and never echoed to the client.
        """
        try:
            from allauth.mfa.models import Authenticator
            from allauth.mfa.webauthn.internal import auth as webauthn_auth
        except ImportError as e:
            logger.error("WebAuthn module not available: %s", e)
            return {"success": False, "error": "Passkey authentication not available"}

        try:
            # Check if user has any WebAuthn authenticators
            has_passkey = Authenticator.objects.filter(
                user=user,
                type=Authenticator.Type.WEBAUTHN,
            ).exists()
            if not has_passkey:
                return {"success": False, "error": "No passkey registered for this user"}

            # NOTE(review): the calls below are kept exactly as they were;
            # confirm the signatures against the installed allauth version and
            # that the verified credential is bound to ``user``.
            credential_data = webauthn_auth.parse_authentication_response(credential)

            state = webauthn_auth.get_state(request)
            if not state:
                # If no state, generate one
                _, state = webauthn_auth.begin_authentication(request)
                webauthn_auth.set_state(request, state)

            webauthn_auth.complete_authentication(request, credential_data, state)
            webauthn_auth.clear_state(request)

            return {"success": True}
        except Exception:
            logger.exception("Passkey verification error")
            return {"success": False, "error": "Passkey verification failed"}
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="User registration",
        description="Register a new user account. Email verification required.",
        request=SignupInputSerializer,
        responses={
            201: SignupOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class SignupAPIView(APIView):
    """
    API endpoint for user registration.

    A successful signup returns 201 with no tokens: the response carries
    ``email_verification_required`` and the user is not logged in.
    """

    permission_classes = [AllowAny]
    authentication_classes = []
    serializer_class = SignupInputSerializer

    def post(self, request: Request) -> Response:
        """Create the account from the submitted data, or return the validation errors with 400."""
        try:
            # instantiate mixin before calling to avoid type-mismatch in static analysis
            TurnstileMixin().validate_turnstile(request)
        except ValidationError as e:
            # str(e) would render the Python list repr ("['msg']"); join the messages instead.
            return Response({"detail": "; ".join(e.messages)}, status=status.HTTP_400_BAD_REQUEST)
        except Exception:
            # Unexpected failures do not block the signup (existing behavior),
            # but they are no longer invisible.
            logger.warning(
                "Turnstile validation raised an unexpected error during signup; continuing",
                exc_info=True,
            )

        serializer = SignupInputSerializer(data=request.data, context={"request": request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        user = serializer.save()

        # Don't log in the user immediately - they need to verify their email first
        response_serializer = SignupOutputSerializer(
            {
                "access": None,
                "refresh": None,
                "user": user,
                "detail": "Registration successful. Please check your email to verify your account.",
                "email_verification_required": True,
            }
        )
        return Response(response_serializer.data, status=status.HTTP_201_CREATED)
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="User logout",
        description="Logout the current user and blacklist their refresh token.",
        responses={
            200: LogoutOutputSerializer,
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class LogoutAPIView(APIView):
    """
    API endpoint for user logout.

    Blacklists the submitted refresh token (best effort), deletes the user's
    ``auth_token`` when one exists, and ends the Django session.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = LogoutOutputSerializer

    def post(self, request: Request) -> Response:
        """Log the current user out; responds 500 with "Logout failed" on any unexpected error."""
        try:
            payload = request.data
            # The body may not be a mapping (e.g. a JSON list); then there is no token to read.
            refresh_token = payload.get("refresh") if hasattr(payload, "get") else None

            if refresh_token and isinstance(refresh_token, str):
                from rest_framework_simplejwt.tokens import RefreshToken

                try:
                    RefreshToken(refresh_token).blacklist()  # type: ignore[arg-type]
                except Exception:
                    # Token might be invalid or already blacklisted; logout still
                    # proceeds, but the reason is recorded instead of discarded.
                    logger.info("Refresh token could not be blacklisted during logout", exc_info=True)

            # Also delete the old token for backward compatibility
            if hasattr(request.user, "auth_token"):
                request.user.auth_token.delete()

            # Logout from session using the underlying HttpRequest
            logout(_get_underlying_request(request))

            response_serializer = LogoutOutputSerializer({"detail": "Logout successful"})
            return Response(response_serializer.data)
        except Exception:
            logger.exception("Logout failed")
            return Response({"detail": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@extend_schema_view(
    get=extend_schema(
        summary="Get current user",
        description="Retrieve information about the currently authenticated user.",
        responses={
            200: UserOutputSerializer,
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class CurrentUserAPIView(APIView):
    """Return the serialized representation of the authenticated caller."""

    permission_classes = [IsAuthenticated]
    serializer_class = UserOutputSerializer

    def get(self, request: Request) -> Response:
        """Serialize ``request.user`` with ``UserOutputSerializer`` and return the result."""
        return Response(UserOutputSerializer(request.user).data)
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Request password reset",
        description="Send a password reset email to the user.",
        request=PasswordResetInputSerializer,
        responses={
            200: PasswordResetOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class PasswordResetAPIView(APIView):
    """Accept a password-reset request and hand it to ``PasswordResetInputSerializer``."""

    permission_classes = [AllowAny]
    serializer_class = PasswordResetInputSerializer

    def post(self, request: Request) -> Response:
        """Validate the payload, save the serializer, and confirm; invalid input yields 400."""
        reset_form = PasswordResetInputSerializer(data=request.data, context={"request": request})
        if not reset_form.is_valid():
            return Response(reset_form.errors, status=status.HTTP_400_BAD_REQUEST)

        reset_form.save()

        confirmation = PasswordResetOutputSerializer({"detail": "Password reset email sent"})
        return Response(confirmation.data)
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Change password",
        description="Change the current user's password.",
        request=PasswordChangeInputSerializer,
        responses={
            200: PasswordChangeOutputSerializer,
            400: "Bad Request",
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class PasswordChangeAPIView(APIView):
    """Let an authenticated user change their password via ``PasswordChangeInputSerializer``."""

    permission_classes = [IsAuthenticated]
    serializer_class = PasswordChangeInputSerializer

    def post(self, request: Request) -> Response:
        """Validate the payload, save the serializer, and confirm; invalid input yields 400."""
        change_form = PasswordChangeInputSerializer(data=request.data, context={"request": request})
        if not change_form.is_valid():
            return Response(change_form.errors, status=status.HTTP_400_BAD_REQUEST)

        change_form.save()

        confirmation = PasswordChangeOutputSerializer({"detail": "Password changed successfully"})
        return Response(confirmation.data)
|
|
|
|
|
|
@extend_schema_view(
    get=extend_schema(
        summary="Get social providers",
        description="Retrieve available social authentication providers.",
        responses={200: "List of social providers"},
        tags=["Authentication"],
    ),
)
class SocialProvidersAPIView(APIView):
    """
    API endpoint to get available social authentication providers.

    The list is built from the ``SocialApp`` rows attached to the current site
    and cached per site and request host.
    """

    permission_classes = [AllowAny]
    serializer_class = SocialProviderOutputSerializer

    # How long (seconds) the serialized provider list stays cached: 15 minutes.
    CACHE_TTL = 900

    def get(self, request: Request) -> Response:
        """Return the provider list, from the cache when present, otherwise freshly built and cached."""
        from django.core.cache import cache

        # get_current_site expects a django HttpRequest
        site = get_current_site(_get_underlying_request(request))

        # Cache key based on site and request host - use getattr to avoid attribute errors
        site_id = getattr(site, "id", getattr(site, "pk", None))
        cache_key = f"social_providers:{site_id}:{request.get_host()}"

        cached_providers = cache.get(cache_key)
        if cached_providers is not None:
            return Response(cached_providers)

        from allauth.socialaccount.models import SocialApp

        providers_list = []
        for social_app in SocialApp.objects.filter(sites=site).order_by("provider"):
            try:
                provider_name = social_app.name or getattr(social_app, "provider", "").title()
                auth_url = request.build_absolute_uri(f"/accounts/{social_app.provider}/login/")
                providers_list.append(
                    {
                        "id": social_app.provider,
                        "name": provider_name,
                        "authUrl": auth_url,
                    }
                )
            except Exception:
                # A broken entry must not hide the remaining providers, but the
                # omission is recorded instead of passing silently.
                logger.warning(
                    "Skipping social app %r while building the provider list",
                    getattr(social_app, "pk", None),
                    exc_info=True,
                )
                continue

        serializer = SocialProviderOutputSerializer(providers_list, many=True)
        response_data = serializer.data

        cache.set(cache_key, response_data, self.CACHE_TTL)

        return Response(response_data)
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Check authentication status",
        description="Check if user is authenticated and return user data.",
        responses={200: AuthStatusOutputSerializer},
        tags=["Authentication"],
    ),
)
class AuthStatusAPIView(APIView):
    """Report whether the caller is authenticated, together with their user data when they are."""

    permission_classes = [AllowAny]
    serializer_class = AuthStatusOutputSerializer

    def post(self, request: Request) -> Response:
        """Return ``authenticated`` plus ``user`` (``None`` for anonymous callers)."""
        signed_in = bool(request.user.is_authenticated)
        status_payload = {
            "authenticated": signed_in,
            "user": request.user if signed_in else None,
        }
        return Response(AuthStatusOutputSerializer(status_payload).data)
|
|
|
|
|
|
# === SOCIAL PROVIDER MANAGEMENT API VIEWS ===
|
|
|
|
|
|
@extend_schema_view(
    get=extend_schema(
        summary="Get available social providers",
        description="Retrieve list of available social authentication providers.",
        responses={
            200: AvailableProviderSerializer(many=True),
        },
        tags=["Social Authentication"],
    ),
)
class AvailableProvidersAPIView(APIView):
    """Expose the fixed list of social providers (Google and Discord) with their login/connect URLs."""

    permission_classes = [AllowAny]
    serializer_class = AvailableProviderSerializer

    def get(self, request: Request) -> Response:
        """Return the hard-coded provider entries serialized with ``AvailableProviderSerializer``."""
        google_entry = {
            "provider": "google",
            "name": "Google",
            "login_url": "/auth/social/google/",
            "connect_url": "/auth/social/connect/google/",
        }
        discord_entry = {
            "provider": "discord",
            "name": "Discord",
            "login_url": "/auth/social/discord/",
            "connect_url": "/auth/social/connect/discord/",
        }

        listing = AvailableProviderSerializer([google_entry, discord_entry], many=True)
        return Response(listing.data)
|
|
|
|
|
|
@extend_schema_view(
    get=extend_schema(
        summary="Get connected social providers",
        description="Retrieve list of social providers connected to the user's account.",
        responses={
            200: ConnectedProviderSerializer(many=True),
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class ConnectedProvidersAPIView(APIView):
    """List the social providers linked to the authenticated user's account."""

    permission_classes = [IsAuthenticated]
    serializer_class = ConnectedProviderSerializer

    def get(self, request: Request) -> Response:
        """Ask ``SocialProviderService`` for the caller's connected providers and serialize them."""
        connected = SocialProviderService().get_connected_providers(request.user)
        return Response(ConnectedProviderSerializer(connected, many=True).data)
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Connect social provider",
        description="Connect a social authentication provider to the user's account.",
        request=ConnectProviderInputSerializer,
        responses={
            200: ConnectProviderOutputSerializer,
            400: SocialProviderErrorSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class ConnectProviderAPIView(APIView):
    """Link a social provider ("google" or "discord") to the authenticated user's account."""

    permission_classes = [IsAuthenticated]
    serializer_class = ConnectProviderInputSerializer

    def post(self, request: Request, provider: str) -> Response:
        """
        Connect ``provider`` using the ``access_token`` from the request body.

        Responds 400 for an unsupported provider, for invalid input, and for
        any exception raised while connecting (the exception text is returned
        in ``message``).
        """
        # Only these two provider slugs are accepted.
        if provider not in ("google", "discord"):
            unsupported = {
                "detail": f"Provider '{provider}' is not supported",
                "code": "INVALID_PROVIDER",
                "suggestions": ["Use 'google' or 'discord'"],
            }
            return Response(unsupported, status=status.HTTP_400_BAD_REQUEST)

        payload = ConnectProviderInputSerializer(data=request.data)
        if not payload.is_valid():
            invalid = {
                "detail": "Invalid request data",
                "code": "VALIDATION_ERROR",
                "details": payload.errors,
                "suggestions": ["Provide a valid access_token"],
            }
            return Response(invalid, status=status.HTTP_400_BAD_REQUEST)

        access_token = payload.validated_data["access_token"]

        try:
            outcome = SocialProviderService().connect_provider(request.user, provider, access_token)
            return Response(ConnectProviderOutputSerializer(outcome).data)
        except Exception as e:
            failure = {
                "success": False,
                "detail": "CONNECTION_FAILED",
                "message": str(e),
                "suggestions": [
                    "Verify the access token is valid",
                    "Ensure the provider account is not already connected to another user",
                ],
            }
            return Response(failure, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Disconnect social provider",
        description="Disconnect a social authentication provider from the user's account.",
        responses={
            200: DisconnectProviderOutputSerializer,
            400: SocialProviderErrorSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class DisconnectProviderAPIView(APIView):
    """Unlink a social provider ("google" or "discord") from the authenticated user's account."""

    permission_classes = [IsAuthenticated]
    serializer_class = DisconnectProviderOutputSerializer

    def post(self, request: Request, provider: str) -> Response:
        """
        Disconnect ``provider`` after the service confirms it may be removed.

        Responds 400 for an unsupported provider, when the service reports
        that disconnecting is not allowed, and for any exception raised along
        the way (the exception text is returned in ``message``).
        """
        # Only these two provider slugs are accepted.
        if provider not in ("google", "discord"):
            unsupported = {
                "detail": f"Provider '{provider}' is not supported",
                "code": "INVALID_PROVIDER",
                "suggestions": ["Use 'google' or 'discord'"],
            }
            return Response(unsupported, status=status.HTTP_400_BAD_REQUEST)

        try:
            provider_service = SocialProviderService()

            # The service decides whether removal is allowed and supplies the reason when it is not.
            allowed, reason = provider_service.can_disconnect_provider(request.user, provider)
            if not allowed:
                refusal = {
                    "success": False,
                    "detail": "UNSAFE_DISCONNECTION",
                    "message": reason,
                    "suggestions": [
                        "Set up email/password authentication before disconnecting",
                        "Connect another social provider before disconnecting this one",
                    ],
                }
                return Response(refusal, status=status.HTTP_400_BAD_REQUEST)

            outcome = provider_service.disconnect_provider(request.user, provider)
            return Response(DisconnectProviderOutputSerializer(outcome).data)
        except Exception as e:
            failure = {
                "success": False,
                "detail": "DISCONNECTION_FAILED",
                "message": str(e),
                "suggestions": [
                    "Verify the provider is currently connected",
                    "Ensure you have alternative authentication methods",
                ],
            }
            return Response(failure, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
@extend_schema_view(
    get=extend_schema(
        summary="Get social authentication status",
        description="Get comprehensive social authentication status for the user.",
        responses={
            200: SocialAuthStatusSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class SocialAuthStatusAPIView(APIView):
    """Return the social-authentication status of the authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = SocialAuthStatusSerializer

    def get(self, request: Request) -> Response:
        """Fetch the status from ``SocialProviderService.get_auth_status`` and serialize it."""
        current_status = SocialProviderService().get_auth_status(request.user)
        return Response(SocialAuthStatusSerializer(current_status).data)
|
|
|
|
|
|
# === EMAIL VERIFICATION API VIEWS ===
|
|
|
|
|
|
@extend_schema_view(
    get=extend_schema(
        summary="Verify email address",
        description="Verify user's email address using verification token.",
        responses={
            # Mirrors the keys the view actually returns on success.
            200: {
                "type": "object",
                "properties": {"detail": {"type": "string"}, "success": {"type": "boolean"}},
            },
            400: "Bad Request",
            404: "Token not found",
        },
        tags=["Authentication"],
    ),
)
class EmailVerificationAPIView(APIView):
    """
    API endpoint for email verification.

    Looks up the ``EmailVerification`` row for the token in the URL, activates
    the associated user and deletes the row so the token cannot be used again.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    def get(self, request: Request, token: str) -> Response:
        """Activate the account tied to ``token``; responds 404 when the token is unknown."""
        from django.db import transaction

        from apps.accounts.models import EmailVerification

        try:
            verification = EmailVerification.objects.select_related("user").get(token=token)
        except EmailVerification.DoesNotExist:
            return Response({"detail": "Invalid or expired verification token"}, status=status.HTTP_404_NOT_FOUND)

        user = verification.user

        # Activation and token removal succeed or fail together.
        with transaction.atomic():
            user.is_active = True
            # Write only the changed column so a concurrent update to other
            # fields of the same user is not overwritten by this stale instance.
            user.save(update_fields=["is_active"])
            verification.delete()

        return Response({"detail": "Email verified successfully. You can now log in.", "success": True})
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Resend verification email",
        description="Resend email verification to user's email address.",
        request={"type": "object", "properties": {"email": {"type": "string", "format": "email"}}},
        responses={
            # Mirrors the keys the view actually returns on success.
            200: {
                "type": "object",
                "properties": {"detail": {"type": "string"}, "success": {"type": "boolean"}},
            },
            400: "Bad Request",
            404: "User not found",
        },
        tags=["Authentication"],
    ),
)
class ResendVerificationAPIView(APIView):
    """
    API endpoint to resend email verification.

    Issues a fresh verification token for a not-yet-active account and emails
    the verification link to the account's address.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    def post(self, request: Request) -> Response:
        """
        Send a new verification email for the address in the request body.

        Responds 400 when no usable email is supplied or the account is
        already active, 500 when sending fails, and 200 otherwise (including
        when no account matches the address).
        """
        from django.utils.crypto import get_random_string
        from django_forwardemail.services import EmailService

        from apps.accounts.models import EmailVerification

        # The body may be a non-mapping (e.g. a JSON list) and the value a
        # non-string; both previously surfaced as an AttributeError (HTTP 500).
        payload = request.data
        email = payload.get("email") if hasattr(payload, "get") else None
        if not isinstance(email, str) or not email.strip():
            return Response({"detail": "Email address is required"}, status=status.HTTP_400_BAD_REQUEST)

        # filter().first() instead of get(): several accounts whose emails
        # differ only by case must not turn into an unhandled
        # MultipleObjectsReturned.
        user = UserModel.objects.filter(email__iexact=email.strip().lower()).first()
        if user is None:
            # Don't reveal whether email exists
            return Response({"detail": "If the email exists, a verification email has been sent", "success": True})

        # NOTE(review): this response and the distinct success message below
        # still let a caller tell existing accounts apart from unknown
        # addresses; kept as-is for client compatibility.
        if user.is_active:
            return Response({"detail": "Email is already verified"}, status=status.HTTP_400_BAD_REQUEST)

        # Create or update verification record
        verification, created = EmailVerification.objects.get_or_create(
            user=user, defaults={"token": get_random_string(64)}
        )
        if not created:
            # Rotate the token on the existing record.
            verification.token = get_random_string(64)
            verification.save()

        site = get_current_site(_get_underlying_request(request))
        verification_url = request.build_absolute_uri(f"/api/v1/auth/verify-email/{verification.token}/")

        message_body = "\n".join(
            [
                "Welcome to ThrillWiki!",
                "",
                "Please verify your email address by clicking the link below:",
                verification_url,
                "",
                "If you didn't create an account, you can safely ignore this email.",
                "",
                "Thanks,",
                "The ThrillWiki Team",
            ]
        )

        try:
            EmailService.send_email(
                to=user.email,
                subject="Verify your ThrillWiki account",
                text=message_body,
                site=site,
            )
        except Exception as e:
            capture_and_log(e, 'Send verification email', source='api')
            return Response(
                {"detail": "Failed to send verification email"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

        return Response({"detail": "Verification email sent successfully", "success": True})
|
|
|
|
# Note: User Profile, Top List, and Top List Item ViewSets are now handled
|
|
# by the dedicated accounts app at backend/apps/api/v1/accounts/views.py
|
|
# to avoid duplication and maintain clean separation of concerns.
|
|
|
|
|
|
@extend_schema_view(
    post=extend_schema(
        summary="Process OAuth profile",
        description="Process OAuth profile data during social authentication flow.",
        # NOTE(review): the handler never reads the request body (it works
        # from the authenticated session), so this request schema looks
        # stale. Left as-is for generated clients - confirm and drop.
        request={
            "type": "object",
            "properties": {
                "provider": {"type": "string", "description": "OAuth provider (e.g., google, discord)"},
                "profile": {
                    "type": "object",
                    "description": "Profile data from OAuth provider",
                    "properties": {
                        "id": {"type": "string"},
                        "email": {"type": "string", "format": "email"},
                        "name": {"type": "string"},
                        "avatar_url": {"type": "string", "format": "uri"},
                    },
                },
                "access_token": {"type": "string", "description": "OAuth access token"},
            },
            "required": ["provider", "profile"],
        },
        responses={
            # Mirrors the payloads actually returned by ``post`` below.
            200: {
                "type": "object",
                "properties": {
                    "success": {"type": "boolean"},
                    "action": {"type": "string", "enum": ["processed", "skipped", "rate_limited"]},
                    "provider": {"type": "string"},
                    "avatar_uploaded": {"type": "boolean"},
                    "profile_updated": {"type": "boolean"},
                    "message": {"type": "string"},
                },
            },
            400: "Bad Request",
            401: "Unauthorized",
            403: "Account suspended",
        },
        tags=["Social Authentication"],
    ),
)
class ProcessOAuthProfileAPIView(APIView):
    """
    API endpoint to process OAuth profile data.

    This endpoint is called AFTER the OAuth flow is complete to:
    1. Check if user is banned (SECURITY CRITICAL)
    2. Extract avatar from OAuth provider
    3. Download and upload avatar to Cloudflare Images
    4. Sync display name from OAuth provider
    5. Update username if it's a generic UUID-based username

    Called with an empty body - uses the authenticated session.

    The request short-circuits with a 200 "skipped"/"rate_limited" payload
    (and therefore does not sync the display name or username) when the
    user is inside the avatar cooldown, has no linked social account, or
    already has an avatar on their profile.

    Full parity with Supabase Edge Function: process-oauth-profile

    BULLETPROOFED: Comprehensive validation, sanitization, and error handling.
    """

    permission_classes = [IsAuthenticated]

    # Security constants
    MAX_AVATAR_SIZE = 10 * 1024 * 1024  # 10MB
    MIN_AVATAR_SIZE = 100  # bytes; anything smaller is treated as an empty image
    AVATAR_DOWNLOAD_TIMEOUT = 10.0  # seconds
    AVATAR_UPLOAD_TIMEOUT = 30.0  # seconds
    MAX_USERNAME_LENGTH = 150
    MIN_USERNAME_LENGTH = 3
    ALLOWED_USERNAME_CHARS = set("abcdefghijklmnopqrstuvwxyz0123456789_")

    # Rate limiting for avatar uploads (prevent abuse)
    AVATAR_UPLOAD_COOLDOWN = 60  # seconds between uploads

    def post(self, request: Request) -> Response:
        """Sync avatar, display name and username from the linked OAuth account.

        Args:
            request: Authenticated DRF request; the body is not read.

        Returns:
            * 401 when ``request.user`` has no ``user_id`` attribute.
            * 403 when the user (or their profile) is flagged as banned.
            * 200 with ``action`` of ``rate_limited``, ``skipped`` or
              ``processed`` otherwise.
            * 500 when allauth is missing or an unexpected error occurs.
        """
        from django.core.cache import cache
        from django.db import transaction

        try:
            user = request.user

            # ================================================================
            # STEP 0: Validate user object exists and is valid
            # ================================================================
            if not user or not hasattr(user, "user_id"):
                logger.error("ProcessOAuthProfile called with invalid user object")
                return Response({
                    "success": False,
                    "error": "Invalid user session",
                }, status=status.HTTP_401_UNAUTHORIZED)

            user_id_str = str(user.user_id)

            # ================================================================
            # STEP 1: CRITICAL - Check ban status FIRST
            # ================================================================
            if self._is_user_banned(user, user_id_str):
                ban_reason = getattr(user, "ban_reason", None) or "Policy violation"
                # Sanitize ban reason for response
                safe_ban_reason = str(ban_reason)[:200] if ban_reason else None

                logger.warning(
                    "Banned user attempted OAuth profile update",
                    extra={"user_id": user_id_str, "ban_reason": safe_ban_reason},
                )

                return Response({
                    "error": "Account suspended",
                    "message": (
                        f"Your account has been suspended. Reason: {safe_ban_reason}"
                        if safe_ban_reason
                        else "Your account has been suspended. Contact support for assistance."
                    ),
                    "ban_reason": safe_ban_reason,
                }, status=status.HTTP_403_FORBIDDEN)

            # ================================================================
            # STEP 2: Check rate limiting for avatar uploads
            # ================================================================
            rate_limit_key = f"oauth_profile:avatar:{user_id_str}"
            if cache.get(rate_limit_key):
                return Response({
                    "success": True,
                    "action": "rate_limited",
                    "message": "Please wait before updating your profile again",
                    "avatar_uploaded": False,
                    "profile_updated": False,
                })

            # ================================================================
            # STEP 3: Get OAuth provider info from social accounts
            # ================================================================
            try:
                from allauth.socialaccount.models import SocialAccount
            except ImportError:
                logger.error("django-allauth not installed")
                return Response({
                    "success": False,
                    "error": "Social authentication not configured",
                }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

            social_accounts = SocialAccount.objects.filter(user=user)

            if not social_accounts.exists():
                return Response({
                    "success": True,
                    "action": "skipped",
                    "message": "No OAuth accounts linked",
                })

            # Get the most recent social account
            social_account = social_accounts.order_by("-date_joined").first()
            if not social_account:
                return Response({
                    "success": True,
                    "action": "skipped",
                    "message": "No valid OAuth account found",
                })

            provider = social_account.provider or "unknown"
            extra_data = social_account.extra_data or {}

            # Validate extra_data is a dict
            if not isinstance(extra_data, dict):
                logger.warning(f"Invalid extra_data type for user {user_id_str}: {type(extra_data)}")
                extra_data = {}

            # ================================================================
            # STEP 4: Extract profile data based on provider (with sanitization)
            # ================================================================
            avatar_url, display_name, username_base = self._extract_profile_data(provider, extra_data)

            # ================================================================
            # STEP 5: Get or create user profile (with transaction)
            # ================================================================
            from apps.accounts.models import UserProfile

            # NOTE(review): the row lock taken here is released when this
            # atomic block exits, i.e. before the download/upload below, so
            # it does not serialise concurrent avatar uploads - confirm intent.
            with transaction.atomic():
                profile, _profile_created = UserProfile.objects.select_for_update().get_or_create(
                    user=user
                )

                # Check if profile already has an avatar
                if profile.avatar_id:
                    return Response({
                        "success": True,
                        "action": "skipped",
                        "message": "Avatar already exists",
                        "avatar_uploaded": False,
                        "profile_updated": False,
                    })

            # ================================================================
            # STEP 6: Download and upload avatar to Cloudflare (outside transaction)
            # ================================================================
            avatar_uploaded = False

            if avatar_url:
                image_data = self._download_avatar(avatar_url, user_id_str, provider)
                if image_data is not None:
                    avatar_uploaded = self._upload_to_cloudflare(
                        image_data, user_id_str, provider, profile
                    )

            # Set rate limit after successful processing
            if avatar_uploaded:
                cache.set(rate_limit_key, True, self.AVATAR_UPLOAD_COOLDOWN)

            # ================================================================
            # STEP 7: Update display name if not set (with validation)
            # ================================================================
            profile_updated = False

            if display_name and not getattr(user, "display_name", None):
                try:
                    user.display_name = display_name
                    user.save(update_fields=["display_name"])
                    profile_updated = True
                except Exception as e:
                    # Best effort: a failed name sync must not fail the request.
                    logger.warning(f"Failed to update display name: {e}")

            # ================================================================
            # STEP 8: Update username if it's a generic UUID-based username
            # ================================================================
            current_username = getattr(user, "username", "") or ""
            if username_base and current_username.startswith("user_"):
                try:
                    new_username = self._ensure_unique_username(username_base, user.user_id)
                    if new_username and new_username != current_username:
                        user.username = new_username
                        user.save(update_fields=["username"])
                        profile_updated = True
                        logger.info(
                            f"Username updated from {current_username} to {new_username}",
                            extra={"user_id": user_id_str},
                        )
                except Exception as e:
                    # Best effort: e.g. a uniqueness race on save is only logged.
                    logger.warning(f"Failed to update username: {e}")

            return Response({
                "success": True,
                "action": "processed",
                "provider": provider,
                "avatar_uploaded": avatar_uploaded,
                "profile_updated": profile_updated,
                "message": "OAuth profile processed successfully",
            })

        except Exception as e:
            # Top-level boundary: report and answer with a generic 500.
            capture_and_log(e, "Process OAuth profile", source="api", request=request)
            return Response({
                "success": False,
                "error": "Failed to process OAuth profile",
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def _is_user_banned(self, user, user_id: str) -> bool:
        """Return True when the user or their profile carries a truthy ``is_banned``.

        A failure of the secondary profile lookup is logged and treated as
        "not banned" (same fail-open outcome as before, but no longer silent).
        """
        if getattr(user, "is_banned", False):
            return True

        try:
            from apps.accounts.models import UserProfile

            profile_check = UserProfile.objects.filter(user=user).first()
        except Exception as exc:
            logger.warning(
                f"Profile ban check failed: {exc}",
                extra={"user_id": user_id},
            )
            return False

        return bool(profile_check and getattr(profile_check, "is_banned", False))

    def _extract_profile_data(self, provider: str, extra_data: dict) -> tuple[str | None, str | None, str | None]:
        """Pull ``(avatar_url, display_name, username_base)`` out of provider data.

        Every value is sanitized; any element may be None when the provider
        payload lacks a usable value. ``username_base`` is only derived for
        the ``google`` and ``discord`` providers.
        """
        import re

        avatar_url = None
        display_name = None
        username_base = None

        if provider == "google":
            avatar_url = self._sanitize_url(extra_data.get("picture"))
            display_name = self._sanitize_display_name(extra_data.get("name"))
            email = extra_data.get("email", "")
            if email and isinstance(email, str):
                # Local part of the address becomes the username candidate.
                username_base = self._sanitize_username(email.split("@")[0])

        elif provider == "discord":
            discord_id = extra_data.get("id") or extra_data.get("sub")

            display_name = self._sanitize_display_name(
                extra_data.get("global_name")
                or extra_data.get("full_name")
                or extra_data.get("name")
            )

            # Discord avatar URL construction with validation. fullmatch is
            # used so a trailing newline or non-ASCII digit cannot slip into
            # the constructed CDN URL.
            avatar_hash = extra_data.get("avatar")
            if (
                isinstance(discord_id, str)
                and isinstance(avatar_hash, str)
                and re.fullmatch(r"[0-9]+", discord_id)
                and re.fullmatch(r"[a-zA-Z0-9_]+", avatar_hash)
            ):
                avatar_url = f"https://cdn.discordapp.com/avatars/{discord_id}/{avatar_hash}.png?size=256"

            if not avatar_url:
                avatar_url = self._sanitize_url(
                    extra_data.get("avatar_url") or extra_data.get("picture")
                )

            raw_username = extra_data.get("username") or extra_data.get("name", "")
            if raw_username and isinstance(raw_username, str):
                # Drop a legacy "#discriminator" suffix if present.
                username_base = self._sanitize_username(raw_username.split("#")[0])
            if not username_base and discord_id:
                username_base = f"discord_{str(discord_id)[:8]}"

        else:
            # Generic provider handling
            avatar_url = self._sanitize_url(
                extra_data.get("picture")
                or extra_data.get("avatar_url")
                or extra_data.get("avatar")
            )
            display_name = self._sanitize_display_name(
                extra_data.get("name") or extra_data.get("display_name")
            )

        return avatar_url, display_name, username_base

    def _download_avatar(self, avatar_url: str, user_id: str, provider: str) -> bytes | None:
        """Download the avatar image, enforcing type and size limits.

        The body is streamed so that a response larger than
        ``MAX_AVATAR_SIZE`` is abandoned as soon as the limit is crossed
        instead of being buffered in full first.

        Returns:
            The image bytes, or None when the URL is unusable, the request
            fails, the content type is not ``image/*`` or the size is
            outside ``MIN_AVATAR_SIZE``..``MAX_AVATAR_SIZE``. All failures
            are logged, never raised.
        """
        import httpx

        # Validate URL scheme
        if not avatar_url.startswith(("https://", "http://")):
            logger.warning(f"Invalid avatar URL scheme: {avatar_url[:50]}")
            return None

        chunks: list[bytes] = []
        received = 0

        try:
            with httpx.stream(
                "GET",
                avatar_url,
                timeout=self.AVATAR_DOWNLOAD_TIMEOUT,
                follow_redirects=True,
                headers={
                    "User-Agent": "ThrillWiki/1.0",
                    "Accept": "image/*",
                },
            ) as download_response:
                if download_response.status_code != 200:
                    logger.warning(
                        f"Avatar download failed: {download_response.status_code}",
                        extra={"user_id": user_id, "provider": provider},
                    )
                    return None

                # Validate content type
                content_type = download_response.headers.get("content-type", "")
                if not content_type.lower().startswith("image/"):
                    logger.warning(f"Invalid content type for avatar: {content_type}")
                    return None

                # Validate file size while reading
                for chunk in download_response.iter_bytes():
                    received += len(chunk)
                    if received > self.MAX_AVATAR_SIZE:
                        logger.warning(
                            f"Avatar too large for user {user_id}: more than {self.MAX_AVATAR_SIZE} bytes"
                        )
                        return None
                    chunks.append(chunk)

        except httpx.TimeoutException:
            logger.warning(f"Avatar download timeout for user {user_id}")
            return None
        except httpx.HTTPError as download_error:
            logger.warning(f"Failed to download avatar: {download_error}")
            return None
        except Exception as e:
            logger.warning(f"Unexpected avatar error: {e}")
            return None

        # Validate minimum size (avoid empty images)
        if received < self.MIN_AVATAR_SIZE:
            logger.warning(f"Avatar too small for user {user_id}")
            return None

        return b"".join(chunks)

    def _sanitize_url(self, url) -> str | None:
        """Return a trimmed http(s) URL, or None when the value is unusable.

        Rejects non-strings, empty values, URLs that do not start with
        ``http://`` or ``https://``, URLs without a host, and URLs that
        contain any of a small list of suspicious substrings.
        """
        if not url or not isinstance(url, str):
            return None

        from urllib.parse import urlparse

        url = url.strip()[:2000]  # Limit length

        # Basic URL validation
        if not url.startswith(("https://", "http://")):
            return None

        # A scheme with no host (e.g. "https://") can never be downloaded.
        try:
            if not urlparse(url).netloc:
                return None
        except ValueError:
            return None

        # Block obviously malicious patterns
        lowered = url.lower()
        dangerous_patterns = ("javascript:", "data:", "file:", "<script", "onclick")
        if any(pattern in lowered for pattern in dangerous_patterns):
            return None

        return url

    def _sanitize_display_name(self, name) -> str | None:
        """Return a cleaned display name, or None when nothing usable remains.

        The value is stripped, truncated to 100 characters, has control
        characters removed and runs of whitespace collapsed to one space.
        """
        if not name or not isinstance(name, str):
            return None

        import re

        # Strip and limit length
        name = name.strip()[:100]

        # Remove control characters
        name = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", name)

        # Remove excessive whitespace
        name = " ".join(name.split())

        # Must have at least 1 character
        return name or None

    def _sanitize_username(self, username) -> str | None:
        """Return a lowercase ``[a-z0-9_]`` username, or None when too short.

        Characters outside the allowed set are dropped; the result must be
        at least ``MIN_USERNAME_LENGTH`` long and is truncated to
        ``MAX_USERNAME_LENGTH``.
        """
        if not username or not isinstance(username, str):
            return None

        import re

        # Lowercase and remove non-allowed characters
        username = re.sub(r"[^a-z0-9_]", "", username.lower().strip())

        # Enforce length limits
        if len(username) < self.MIN_USERNAME_LENGTH:
            return None

        return username[: self.MAX_USERNAME_LENGTH]

    def _upload_to_cloudflare(self, image_data: bytes, user_id: str, provider: str, profile) -> bool:
        """Upload image to Cloudflare Images with error handling.

        Requests a direct-upload URL, posts the image to it, then creates a
        ``CloudflareImage`` row and links it as ``profile.avatar``.

        Returns:
            True when the image was uploaded and linked; False on any
            failure (failures are logged, never raised).
        """
        import httpx
        from django.db import transaction

        try:
            from django_cloudflareimages_toolkit.models import CloudflareImage
            from django_cloudflareimages_toolkit.services import CloudflareImagesService

            cf_service = CloudflareImagesService()
            image_metadata = {
                "type": "avatar",
                "user_id": user_id,
                "provider": provider,
            }

            # Request direct upload URL
            upload_result = cf_service.get_direct_upload_url(metadata=dict(image_metadata))

            if not upload_result or "upload_url" not in upload_result:
                logger.warning("Failed to get Cloudflare upload URL")
                return False

            upload_url = upload_result["upload_url"]
            cloudflare_id = upload_result.get("id") or upload_result.get("cloudflare_id")

            if not cloudflare_id:
                logger.warning("No Cloudflare ID in upload result")
                return False

            # Upload image to Cloudflare. The part is always labelled as a
            # PNG regardless of the downloaded content type.
            files = {"file": ("avatar.png", image_data, "image/png")}
            upload_response = httpx.post(
                upload_url,
                files=files,
                timeout=self.AVATAR_UPLOAD_TIMEOUT,
            )

            if upload_response.status_code not in (200, 201):
                logger.warning(f"Cloudflare upload failed: {upload_response.status_code}")
                return False

            # Create CloudflareImage record and link to profile
            with transaction.atomic():
                cf_image = CloudflareImage.objects.create(
                    cloudflare_id=cloudflare_id,
                    is_uploaded=True,
                    metadata=dict(image_metadata),
                )

                profile.avatar = cf_image
                profile.save(update_fields=["avatar"])

            logger.info(
                "Avatar uploaded successfully",
                extra={"user_id": user_id, "provider": provider, "cloudflare_id": cloudflare_id},
            )
            return True

        except ImportError:
            logger.warning("django-cloudflareimages-toolkit not available")
            return False
        except Exception as cf_error:
            logger.warning(f"Cloudflare upload error: {cf_error}")
            return False

    def _ensure_unique_username(self, base_username: str, user_id: str, max_attempts: int = 10) -> str | None:
        """
        Ensure username is unique by appending numbers if needed.

        Tries ``base_username`` first, then ``base_1`` .. ``base_<max_attempts-1>``
        (truncating the base so the suffix fits in ``MAX_USERNAME_LENGTH``),
        ignoring the row that belongs to ``user_id``.

        Returns None if no valid username can be generated: the base is
        empty or contains characters outside ``ALLOWED_USERNAME_CHARS``,
        every candidate is taken, or the lookup fails. Callers then leave
        the current username untouched rather than switching to another
        generic name whose uniqueness was never checked.
        """
        if not base_username:
            return None

        base = base_username.lower()[: self.MAX_USERNAME_LENGTH]

        # Validate characters
        if not set(base) <= self.ALLOWED_USERNAME_CHARS:
            return None

        for attempt in range(max_attempts):
            if attempt == 0:
                candidate = base
            else:
                # Ensure we don't exceed max length with suffix
                suffix = f"_{attempt}"
                candidate = f"{base[: self.MAX_USERNAME_LENGTH - len(suffix)]}{suffix}"

            try:
                taken = UserModel.objects.filter(username=candidate).exclude(user_id=user_id).exists()
            except Exception as exc:
                logger.warning(f"Username uniqueness check failed: {exc}")
                return None

            if not taken:
                return candidate

        return None
|