"""
Auth domain views for ThrillWiki API v1.

This module contains the authentication-related API endpoints: login, signup,
logout, current-user lookup, password reset/change, social provider
management and email verification.

User profile and top-list endpoints are served by the dedicated accounts app
(backend/apps/api/v1/accounts/views.py), not by this module.
"""

import logging
from typing import cast  # added 'cast'

from django.contrib.auth import authenticate, get_user_model, login, logout
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.http import HttpRequest  # new import
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import status
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from apps.accounts.services.social_provider_service import SocialProviderService
from apps.core.utils import capture_and_log

# Import directly from the auth serializers.py file (not the serializers package)
from .serializers import (
    AuthStatusOutputSerializer,
    # Authentication serializers
    LoginInputSerializer,
    LoginOutputSerializer,
    LogoutOutputSerializer,
    PasswordChangeInputSerializer,
    PasswordChangeOutputSerializer,
    PasswordResetInputSerializer,
    PasswordResetOutputSerializer,
    SignupInputSerializer,
    SignupOutputSerializer,
    SocialProviderOutputSerializer,
    UserOutputSerializer,
)
from .serializers_package.social import (
    AvailableProviderSerializer,
    ConnectedProviderSerializer,
    ConnectProviderInputSerializer,
    ConnectProviderOutputSerializer,
    DisconnectProviderOutputSerializer,
    SocialAuthStatusSerializer,
    SocialProviderErrorSerializer,
)


# Handle optional dependencies with fallback classes
class FallbackTurnstileMixin:
    """Fallback mixin used when the real TurnstileMixin cannot be imported."""

    def validate_turnstile(self, request):
        """Accept every request; the fallback performs no validation."""
        pass


# Try to import the real class, use the fallback if it is not available, and
# make sure whatever we end up with is a class that can be instantiated.
try:
    from apps.accounts.mixins import TurnstileMixin as _ImportedTurnstileMixin

    # If the imported object is not a type for any reason, fall back to the
    # safe mixin.
    TurnstileMixin = (
        _ImportedTurnstileMixin if isinstance(_ImportedTurnstileMixin, type) else FallbackTurnstileMixin
    )
except Exception:
    # Catch any import errors or unexpected exceptions and use the fallback mixin.
    TurnstileMixin = FallbackTurnstileMixin

UserModel = get_user_model()
logger = logging.getLogger(__name__)

# Social providers accepted by the connect/disconnect endpoints, mapped to the
# human-readable label returned by AvailableProvidersAPIView.
_SOCIAL_PROVIDER_LABELS = {
    "google": "Google",
    "discord": "Discord",
}


# Helper: safely obtain underlying HttpRequest (used by Django auth)
def _get_underlying_request(request: Request) -> HttpRequest:
    """
    Return a Django HttpRequest for use with Django auth and site utilities.

    DRF's Request keeps the wrapped HttpRequest in ``._request``. When that
    attribute is missing the object itself is returned. ``cast()`` only
    informs the type checker; it performs no runtime conversion.
    """
    return cast(HttpRequest, getattr(request, "_request", request))


def _validation_error_text(exc: ValidationError) -> str:
    """
    Return the human-readable text carried by a Django ValidationError.

    ``str(ValidationError("x"))`` yields the list repr ``"['x']"``, which is
    not suitable for an API response, so the individual messages are joined
    instead.
    """
    messages = getattr(exc, "messages", None)
    if messages:
        return " ".join(str(message) for message in messages)
    return str(exc)


def _validate_turnstile(request: Request) -> Response | None:
    """
    Run Turnstile validation for ``request``.

    Returns a 400 Response when validation raises ValidationError, otherwise
    None. Any other exception is treated as best-effort: it is logged and the
    request is allowed to continue.
    """
    try:
        # Instantiate the mixin before calling to avoid a type mismatch in
        # static analysis.
        TurnstileMixin().validate_turnstile(request)
    except ValidationError as exc:
        return Response({"detail": _validation_error_text(exc)}, status=status.HTTP_400_BAD_REQUEST)
    except Exception:
        # Deliberately non-fatal, but no longer silent.
        logger.warning("Turnstile validation raised an unexpected error; continuing", exc_info=True)
    return None


def _unsupported_provider_response(provider: str) -> Response | None:
    """Return a 400 Response when ``provider`` is not supported, else None."""
    if provider in _SOCIAL_PROVIDER_LABELS:
        return None
    return Response(
        {
            "detail": f"Provider '{provider}' is not supported",
            "code": "INVALID_PROVIDER",
            "suggestions": ["Use 'google' or 'discord'"],
        },
        status=status.HTTP_400_BAD_REQUEST,
    )


