""" Authentication API views for ThrillWiki API v1. This module contains all authentication-related API endpoints including login, signup, logout, password management, and social authentication. """ # type: ignore[misc,attr-defined,arg-type,call-arg,index,assignment] from typing import TYPE_CHECKING, Type, Any from django.contrib.auth import login, logout, get_user_model from django.contrib.sites.shortcuts import get_current_site from django.core.exceptions import ValidationError from rest_framework import status from rest_framework.views import APIView from rest_framework.request import Request from rest_framework.response import Response from rest_framework.permissions import AllowAny, IsAuthenticated from drf_spectacular.utils import extend_schema, extend_schema_view # Import serializers from the auth serializers module from ..serializers.auth import ( LoginInputSerializer, LoginOutputSerializer, SignupInputSerializer, SignupOutputSerializer, LogoutOutputSerializer, UserOutputSerializer, PasswordResetInputSerializer, PasswordResetOutputSerializer, PasswordChangeInputSerializer, PasswordChangeOutputSerializer, SocialProviderOutputSerializer, AuthStatusOutputSerializer, ) # Handle optional dependencies with fallback classes class FallbackTurnstileMixin: """Fallback mixin if TurnstileMixin is not available.""" def validate_turnstile(self, request: Any) -> None: """Fallback validation method that does nothing.""" pass # Try to import the real class, use fallback if not available try: from apps.accounts.mixins import TurnstileMixin except ImportError: TurnstileMixin = FallbackTurnstileMixin # Type hint for the mixin if TYPE_CHECKING: from typing import Union TurnstileMixinType = Union[Type[FallbackTurnstileMixin], Any] else: TurnstileMixinType = TurnstileMixin UserModel = get_user_model() @extend_schema_view( post=extend_schema( summary="User login", description="Authenticate user with username/email and password.", request=LoginInputSerializer, responses={ 200: LoginOutputSerializer, 400: "Bad Request", }, tags=["Authentication"], ), ) class LoginAPIView(TurnstileMixin, APIView): # type: ignore[misc] """API endpoint for user login.""" permission_classes = [AllowAny] authentication_classes = [] serializer_class = LoginInputSerializer def post(self, request: Request) -> Response: try: # Validate Turnstile if configured self.validate_turnstile(request) except ValidationError as e: return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) serializer = LoginInputSerializer( data=request.data, context={"request": request} ) if serializer.is_valid(): # The serializer handles authentication validation user = serializer.validated_data["user"] # type: ignore[index] login(request._request, user) # type: ignore[attr-defined] # Optimized token creation - get_or_create is atomic from rest_framework.authtoken.models import Token token, created = Token.objects.get_or_create(user=user) response_serializer = LoginOutputSerializer( { "token": token.key, "user": user, "message": "Login successful", } ) return Response(response_serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( post=extend_schema( summary="User registration", description="Register a new user account.", request=SignupInputSerializer, responses={ 201: SignupOutputSerializer, 400: "Bad Request", }, tags=["Authentication"], ), ) class SignupAPIView(TurnstileMixin, APIView): # type: ignore[misc] """API endpoint for user registration.""" permission_classes = [AllowAny] authentication_classes = [] serializer_class = SignupInputSerializer def post(self, request: Request) -> Response: try: # Validate Turnstile if configured self.validate_turnstile(request) except ValidationError as e: return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) serializer = SignupInputSerializer(data=request.data) if serializer.is_valid(): user = serializer.save() login(request._request, user) # type: ignore[attr-defined] from rest_framework.authtoken.models import Token token, created = Token.objects.get_or_create(user=user) response_serializer = SignupOutputSerializer( { "token": token.key, "user": user, "message": "Registration successful", } ) return Response(response_serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( post=extend_schema( summary="User logout", description="Logout the current user and invalidate their token.", responses={ 200: LogoutOutputSerializer, 401: "Unauthorized", }, tags=["Authentication"], ), ) class LogoutAPIView(APIView): """API endpoint for user logout.""" permission_classes = [IsAuthenticated] serializer_class = LogoutOutputSerializer def post(self, request: Request) -> Response: try: # Delete the token for token-based auth if hasattr(request.user, "auth_token"): request.user.auth_token.delete() # Logout from session logout(request._request) # type: ignore[attr-defined] response_serializer = LogoutOutputSerializer( {"message": "Logout successful"} ) return Response(response_serializer.data) except Exception: return Response( {"error": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema_view( get=extend_schema( summary="Get current user", description="Retrieve information about the currently authenticated user.", responses={ 200: UserOutputSerializer, 401: "Unauthorized", }, tags=["Authentication"], ), ) class CurrentUserAPIView(APIView): """API endpoint to get current user information.""" permission_classes = [IsAuthenticated] serializer_class = UserOutputSerializer def get(self, request: Request) -> Response: serializer = UserOutputSerializer(request.user) return Response(serializer.data) @extend_schema_view( post=extend_schema( summary="Request password reset", description="Send a password reset email to the user.", request=PasswordResetInputSerializer, responses={ 200: PasswordResetOutputSerializer, 400: "Bad Request", }, tags=["Authentication"], ), ) class PasswordResetAPIView(APIView): """API endpoint to request password reset.""" permission_classes = [AllowAny] serializer_class = PasswordResetInputSerializer def post(self, request: Request) -> Response: serializer = PasswordResetInputSerializer( data=request.data, context={"request": request} ) if serializer.is_valid(): serializer.save() response_serializer = PasswordResetOutputSerializer( {"detail": "Password reset email sent"} ) return Response(response_serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( post=extend_schema( summary="Change password", description="Change the current user's password.", request=PasswordChangeInputSerializer, responses={ 200: PasswordChangeOutputSerializer, 400: "Bad Request", 401: "Unauthorized", }, tags=["Authentication"], ), ) class PasswordChangeAPIView(APIView): """API endpoint to change password.""" permission_classes = [IsAuthenticated] serializer_class = PasswordChangeInputSerializer def post(self, request: Request) -> Response: serializer = PasswordChangeInputSerializer( data=request.data, context={"request": request} ) if serializer.is_valid(): serializer.save() response_serializer = PasswordChangeOutputSerializer( {"detail": "Password changed successfully"} ) return Response(response_serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( get=extend_schema( summary="Get social providers", description="Retrieve available social authentication providers.", responses={200: "List of social providers"}, tags=["Authentication"], ), ) class SocialProvidersAPIView(APIView): """API endpoint to get available social authentication providers.""" permission_classes = [AllowAny] serializer_class = SocialProviderOutputSerializer def get(self, request: Request) -> Response: from django.core.cache import cache try: # Check if django-allauth is available try: from allauth.socialaccount.models import SocialApp except ImportError: # django-allauth is not installed, return empty list serializer = SocialProviderOutputSerializer([], many=True) return Response(serializer.data) site = get_current_site(request._request) # type: ignore[attr-defined] # Cache key based on site and request host # Use pk for Site objects, domain for RequestSite objects site_identifier = getattr(site, "pk", site.domain) cache_key = f"social_providers:{site_identifier}:{request.get_host()}" # Try to get from cache first (cache for 15 minutes) cached_providers = cache.get(cache_key) if cached_providers is not None: return Response(cached_providers) providers_list = [] # Optimized query: filter by site and order by provider name try: social_apps = SocialApp.objects.filter(sites=site).order_by("provider") except Exception: # If query fails (table doesn't exist, etc.), return empty list social_apps = [] for social_app in social_apps: try: # Simplified provider name resolution - avoid expensive provider class loading provider_name = social_app.name or social_app.provider.title() # Build auth URL efficiently auth_url = request.build_absolute_uri( f"/accounts/{social_app.provider}/login/" ) providers_list.append( { "id": social_app.provider, "name": provider_name, "authUrl": auth_url, } ) except Exception: # Skip if provider can't be loaded continue # Serialize and cache the result serializer = SocialProviderOutputSerializer(providers_list, many=True) response_data = serializer.data # Cache for 15 minutes (900 seconds) cache.set(cache_key, response_data, 900) return Response(response_data) except Exception as e: # Return a proper JSON error response instead of letting it bubble up return Response( { "status": "error", "error": { "code": "SOCIAL_PROVIDERS_ERROR", "message": "Unable to retrieve social providers", "details": str(e) if str(e) else None, "request_user": ( str(request.user) if hasattr(request, "user") else "AnonymousUser" ), }, "data": None, }, status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) @extend_schema_view( post=extend_schema( summary="Check authentication status", description="Check if user is authenticated and return user data.", responses={200: AuthStatusOutputSerializer}, tags=["Authentication"], ), ) class AuthStatusAPIView(APIView): """API endpoint to check authentication status.""" permission_classes = [AllowAny] serializer_class = AuthStatusOutputSerializer def post(self, request: Request) -> Response: if request.user.is_authenticated: response_data = { "authenticated": True, "user": request.user, } else: response_data = { "authenticated": False, "user": None, } serializer = AuthStatusOutputSerializer(response_data) return Response(serializer.data)