""" Auth domain views for ThrillWiki API v1. This module contains all authentication-related API endpoints including login, signup, logout, password management, social authentication, user profiles, and top lists. """ import time from django.contrib.auth import authenticate, login, logout, get_user_model from django.contrib.sites.shortcuts import get_current_site from django.core.exceptions import ValidationError from django.utils import timezone from django.conf import settings from django.db.models import Q from rest_framework import status from rest_framework.views import APIView from rest_framework.viewsets import ModelViewSet from rest_framework.request import Request from rest_framework.response import Response from rest_framework.permissions import AllowAny, IsAuthenticated from rest_framework.decorators import action from allauth.socialaccount import providers from drf_spectacular.utils import extend_schema, extend_schema_view from apps.accounts.models import UserProfile, TopList, TopListItem from .serializers import ( # Authentication serializers LoginInputSerializer, LoginOutputSerializer, SignupInputSerializer, SignupOutputSerializer, LogoutOutputSerializer, UserOutputSerializer, PasswordResetInputSerializer, PasswordResetOutputSerializer, PasswordChangeInputSerializer, PasswordChangeOutputSerializer, SocialProviderOutputSerializer, AuthStatusOutputSerializer, # User profile serializers UserProfileCreateInputSerializer, UserProfileUpdateInputSerializer, UserProfileOutputSerializer, # Top list serializers TopListCreateInputSerializer, TopListUpdateInputSerializer, TopListOutputSerializer, TopListItemCreateInputSerializer, TopListItemUpdateInputSerializer, TopListItemOutputSerializer, ) # Handle optional dependencies with fallback classes class FallbackTurnstileMixin: """Fallback mixin if TurnstileMixin is not available.""" def validate_turnstile(self, request): pass # Try to import the real class, use fallback if not available try: from apps.accounts.mixins import TurnstileMixin except ImportError: TurnstileMixin = FallbackTurnstileMixin UserModel = get_user_model() # === AUTHENTICATION API VIEWS === @extend_schema_view( post=extend_schema( summary="User login", description="Authenticate user with username/email and password.", request=LoginInputSerializer, responses={ 200: LoginOutputSerializer, 400: "Bad Request", }, tags=["Authentication"], ), ) class LoginAPIView(TurnstileMixin, APIView): """API endpoint for user login.""" permission_classes = [AllowAny] authentication_classes = [] serializer_class = LoginInputSerializer def post(self, request: Request) -> Response: try: # Validate Turnstile if configured self.validate_turnstile(request) except ValidationError as e: return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) serializer = LoginInputSerializer(data=request.data) if serializer.is_valid(): # type: ignore[index] email_or_username = serializer.validated_data["username"] password = serializer.validated_data["password"] # type: ignore[index] # Optimized user lookup: single query using Q objects user = None # Single query to find user by email OR username try: if "@" in email_or_username: # Email-like input: try email first, then username as fallback user_obj = ( UserModel.objects.select_related() .filter( Q(email=email_or_username) | Q(username=email_or_username) ) .first() ) else: # Username-like input: try username first, then email as fallback user_obj = ( UserModel.objects.select_related() .filter( Q(username=email_or_username) | Q(email=email_or_username) ) .first() ) if user_obj: user = authenticate( # type: ignore[attr-defined] request._request, username=user_obj.username, password=password, ) except Exception: # Fallback to original behavior user = authenticate( # type: ignore[attr-defined] request._request, username=email_or_username, password=password, ) if user: if user.is_active: login(request._request, user) # type: ignore[attr-defined] # Optimized token creation - get_or_create is atomic from rest_framework.authtoken.models import Token token, created = Token.objects.get_or_create(user=user) response_serializer = LoginOutputSerializer( { "token": token.key, "user": user, "message": "Login successful", } ) return Response(response_serializer.data) else: return Response( {"error": "Account is disabled"}, status=status.HTTP_400_BAD_REQUEST, ) else: return Response( {"error": "Invalid credentials"}, status=status.HTTP_400_BAD_REQUEST, ) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( post=extend_schema( summary="User registration", description="Register a new user account.", request=SignupInputSerializer, responses={ 201: SignupOutputSerializer, 400: "Bad Request", }, tags=["Authentication"], ), ) class SignupAPIView(TurnstileMixin, APIView): """API endpoint for user registration.""" permission_classes = [AllowAny] authentication_classes = [] serializer_class = SignupInputSerializer def post(self, request: Request) -> Response: try: # Validate Turnstile if configured self.validate_turnstile(request) except ValidationError as e: return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) serializer = SignupInputSerializer(data=request.data) if serializer.is_valid(): user = serializer.save() login(request._request, user) # type: ignore[attr-defined] from rest_framework.authtoken.models import Token token, created = Token.objects.get_or_create(user=user) response_serializer = SignupOutputSerializer( { "token": token.key, "user": user, "message": "Registration successful", } ) return Response(response_serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( post=extend_schema( summary="User logout", description="Logout the current user and invalidate their token.", responses={ 200: LogoutOutputSerializer, 401: "Unauthorized", }, tags=["Authentication"], ), ) class LogoutAPIView(APIView): """API endpoint for user logout.""" permission_classes = [IsAuthenticated] serializer_class = LogoutOutputSerializer def post(self, request: Request) -> Response: try: # Delete the token for token-based auth if hasattr(request.user, "auth_token"): request.user.auth_token.delete() # Logout from session logout(request._request) # type: ignore[attr-defined] response_serializer = LogoutOutputSerializer( {"message": "Logout successful"} ) return Response(response_serializer.data) except Exception as e: return Response( {"error": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema_view( get=extend_schema( summary="Get current user", description="Retrieve information about the currently authenticated user.", responses={ 200: UserOutputSerializer, 401: "Unauthorized", }, tags=["Authentication"], ), ) class CurrentUserAPIView(APIView): """API endpoint to get current user information.""" permission_classes = [IsAuthenticated] serializer_class = UserOutputSerializer def get(self, request: Request) -> Response: serializer = UserOutputSerializer(request.user) return Response(serializer.data) @extend_schema_view( post=extend_schema( summary="Request password reset", description="Send a password reset email to the user.", request=PasswordResetInputSerializer, responses={ 200: PasswordResetOutputSerializer, 400: "Bad Request", }, tags=["Authentication"], ), ) class PasswordResetAPIView(APIView): """API endpoint to request password reset.""" permission_classes = [AllowAny] serializer_class = PasswordResetInputSerializer def post(self, request: Request) -> Response: serializer = PasswordResetInputSerializer( data=request.data, context={"request": request} ) if serializer.is_valid(): serializer.save() response_serializer = PasswordResetOutputSerializer( {"detail": "Password reset email sent"} ) return Response(response_serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( post=extend_schema( summary="Change password", description="Change the current user's password.", request=PasswordChangeInputSerializer, responses={ 200: PasswordChangeOutputSerializer, 400: "Bad Request", 401: "Unauthorized", }, tags=["Authentication"], ), ) class PasswordChangeAPIView(APIView): """API endpoint to change password.""" permission_classes = [IsAuthenticated] serializer_class = PasswordChangeInputSerializer def post(self, request: Request) -> Response: serializer = PasswordChangeInputSerializer( data=request.data, context={"request": request} ) if serializer.is_valid(): serializer.save() response_serializer = PasswordChangeOutputSerializer( {"detail": "Password changed successfully"} ) return Response(response_serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @extend_schema_view( get=extend_schema( summary="Get social providers", description="Retrieve available social authentication providers.", responses={200: "List of social providers"}, tags=["Authentication"], ), ) class SocialProvidersAPIView(APIView): """API endpoint to get available social authentication providers.""" permission_classes = [AllowAny] serializer_class = SocialProviderOutputSerializer def get(self, request: Request) -> Response: from django.core.cache import cache from django.contrib.sites.shortcuts import get_current_site site = get_current_site(request._request) # type: ignore[attr-defined] # Cache key based on site and request host cache_key = ( f"social_providers:{getattr(site, 'id', site.pk)}:{request.get_host()}" ) # Try to get from cache first (cache for 15 minutes) cached_providers = cache.get(cache_key) if cached_providers is not None: return Response(cached_providers) providers_list = [] # Optimized query: filter by site and order by provider name from allauth.socialaccount.models import SocialApp social_apps = SocialApp.objects.filter(sites=site).order_by("provider") for social_app in social_apps: try: # Simplified provider name resolution - avoid expensive provider class loading provider_name = social_app.name or social_app.provider.title() # Build auth URL efficiently auth_url = request.build_absolute_uri( f"/accounts/{social_app.provider}/login/" ) providers_list.append( { "id": social_app.provider, "name": provider_name, "authUrl": auth_url, } ) except Exception: # Skip if provider can't be loaded continue # Serialize and cache the result serializer = SocialProviderOutputSerializer(providers_list, many=True) response_data = serializer.data # Cache for 15 minutes (900 seconds) cache.set(cache_key, response_data, 900) return Response(response_data) @extend_schema_view( post=extend_schema( summary="Check authentication status", description="Check if user is authenticated and return user data.", responses={200: AuthStatusOutputSerializer}, tags=["Authentication"], ), ) class AuthStatusAPIView(APIView): """API endpoint to check authentication status.""" permission_classes = [AllowAny] serializer_class = AuthStatusOutputSerializer def post(self, request: Request) -> Response: if request.user.is_authenticated: response_data = { "authenticated": True, "user": request.user, } else: response_data = { "authenticated": False, "user": None, } serializer = AuthStatusOutputSerializer(response_data) return Response(serializer.data) # === USER PROFILE API VIEWS === class UserProfileViewSet(ModelViewSet): """ViewSet for managing user profiles.""" queryset = UserProfile.objects.select_related("user").all() permission_classes = [IsAuthenticated] def get_serializer_class(self): """Return appropriate serializer based on action.""" if self.action == "create": return UserProfileCreateInputSerializer elif self.action in ["update", "partial_update"]: return UserProfileUpdateInputSerializer return UserProfileOutputSerializer def get_queryset(self): """Filter profiles based on user permissions.""" if self.request.user.is_staff: return self.queryset return self.queryset.filter(user=self.request.user) @action(detail=False, methods=["get"]) def me(self, request): """Get current user's profile.""" try: profile = UserProfile.objects.get(user=request.user) serializer = self.get_serializer(profile) return Response(serializer.data) except UserProfile.DoesNotExist: return Response( {"error": "Profile not found"}, status=status.HTTP_404_NOT_FOUND ) # === TOP LIST API VIEWS === class TopListViewSet(ModelViewSet): """ViewSet for managing user top lists.""" queryset = ( TopList.objects.select_related("user").prefetch_related("items__ride").all() ) permission_classes = [IsAuthenticated] def get_serializer_class(self): """Return appropriate serializer based on action.""" if self.action == "create": return TopListCreateInputSerializer elif self.action in ["update", "partial_update"]: return TopListUpdateInputSerializer return TopListOutputSerializer def get_queryset(self): """Filter lists based on user permissions and visibility.""" queryset = self.queryset if not self.request.user.is_staff: # Non-staff users can only see their own lists and public lists queryset = queryset.filter(Q(user=self.request.user) | Q(is_public=True)) return queryset.order_by("-created_at") def perform_create(self, serializer): """Set the user when creating a top list.""" serializer.save(user=self.request.user) @action(detail=False, methods=["get"]) def my_lists(self, request): """Get current user's top lists.""" lists = self.get_queryset().filter(user=request.user) serializer = self.get_serializer(lists, many=True) return Response(serializer.data) @action(detail=True, methods=["post"]) def duplicate(self, request, pk=None): """Duplicate a top list for the current user.""" original_list = self.get_object() # Create new list new_list = TopList.objects.create( user=request.user, name=f"Copy of {original_list.name}", description=original_list.description, is_public=False, # Duplicated lists are private by default ) # Copy all items for item in original_list.items.all(): TopListItem.objects.create( top_list=new_list, ride=item.ride, position=item.position, notes=item.notes, ) serializer = self.get_serializer(new_list) return Response(serializer.data, status=status.HTTP_201_CREATED) class TopListItemViewSet(ModelViewSet): """ViewSet for managing top list items.""" queryset = TopListItem.objects.select_related("top_list__user", "ride").all() permission_classes = [IsAuthenticated] def get_serializer_class(self): """Return appropriate serializer based on action.""" if self.action == "create": return TopListItemCreateInputSerializer elif self.action in ["update", "partial_update"]: return TopListItemUpdateInputSerializer return TopListItemOutputSerializer def get_queryset(self): """Filter items based on user permissions.""" queryset = self.queryset if not self.request.user.is_staff: # Non-staff users can only see items from their own lists or public lists queryset = queryset.filter( Q(top_list__user=self.request.user) | Q(top_list__is_public=True) ) return queryset.order_by("top_list_id", "position") def perform_create(self, serializer): """Validate user can add items to the list.""" top_list = serializer.validated_data["top_list"] if top_list.user != self.request.user and not self.request.user.is_staff: raise PermissionError("You can only add items to your own lists") serializer.save() def perform_update(self, serializer): """Validate user can update items in the list.""" top_list = serializer.instance.top_list if top_list.user != self.request.user and not self.request.user.is_staff: raise PermissionError("You can only update items in your own lists") serializer.save() def perform_destroy(self, instance): """Validate user can delete items from the list.""" if ( instance.top_list.user != self.request.user and not self.request.user.is_staff ): raise PermissionError("You can only delete items from your own lists") instance.delete() @action(detail=False, methods=["post"]) def reorder(self, request): """Reorder items in a top list.""" top_list_id = request.data.get("top_list_id") item_ids = request.data.get("item_ids", []) if not top_list_id or not item_ids: return Response( {"error": "top_list_id and item_ids are required"}, status=status.HTTP_400_BAD_REQUEST, ) try: top_list = TopList.objects.get(id=top_list_id) if top_list.user != request.user and not request.user.is_staff: return Response( {"error": "Permission denied"}, status=status.HTTP_403_FORBIDDEN ) # Update positions for position, item_id in enumerate(item_ids, 1): TopListItem.objects.filter(id=item_id, top_list=top_list).update( position=position ) return Response({"success": True}) except TopList.DoesNotExist: return Response( {"error": "Top list not found"}, status=status.HTTP_404_NOT_FOUND )