# Helper: encapsulate user lookup + authenticate to reduce complexity in view
def _authenticate_user_by_lookup(email_or_username: str, password: str, request: Request) -> UserModel | None:
    """
    Find a user by username OR email with a single query, then authenticate.

    Args:
        email_or_username: Identifier submitted by the client.
        password: Password submitted by the client.
        request: The DRF request; its underlying HttpRequest is handed to
            ``django.contrib.auth.authenticate``.

    Returns:
        The authenticated user, or None when no user matches the identifier
        or authentication fails.
    """
    http_request = _get_underlying_request(request)
    try:
        # One OR query covers both identifier kinds, so no branching on "@"
        # is needed. Only ``username`` is read from the row, so related
        # objects are not fetched.
        user_obj = UserModel.objects.filter(
            Q(username=email_or_username) | Q(email=email_or_username)
        ).first()
        if user_obj is not None:
            return authenticate(  # type: ignore[arg-type]
                http_request,
                username=getattr(user_obj, "username", None),
                password=password,
            )
    except Exception:
        # Fallback to authenticate directly with the provided identifier.
        logger.warning("User lookup failed during login; falling back to direct authenticate", exc_info=True)
        return authenticate(  # type: ignore[arg-type]
            http_request,
            username=email_or_username,
            password=password,
        )
    return None


# === AUTHENTICATION API VIEWS ===


@extend_schema_view(
    post=extend_schema(
        summary="User login",
        description="Authenticate user with username/email and password.",
        request=LoginInputSerializer,
        responses={
            200: LoginOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class LoginAPIView(APIView):
    """API endpoint for user login."""

    permission_classes = [AllowAny]
    authentication_classes = []
    serializer_class = LoginInputSerializer

    def post(self, request: Request) -> Response:
        """
        Authenticate the submitted credentials.

        On success a Django session is started and a JWT access/refresh pair
        is returned. Every failure is reported as a 400 response.
        """
        turnstile_error = _validate_turnstile(request)
        if turnstile_error is not None:
            return turnstile_error

        serializer = LoginInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        validated = serializer.validated_data
        # Use .get to satisfy static analyzers
        email_or_username = validated.get("username")  # type: ignore[assignment]
        password = validated.get("password")  # type: ignore[assignment]
        if not email_or_username or not password:
            return Response(
                {"detail": "username and password are required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = _authenticate_user_by_lookup(email_or_username, password, request)
        if not user:
            return Response(
                {"detail": "Invalid credentials"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # NOTE(review): Django's stock ModelBackend refuses inactive users, in
        # which case authenticate() returns None and this branch is never
        # reached (unverified users would see "Invalid credentials"). Confirm
        # which AUTHENTICATION_BACKENDS are configured.
        if not getattr(user, "is_active", False):
            return Response(
                {
                    "detail": "Please verify your email address before logging in. Check your email for a verification link.",
                    "code": "EMAIL_VERIFICATION_REQUIRED",
                    "email_verification_required": True,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Pass a real HttpRequest to Django login with the backend specified.
        login(_get_underlying_request(request), user, backend="django.contrib.auth.backends.ModelBackend")

        # Generate JWT tokens
        from rest_framework_simplejwt.tokens import RefreshToken

        refresh = RefreshToken.for_user(user)
        response_serializer = LoginOutputSerializer(
            {
                "access": str(refresh.access_token),
                "refresh": str(refresh),
                "user": user,
                "message": "Login successful",
            }
        )
        return Response(response_serializer.data)


@extend_schema_view(
    post=extend_schema(
        summary="User registration",
        description="Register a new user account. Email verification required.",
        request=SignupInputSerializer,
        responses={
            201: SignupOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class SignupAPIView(APIView):
    """API endpoint for user registration."""

    permission_classes = [AllowAny]
    authentication_classes = []
    serializer_class = SignupInputSerializer

    def post(self, request: Request) -> Response:
        """Create the account and tell the client that email verification is required."""
        turnstile_error = _validate_turnstile(request)
        if turnstile_error is not None:
            return turnstile_error

        serializer = SignupInputSerializer(data=request.data, context={"request": request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        user = serializer.save()
        # Don't log in the user immediately - they need to verify their email
        # first, so no tokens are issued here.
        response_serializer = SignupOutputSerializer(
            {
                "access": None,
                "refresh": None,
                "user": user,
                "detail": "Registration successful. Please check your email to verify your account.",
                "email_verification_required": True,
            }
        )
        return Response(response_serializer.data, status=status.HTTP_201_CREATED)


@extend_schema_view(
    post=extend_schema(
        summary="User logout",
        description="Logout the current user and blacklist their refresh token.",
        responses={
            200: LogoutOutputSerializer,
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class LogoutAPIView(APIView):
    """API endpoint for user logout."""

    permission_classes = [IsAuthenticated]
    serializer_class = LogoutOutputSerializer

    def post(self, request: Request) -> Response:
        """
        Log the user out.

        Blacklists the submitted refresh token (best effort), deletes the
        user's ``auth_token`` if one exists, and ends the Django session.
        Returns 500 with a generic message if anything unexpected fails.
        """
        try:
            # Only mapping-like payloads can carry a "refresh" entry.
            data = getattr(request, "data", None)
            refresh_token = data.get("refresh") if hasattr(data, "get") else None

            if refresh_token and isinstance(refresh_token, str):
                # Blacklist the refresh token
                from rest_framework_simplejwt.tokens import RefreshToken

                try:
                    RefreshToken(refresh_token).blacklist()  # type: ignore[arg-type]
                except Exception:
                    # Token might be invalid or already blacklisted; logout
                    # proceeds regardless.
                    logger.debug("Could not blacklist refresh token during logout", exc_info=True)

            # Also delete the old token for backward compatibility
            if hasattr(request.user, "auth_token"):
                request.user.auth_token.delete()

            # Logout from session using the underlying HttpRequest
            logout(_get_underlying_request(request))

            response_serializer = LogoutOutputSerializer({"detail": "Logout successful"})
            return Response(response_serializer.data)
        except Exception as exc:
            # Record the failure instead of discarding it.
            capture_and_log(exc, "Logout", source="api")
            return Response({"detail": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@extend_schema_view(
    get=extend_schema(
        summary="Get current user",
        description="Retrieve information about the currently authenticated user.",
        responses={
            200: UserOutputSerializer,
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class CurrentUserAPIView(APIView):
    """API endpoint to get current user information."""

    permission_classes = [IsAuthenticated]
    serializer_class = UserOutputSerializer

    def get(self, request: Request) -> Response:
        """Return the serialized representation of ``request.user``."""
        return Response(UserOutputSerializer(request.user).data)


@extend_schema_view(
    post=extend_schema(
        summary="Request password reset",
        description="Send a password reset email to the user.",
        request=PasswordResetInputSerializer,
        responses={
            200: PasswordResetOutputSerializer,
            400: "Bad Request",
        },
        tags=["Authentication"],
    ),
)
class PasswordResetAPIView(APIView):
    """API endpoint to request password reset."""

    permission_classes = [AllowAny]
    serializer_class = PasswordResetInputSerializer

    def post(self, request: Request) -> Response:
        """Validate the payload and delegate the reset to the serializer's ``save()``."""
        serializer = PasswordResetInputSerializer(data=request.data, context={"request": request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        serializer.save()
        response_serializer = PasswordResetOutputSerializer({"detail": "Password reset email sent"})
        return Response(response_serializer.data)


@extend_schema_view(
    post=extend_schema(
        summary="Change password",
        description="Change the current user's password.",
        request=PasswordChangeInputSerializer,
        responses={
            200: PasswordChangeOutputSerializer,
            400: "Bad Request",
            401: "Unauthorized",
        },
        tags=["Authentication"],
    ),
)
class PasswordChangeAPIView(APIView):
    """API endpoint to change password."""

    permission_classes = [IsAuthenticated]
    serializer_class = PasswordChangeInputSerializer

    def post(self, request: Request) -> Response:
        """Validate the payload and delegate the change to the serializer's ``save()``."""
        serializer = PasswordChangeInputSerializer(data=request.data, context={"request": request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        serializer.save()
        response_serializer = PasswordChangeOutputSerializer({"detail": "Password changed successfully"})
        return Response(response_serializer.data)


@extend_schema_view(
    get=extend_schema(
        summary="Get social providers",
        description="Retrieve available social authentication providers.",
        responses={200: "List of social providers"},
        tags=["Authentication"],
    ),
)
class SocialProvidersAPIView(APIView):
    """API endpoint to get available social authentication providers."""

    permission_classes = [AllowAny]
    serializer_class = SocialProviderOutputSerializer

    def get(self, request: Request) -> Response:
        """
        List the SocialApp providers configured for the current site.

        The serialized list is cached for 15 minutes under a key built from
        the site id and the request host.
        """
        from django.core.cache import cache

        # get_current_site expects a Django HttpRequest.
        site = get_current_site(_get_underlying_request(request))

        # Use getattr so a site object without ``id`` falls back to ``pk``.
        site_id = getattr(site, "id", getattr(site, "pk", None))
        cache_key = f"social_providers:{site_id}:{request.get_host()}"

        cached_providers = cache.get(cache_key)
        if cached_providers is not None:
            return Response(cached_providers)

        from allauth.socialaccount.models import SocialApp

        providers_list = []
        for social_app in SocialApp.objects.filter(sites=site).order_by("provider"):
            try:
                provider_name = social_app.name or getattr(social_app, "provider", "").title()
                auth_url = request.build_absolute_uri(f"/accounts/{social_app.provider}/login/")
                providers_list.append(
                    {
                        "id": social_app.provider,
                        "name": provider_name,
                        "authUrl": auth_url,
                    }
                )
            except Exception:
                # One broken entry must not hide the remaining providers.
                logger.warning("Skipping social app that could not be described", exc_info=True)
                continue

        response_data = SocialProviderOutputSerializer(providers_list, many=True).data
        cache.set(cache_key, response_data, 900)  # 900 s = 15 minutes
        return Response(response_data)


@extend_schema_view(
    post=extend_schema(
        summary="Check authentication status",
        description="Check if user is authenticated and return user data.",
        responses={200: AuthStatusOutputSerializer},
        tags=["Authentication"],
    ),
)
class AuthStatusAPIView(APIView):
    """API endpoint to check authentication status."""

    permission_classes = [AllowAny]
    serializer_class = AuthStatusOutputSerializer

    def post(self, request: Request) -> Response:
        """Report whether the requester is authenticated, with the user when so."""
        if request.user.is_authenticated:
            response_data = {"authenticated": True, "user": request.user}
        else:
            response_data = {"authenticated": False, "user": None}
        return Response(AuthStatusOutputSerializer(response_data).data)


# === SOCIAL PROVIDER MANAGEMENT API VIEWS ===


@extend_schema_view(
    get=extend_schema(
        summary="Get available social providers",
        description="Retrieve list of available social authentication providers.",
        responses={
            200: AvailableProviderSerializer(many=True),
        },
        tags=["Social Authentication"],
    ),
)
class AvailableProvidersAPIView(APIView):
    """API endpoint to get available social providers."""

    permission_classes = [AllowAny]
    serializer_class = AvailableProviderSerializer

    def get(self, request: Request) -> Response:
        """Return the static list of supported providers with their login/connect URLs."""
        providers = [
            {
                "provider": provider,
                "name": label,
                "login_url": f"/auth/social/{provider}/",
                "connect_url": f"/auth/social/connect/{provider}/",
            }
            for provider, label in _SOCIAL_PROVIDER_LABELS.items()
        ]
        return Response(AvailableProviderSerializer(providers, many=True).data)


@extend_schema_view(
    get=extend_schema(
        summary="Get connected social providers",
        description="Retrieve list of social providers connected to the user's account.",
        responses={
            200: ConnectedProviderSerializer(many=True),
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class ConnectedProvidersAPIView(APIView):
    """API endpoint to get user's connected social providers."""

    permission_classes = [IsAuthenticated]
    serializer_class = ConnectedProviderSerializer

    def get(self, request: Request) -> Response:
        """Return the providers reported by SocialProviderService for the current user."""
        providers = SocialProviderService().get_connected_providers(request.user)
        return Response(ConnectedProviderSerializer(providers, many=True).data)


@extend_schema_view(
    post=extend_schema(
        summary="Connect social provider",
        description="Connect a social authentication provider to the user's account.",
        request=ConnectProviderInputSerializer,
        responses={
            200: ConnectProviderOutputSerializer,
            400: SocialProviderErrorSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class ConnectProviderAPIView(APIView):
    """API endpoint to connect a social provider."""

    permission_classes = [IsAuthenticated]
    serializer_class = ConnectProviderInputSerializer

    def post(self, request: Request, provider: str) -> Response:
        """
        Connect ``provider`` to the current user using the submitted access token.

        Returns 400 for an unsupported provider, an invalid payload, or any
        error raised by the service.
        """
        invalid_provider = _unsupported_provider_response(provider)
        if invalid_provider is not None:
            return invalid_provider

        serializer = ConnectProviderInputSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "detail": "Invalid request data",
                    "code": "VALIDATION_ERROR",
                    "details": serializer.errors,
                    "suggestions": ["Provide a valid access_token"],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        access_token = serializer.validated_data["access_token"]

        try:
            result = SocialProviderService().connect_provider(request.user, provider, access_token)
            return Response(ConnectProviderOutputSerializer(result).data)
        except Exception as exc:
            logger.warning("Connecting social provider %s failed", provider, exc_info=True)
            return Response(
                {
                    "success": False,
                    "detail": "CONNECTION_FAILED",
                    "message": str(exc),
                    "suggestions": [
                        "Verify the access token is valid",
                        "Ensure the provider account is not already connected to another user",
                    ],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )


@extend_schema_view(
    post=extend_schema(
        summary="Disconnect social provider",
        description="Disconnect a social authentication provider from the user's account.",
        responses={
            200: DisconnectProviderOutputSerializer,
            400: SocialProviderErrorSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class DisconnectProviderAPIView(APIView):
    """API endpoint to disconnect a social provider."""

    permission_classes = [IsAuthenticated]
    serializer_class = DisconnectProviderOutputSerializer

    def post(self, request: Request, provider: str) -> Response:
        """
        Disconnect ``provider`` from the current user.

        The service is first asked whether disconnecting is safe; if not, a
        400 response carrying the service's reason is returned.
        """
        invalid_provider = _unsupported_provider_response(provider)
        if invalid_provider is not None:
            return invalid_provider

        try:
            service = SocialProviderService()

            # Check if disconnection is safe
            can_disconnect, reason = service.can_disconnect_provider(request.user, provider)
            if not can_disconnect:
                return Response(
                    {
                        "success": False,
                        "detail": "UNSAFE_DISCONNECTION",
                        "message": reason,
                        "suggestions": [
                            "Set up email/password authentication before disconnecting",
                            "Connect another social provider before disconnecting this one",
                        ],
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Perform disconnection
            result = service.disconnect_provider(request.user, provider)
            return Response(DisconnectProviderOutputSerializer(result).data)
        except Exception as exc:
            logger.warning("Disconnecting social provider %s failed", provider, exc_info=True)
            return Response(
                {
                    "success": False,
                    "detail": "DISCONNECTION_FAILED",
                    "message": str(exc),
                    "suggestions": [
                        "Verify the provider is currently connected",
                        "Ensure you have alternative authentication methods",
                    ],
                },
                status=status.HTTP_400_BAD_REQUEST,
            )


@extend_schema_view(
    get=extend_schema(
        summary="Get social authentication status",
        description="Get comprehensive social authentication status for the user.",
        responses={
            200: SocialAuthStatusSerializer,
            401: "Unauthorized",
        },
        tags=["Social Authentication"],
    ),
)
class SocialAuthStatusAPIView(APIView):
    """API endpoint to get social authentication status."""

    permission_classes = [IsAuthenticated]
    serializer_class = SocialAuthStatusSerializer

    def get(self, request: Request) -> Response:
        """Return the auth status reported by SocialProviderService for the current user."""
        auth_status = SocialProviderService().get_auth_status(request.user)
        return Response(SocialAuthStatusSerializer(auth_status).data)


# === EMAIL VERIFICATION API VIEWS ===


@extend_schema_view(
    get=extend_schema(
        summary="Verify email address",
        description="Verify user's email address using verification token.",
        responses={
            # Matches the keys the view actually returns.
            200: {
                "type": "object",
                "properties": {"detail": {"type": "string"}, "success": {"type": "boolean"}},
            },
            400: "Bad Request",
            404: "Token not found",
        },
        tags=["Authentication"],
    ),
)
class EmailVerificationAPIView(APIView):
    """API endpoint for email verification."""

    permission_classes = [AllowAny]
    authentication_classes = []

    def get(self, request: Request, token: str) -> Response:
        """
        Activate the user that owns ``token`` and consume the token.

        Returns 404 when no verification record matches the token.
        """
        from django.db import transaction

        from apps.accounts.models import EmailVerification

        try:
            verification = EmailVerification.objects.select_related("user").get(token=token)
        except EmailVerification.DoesNotExist:
            return Response({"detail": "Invalid or expired verification token"}, status=status.HTTP_404_NOT_FOUND)

        user = verification.user

        # Activate the user and delete the verification record together so a
        # failure cannot leave only one of the two writes applied.
        with transaction.atomic():
            user.is_active = True
            user.save()
            verification.delete()

        return Response({"detail": "Email verified successfully. You can now log in.", "success": True})


@extend_schema_view(
    post=extend_schema(
        summary="Resend verification email",
        description="Resend email verification to user's email address.",
        request={"type": "object", "properties": {"email": {"type": "string", "format": "email"}}},
        responses={
            # Matches the keys the view actually returns. Unknown addresses
            # get a 200, never a 404.
            200: {
                "type": "object",
                "properties": {"detail": {"type": "string"}, "success": {"type": "boolean"}},
            },
            400: "Bad Request",
            500: "Failed to send verification email",
        },
        tags=["Authentication"],
    ),
)
class ResendVerificationAPIView(APIView):
    """API endpoint to resend email verification."""

    permission_classes = [AllowAny]
    authentication_classes = []

    def post(self, request: Request) -> Response:
        """
        Issue a fresh verification token for ``email`` and mail the link.

        Returns 400 when the email is missing, not a string, or already
        verified; 200 with a generic message when no account matches; 500
        when sending the email fails.
        """
        from django.utils.crypto import get_random_string
        from django_forwardemail.services import EmailService

        from apps.accounts.models import EmailVerification

        # A non-mapping body or a non-string value is treated as "missing"
        # rather than crashing on .get()/.strip().
        payload = request.data
        raw_email = payload.get("email") if hasattr(payload, "get") else None
        email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
        if not email:
            return Response({"detail": "Email address is required"}, status=status.HTTP_400_BAD_REQUEST)

        try:
            user = UserModel.objects.get(email__iexact=email)
        except UserModel.DoesNotExist:
            # Don't reveal whether email exists
            return Response({"detail": "If the email exists, a verification email has been sent", "success": True})

        # Don't resend if user is already active.
        # NOTE(review): this distinct 400 tells the caller the address belongs
        # to an account, which is at odds with the generic reply above.
        # Confirm whether that is acceptable.
        if user.is_active:
            return Response({"detail": "Email is already verified"}, status=status.HTTP_400_BAD_REQUEST)

        # Create or update verification record
        verification, created = EmailVerification.objects.get_or_create(
            user=user, defaults={"token": get_random_string(64)}
        )
        if not created:
            # Rotate the token on the existing record; save() also refreshes
            # any timestamp the model maintains on save.
            verification.token = get_random_string(64)
            verification.save()

        # Send verification email
        site = get_current_site(_get_underlying_request(request))
        verification_url = request.build_absolute_uri(f"/api/v1/auth/verify-email/{verification.token}/")
        message_text = (
            "Welcome to ThrillWiki!\n"
            "\n"
            "Please verify your email address by clicking the link below:\n"
            f"{verification_url}\n"
            "\n"
            "If you didn't create an account, you can safely ignore this email.\n"
            "\n"
            "Thanks,\n"
            "The ThrillWiki Team"
        )

        try:
            EmailService.send_email(
                to=user.email,
                subject="Verify your ThrillWiki account",
                text=message_text,
                site=site,
            )
        except Exception as exc:
            capture_and_log(exc, "Send verification email", source="api")
            return Response(
                {"detail": "Failed to send verification email"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

        return Response({"detail": "Verification email sent successfully", "success": True})


# Note: User Profile, Top List, and Top List Item ViewSets are now handled
# by the dedicated accounts app at backend/apps/api/v1/accounts/views.py
# to avoid duplication and maintain clean separation of concerns.


@extend_schema_view( post=extend_schema( summary="Process OAuth profile", description="Process OAuth profile data during social authentication flow.", request={ "type": "object", "properties": { "provider": {"type": "string", "description": "OAuth provider (e.g., google, discord)"}, "profile": { "type": "object", "description": "Profile data from OAuth provider", "properties": { "id": {"type": "string"}, "email": {"type": "string", "format": "email"}, "name": {"type": "string"}, "avatar_url": {"type": "string", "format": "uri"}, }, }, "access_token": {"type": "string", "description": "OAuth access token"}, }, "required": ["provider", "profile"], }, responses={ 200: { "type": "object", "properties": { "success": {"type": "boolean"}, "action": {"type": "string", "enum": ["created", "updated", "linked"]}, "user": {"type": "object"}, "profile_synced": {"type": "boolean"}, }, }, 400: "Bad Request", 401: "Unauthorized", 403: "Account suspended", }, tags=["Social Authentication"], ), ) class ProcessOAuthProfileAPIView(APIView): """ API endpoint to process OAuth profile data. This endpoint is called AFTER the OAuth flow is complete to: 1. Check if user is banned (SECURITY CRITICAL) 2. Extract avatar from OAuth provider 3. Download and upload avatar to Cloudflare Images 4. Sync display name from OAuth provider 5. Update username if it's a generic UUID-based username Called with an empty body - uses the authenticated session. Full parity with Supabase Edge Function: process-oauth-profile BULLETPROOFED: Comprehensive validation, sanitization, and error handling. 
""" permission_classes = [IsAuthenticated] # Security constants MAX_AVATAR_SIZE = 10 * 1024 * 1024 # 10MB AVATAR_DOWNLOAD_TIMEOUT = 10.0 # seconds AVATAR_UPLOAD_TIMEOUT = 30.0 # seconds MAX_USERNAME_LENGTH = 150 MIN_USERNAME_LENGTH = 3 ALLOWED_USERNAME_CHARS = set("abcdefghijklmnopqrstuvwxyz0123456789_") # Rate limiting for avatar uploads (prevent abuse) AVATAR_UPLOAD_COOLDOWN = 60 # seconds between uploads def post(self, request: Request) -> Response: import re import httpx from django.db import transaction from django.core.cache import cache try: user = request.user # ================================================================ # STEP 0: Validate user object exists and is valid # ================================================================ if not user or not hasattr(user, 'user_id'): logger.error("ProcessOAuthProfile called with invalid user object") return Response({ "success": False, "error": "Invalid user session", }, status=status.HTTP_401_UNAUTHORIZED) user_id_str = str(user.user_id) # ================================================================ # STEP 1: CRITICAL - Check ban status FIRST # ================================================================ is_banned = getattr(user, 'is_banned', False) # Also check via profile if applicable if not is_banned: try: from apps.accounts.models import UserProfile profile_check = UserProfile.objects.filter(user=user).first() if profile_check and getattr(profile_check, 'is_banned', False): is_banned = True except Exception: pass if is_banned: ban_reason = getattr(user, 'ban_reason', None) or "Policy violation" # Sanitize ban reason for response safe_ban_reason = str(ban_reason)[:200] if ban_reason else None logger.warning( f"Banned user attempted OAuth profile update", extra={"user_id": user_id_str, "ban_reason": safe_ban_reason} ) return Response({ "error": "Account suspended", "message": ( f"Your account has been suspended. 
Reason: {safe_ban_reason}" if safe_ban_reason else "Your account has been suspended. Contact support for assistance." ), "ban_reason": safe_ban_reason, }, status=status.HTTP_403_FORBIDDEN) # ================================================================ # STEP 2: Check rate limiting for avatar uploads # ================================================================ rate_limit_key = f"oauth_profile:avatar:{user_id_str}" if cache.get(rate_limit_key): return Response({ "success": True, "action": "rate_limited", "message": "Please wait before updating your profile again", "avatar_uploaded": False, "profile_updated": False, }) # ================================================================ # STEP 3: Get OAuth provider info from social accounts # ================================================================ try: from allauth.socialaccount.models import SocialAccount except ImportError: logger.error("django-allauth not installed") return Response({ "success": False, "error": "Social authentication not configured", }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) social_accounts = SocialAccount.objects.filter(user=user) if not social_accounts.exists(): return Response({ "success": True, "action": "skipped", "message": "No OAuth accounts linked", }) # Get the most recent social account social_account = social_accounts.order_by("-date_joined").first() if not social_account: return Response({ "success": True, "action": "skipped", "message": "No valid OAuth account found", }) provider = social_account.provider or "unknown" extra_data = social_account.extra_data or {} # Validate extra_data is a dict if not isinstance(extra_data, dict): logger.warning(f"Invalid extra_data type for user {user_id_str}: {type(extra_data)}") extra_data = {} # ================================================================ # STEP 4: Extract profile data based on provider (with sanitization) # ================================================================ avatar_url = None display_name = 
None username_base = None if provider == "google": avatar_url = self._sanitize_url(extra_data.get("picture")) display_name = self._sanitize_display_name(extra_data.get("name")) email = extra_data.get("email", "") if email and isinstance(email, str): username_base = self._sanitize_username(email.split("@")[0]) elif provider == "discord": discord_data = extra_data discord_id = discord_data.get("id") or discord_data.get("sub") display_name = self._sanitize_display_name( discord_data.get("global_name") or discord_data.get("full_name") or discord_data.get("name") ) # Discord avatar URL construction with validation avatar_hash = discord_data.get("avatar") if discord_id and avatar_hash and isinstance(discord_id, str) and isinstance(avatar_hash, str): # Validate discord_id is numeric if discord_id.isdigit(): # Validate avatar_hash is alphanumeric if re.match(r'^[a-zA-Z0-9_]+$', avatar_hash): avatar_url = f"https://cdn.discordapp.com/avatars/{discord_id}/{avatar_hash}.png?size=256" if not avatar_url: avatar_url = self._sanitize_url( discord_data.get("avatar_url") or discord_data.get("picture") ) raw_username = discord_data.get("username") or discord_data.get("name", "") if raw_username and isinstance(raw_username, str): username_base = self._sanitize_username(raw_username.split("#")[0]) if not username_base and discord_id: username_base = f"discord_{str(discord_id)[:8]}" else: # Generic provider handling avatar_url = self._sanitize_url( extra_data.get("picture") or extra_data.get("avatar_url") or extra_data.get("avatar") ) display_name = self._sanitize_display_name( extra_data.get("name") or extra_data.get("display_name") ) # ================================================================ # STEP 5: Get or create user profile (with transaction) # ================================================================ from apps.accounts.models import UserProfile with transaction.atomic(): profile, profile_created = UserProfile.objects.select_for_update().get_or_create( user=user ) 
# Check if profile already has an avatar if profile.avatar_id: return Response({ "success": True, "action": "skipped", "message": "Avatar already exists", "avatar_uploaded": False, "profile_updated": False, }) # ================================================================ # STEP 6: Download and upload avatar to Cloudflare (outside transaction) # ================================================================ avatar_uploaded = False if avatar_url: try: # Validate URL scheme if not avatar_url.startswith(('https://', 'http://')): logger.warning(f"Invalid avatar URL scheme: {avatar_url[:50]}") else: # Download avatar from provider download_response = httpx.get( avatar_url, timeout=self.AVATAR_DOWNLOAD_TIMEOUT, follow_redirects=True, headers={ "User-Agent": "ThrillWiki/1.0", "Accept": "image/*", }, ) if download_response.status_code == 200: image_data = download_response.content content_type = download_response.headers.get("content-type", "") # Validate content type if not content_type.startswith("image/"): logger.warning(f"Invalid content type for avatar: {content_type}") # Validate file size elif len(image_data) > self.MAX_AVATAR_SIZE: logger.warning( f"Avatar too large for user {user_id_str}: {len(image_data)} bytes" ) # Validate minimum size (avoid empty images) elif len(image_data) < 100: logger.warning(f"Avatar too small for user {user_id_str}") else: avatar_uploaded = self._upload_to_cloudflare( image_data, user_id_str, provider, profile ) else: logger.warning( f"Avatar download failed: {download_response.status_code}", extra={"user_id": user_id_str, "provider": provider} ) except httpx.TimeoutException: logger.warning(f"Avatar download timeout for user {user_id_str}") except httpx.HTTPError as download_error: logger.warning(f"Failed to download avatar: {download_error}") except Exception as e: logger.warning(f"Unexpected avatar error: {e}") # Set rate limit after successful processing if avatar_uploaded: cache.set(rate_limit_key, True, 
self.AVATAR_UPLOAD_COOLDOWN) # ================================================================ # STEP 7: Update display name if not set (with validation) # ================================================================ profile_updated = False if display_name and not getattr(user, "display_name", None): try: user.display_name = display_name user.save(update_fields=["display_name"]) profile_updated = True except Exception as e: logger.warning(f"Failed to update display name: {e}") # ================================================================ # STEP 8: Update username if it's a generic UUID-based username # ================================================================ current_username = getattr(user, "username", "") or "" if username_base and current_username.startswith("user_"): try: new_username = self._ensure_unique_username(username_base, user.user_id) if new_username and new_username != current_username: user.username = new_username user.save(update_fields=["username"]) profile_updated = True logger.info( f"Username updated from {current_username} to {new_username}", extra={"user_id": user_id_str} ) except Exception as e: logger.warning(f"Failed to update username: {e}") return Response({ "success": True, "action": "processed", "provider": provider, "avatar_uploaded": avatar_uploaded, "profile_updated": profile_updated, "message": "OAuth profile processed successfully", }) except Exception as e: capture_and_log(e, "Process OAuth profile", source="api", request=request) return Response({ "success": False, "error": "Failed to process OAuth profile", }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def _sanitize_url(self, url) -> str | None: """Sanitize and validate URL.""" if not url or not isinstance(url, str): return None url = url.strip()[:2000] # Limit length # Basic URL validation if not url.startswith(('https://', 'http://')): return None # Block obviously malicious patterns dangerous_patterns = ['javascript:', 'data:', 'file:', ' str | None: 
"""Sanitize display name.""" if not name or not isinstance(name, str): return None import re # Strip and limit length name = name.strip()[:100] # Remove control characters name = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', name) # Remove excessive whitespace name = ' '.join(name.split()) # Must have at least 1 character if len(name) < 1: return None return name def _sanitize_username(self, username) -> str | None: """Sanitize username for use.""" if not username or not isinstance(username, str): return None import re # Lowercase and remove non-allowed characters username = username.lower().strip() username = re.sub(r'[^a-z0-9_]', '', username) # Enforce length limits if len(username) < self.MIN_USERNAME_LENGTH: return None username = username[:self.MAX_USERNAME_LENGTH] return username def _upload_to_cloudflare(self, image_data: bytes, user_id: str, provider: str, profile) -> bool: """Upload image to Cloudflare Images with error handling.""" import httpx from django.db import transaction try: from django_cloudflareimages_toolkit.models import CloudflareImage from django_cloudflareimages_toolkit.services import CloudflareImagesService cf_service = CloudflareImagesService() # Request direct upload URL upload_result = cf_service.get_direct_upload_url( metadata={ "type": "avatar", "user_id": user_id, "provider": provider, } ) if not upload_result or "upload_url" not in upload_result: logger.warning("Failed to get Cloudflare upload URL") return False upload_url = upload_result["upload_url"] cloudflare_id = upload_result.get("id") or upload_result.get("cloudflare_id") if not cloudflare_id: logger.warning("No Cloudflare ID in upload result") return False # Upload image to Cloudflare files = {"file": ("avatar.png", image_data, "image/png")} upload_response = httpx.post( upload_url, files=files, timeout=self.AVATAR_UPLOAD_TIMEOUT, ) if upload_response.status_code not in [200, 201]: logger.warning(f"Cloudflare upload failed: {upload_response.status_code}") return False # Create 
CloudflareImage record and link to profile with transaction.atomic(): cf_image = CloudflareImage.objects.create( cloudflare_id=cloudflare_id, is_uploaded=True, metadata={ "type": "avatar", "user_id": user_id, "provider": provider, } ) profile.avatar = cf_image profile.save(update_fields=["avatar"]) logger.info( f"Avatar uploaded successfully", extra={"user_id": user_id, "provider": provider, "cloudflare_id": cloudflare_id} ) return True except ImportError: logger.warning("django-cloudflareimages-toolkit not available") return False except Exception as cf_error: logger.warning(f"Cloudflare upload error: {cf_error}") return False def _ensure_unique_username(self, base_username: str, user_id: str, max_attempts: int = 10) -> str | None: """ Ensure username is unique by appending numbers if needed. Returns None if no valid username can be generated. """ if not base_username: return None username = base_username.lower()[:self.MAX_USERNAME_LENGTH] # Validate characters if not all(c in self.ALLOWED_USERNAME_CHARS for c in username): return None attempt = 0 while attempt < max_attempts: try: existing = UserModel.objects.filter(username=username).exclude(user_id=user_id).exists() if not existing: return username except Exception: break attempt += 1 # Ensure we don't exceed max length with suffix suffix = f"_{attempt}" max_base = self.MAX_USERNAME_LENGTH - len(suffix) username = f"{base_username.lower()[:max_base]}{suffix}" # Fallback to UUID-based username return f"user_{str(user_id)[:8]}"