Files
thrillwiki_django_no_react/backend/apps/api/v1/auth/views.py
pacnpal 04394b9976 Refactor user account system and remove moderation integration
- Remove first_name and last_name fields from User model
- Add user deletion and social provider services
- Restructure auth serializers into separate directory
- Update avatar upload functionality and API endpoints
- Remove django-moderation integration documentation
- Add mandatory compliance enforcement rules
- Update frontend documentation with API usage examples
2025-08-30 07:31:58 -04:00

738 lines
25 KiB
Python

"""
Auth domain views for ThrillWiki API v1.
This module contains all authentication-related API endpoints including
login, signup, logout, password management, social authentication,
user profiles, and top lists.
"""
from .serializers.social import (
ConnectedProviderSerializer,
AvailableProviderSerializer,
SocialAuthStatusSerializer,
ConnectProviderInputSerializer,
ConnectProviderOutputSerializer,
DisconnectProviderOutputSerializer,
SocialProviderErrorSerializer,
)
from apps.accounts.services.social_provider_service import SocialProviderService
from django.contrib.auth import authenticate, login, logout, get_user_model
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.db.models import Q
from typing import Optional, cast # added 'cast'
from django.http import HttpRequest # new import
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from drf_spectacular.utils import extend_schema, extend_schema_view
# Import from the main serializers.py file (not the serializers package)
from ..serializers import (
# Authentication serializers
LoginInputSerializer,
LoginOutputSerializer,
SignupInputSerializer,
SignupOutputSerializer,
LogoutOutputSerializer,
UserOutputSerializer,
PasswordResetInputSerializer,
PasswordResetOutputSerializer,
PasswordChangeInputSerializer,
PasswordChangeOutputSerializer,
SocialProviderOutputSerializer,
AuthStatusOutputSerializer,
)
# Handle optional dependencies with fallback classes
class FallbackTurnstileMixin:
"""Fallback mixin if TurnstileMixin is not available."""
def validate_turnstile(self, request):
pass
# Try to import the real class, use fallback if not available and ensure it's a class/type
try:
from apps.accounts.mixins import TurnstileMixin as _ImportedTurnstileMixin
# Ensure the imported object is a class/type that can be used as a base class.
# If it's not a type for any reason, fall back to the safe mixin.
if isinstance(_ImportedTurnstileMixin, type):
TurnstileMixin = _ImportedTurnstileMixin
else:
TurnstileMixin = FallbackTurnstileMixin
except Exception:
# Catch any import errors or unexpected exceptions and use the fallback mixin.
TurnstileMixin = FallbackTurnstileMixin
UserModel = get_user_model()
# Helper: safely obtain underlying HttpRequest (used by Django auth)
def _get_underlying_request(request: Request) -> HttpRequest:
"""
Return a django HttpRequest for use with Django auth and site utilities.
DRF's Request wraps the underlying HttpRequest in ._request; cast() tells the
typechecker that the returned object is indeed an HttpRequest.
"""
return cast(HttpRequest, getattr(request, "_request", request))
# Helper: encapsulate user lookup + authenticate to reduce complexity in view
def _authenticate_user_by_lookup(
email_or_username: str, password: str, request: Request
) -> Optional[UserModel]:
"""
Try a single optimized query to find a user by email OR username then authenticate.
Returns authenticated user or None.
"""
try:
# Single query to find user by email OR username
if "@" in (email_or_username or ""):
user_obj = (
UserModel.objects.select_related()
.filter(Q(email=email_or_username) | Q(username=email_or_username))
.first()
)
else:
user_obj = (
UserModel.objects.select_related()
.filter(Q(username=email_or_username) | Q(email=email_or_username))
.first()
)
if user_obj:
username_val = getattr(user_obj, "username", None)
return authenticate(
# type: ignore[arg-type]
_get_underlying_request(request),
username=username_val,
password=password,
)
except Exception:
# Fallback to authenticate directly with provided identifier
return authenticate(
# type: ignore[arg-type]
_get_underlying_request(request),
username=email_or_username,
password=password,
)
return None
# === AUTHENTICATION API VIEWS ===
@extend_schema_view(
post=extend_schema(
summary="User login",
description="Authenticate user with username/email and password.",
request=LoginInputSerializer,
responses={
200: LoginOutputSerializer,
400: "Bad Request",
},
tags=["Authentication"],
),
)
class LoginAPIView(APIView):
"""API endpoint for user login."""
permission_classes = [AllowAny]
authentication_classes = []
serializer_class = LoginInputSerializer
def post(self, request: Request) -> Response:
try:
# instantiate mixin before calling to avoid type-mismatch in static analysis
TurnstileMixin().validate_turnstile(request)
except ValidationError as e:
return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
except Exception:
# If mixin doesn't do anything, continue
pass
serializer = LoginInputSerializer(data=request.data)
if serializer.is_valid():
validated = serializer.validated_data
# Use .get to satisfy static analyzers
email_or_username = validated.get("username") # type: ignore[assignment]
password = validated.get("password") # type: ignore[assignment]
if not email_or_username or not password:
return Response(
{"error": "username and password are required"},
status=status.HTTP_400_BAD_REQUEST,
)
user = _authenticate_user_by_lookup(email_or_username, password, request)
if user:
if getattr(user, "is_active", False):
# pass a real HttpRequest to Django login
login(_get_underlying_request(request), user)
# Generate JWT tokens
from rest_framework_simplejwt.tokens import RefreshToken
refresh = RefreshToken.for_user(user)
access_token = refresh.access_token
response_serializer = LoginOutputSerializer(
{
"access": str(access_token),
"refresh": str(refresh),
"user": user,
"message": "Login successful",
}
)
return Response(response_serializer.data)
else:
return Response(
{"error": "Account is disabled"},
status=status.HTTP_400_BAD_REQUEST,
)
else:
return Response(
{"error": "Invalid credentials"},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="User registration",
description="Register a new user account.",
request=SignupInputSerializer,
responses={
201: SignupOutputSerializer,
400: "Bad Request",
},
tags=["Authentication"],
),
)
class SignupAPIView(APIView):
"""API endpoint for user registration."""
permission_classes = [AllowAny]
authentication_classes = []
serializer_class = SignupInputSerializer
def post(self, request: Request) -> Response:
try:
# instantiate mixin before calling to avoid type-mismatch in static analysis
TurnstileMixin().validate_turnstile(request)
except ValidationError as e:
return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
except Exception:
# If mixin doesn't do anything, continue
pass
serializer = SignupInputSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
# pass a real HttpRequest to Django login
login(_get_underlying_request(request), user) # type: ignore[arg-type]
# Generate JWT tokens
from rest_framework_simplejwt.tokens import RefreshToken
refresh = RefreshToken.for_user(user)
access_token = refresh.access_token
response_serializer = SignupOutputSerializer(
{
"access": str(access_token),
"refresh": str(refresh),
"user": user,
"message": "Registration successful",
}
)
return Response(response_serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="User logout",
description="Logout the current user and blacklist their refresh token.",
responses={
200: LogoutOutputSerializer,
401: "Unauthorized",
},
tags=["Authentication"],
),
)
class LogoutAPIView(APIView):
"""API endpoint for user logout."""
permission_classes = [IsAuthenticated]
serializer_class = LogoutOutputSerializer
def post(self, request: Request) -> Response:
try:
# Get refresh token from request data with proper type handling
refresh_token = None
if hasattr(request, 'data') and request.data is not None:
data = getattr(request, 'data', {})
if hasattr(data, 'get'):
refresh_token = data.get("refresh")
if refresh_token and isinstance(refresh_token, str):
# Blacklist the refresh token
from rest_framework_simplejwt.tokens import RefreshToken
try:
# Create RefreshToken from string and blacklist it
refresh_token_obj = RefreshToken(
refresh_token) # type: ignore[arg-type]
refresh_token_obj.blacklist()
except Exception:
# Token might be invalid or already blacklisted
pass
# Also delete the old token for backward compatibility
if hasattr(request.user, "auth_token"):
request.user.auth_token.delete()
# Logout from session using the underlying HttpRequest
logout(_get_underlying_request(request))
response_serializer = LogoutOutputSerializer(
{"message": "Logout successful"}
)
return Response(response_serializer.data)
except Exception:
return Response(
{"error": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@extend_schema_view(
get=extend_schema(
summary="Get current user",
description="Retrieve information about the currently authenticated user.",
responses={
200: UserOutputSerializer,
401: "Unauthorized",
},
tags=["Authentication"],
),
)
class CurrentUserAPIView(APIView):
"""API endpoint to get current user information."""
permission_classes = [IsAuthenticated]
serializer_class = UserOutputSerializer
def get(self, request: Request) -> Response:
serializer = UserOutputSerializer(request.user)
return Response(serializer.data)
@extend_schema_view(
post=extend_schema(
summary="Request password reset",
description="Send a password reset email to the user.",
request=PasswordResetInputSerializer,
responses={
200: PasswordResetOutputSerializer,
400: "Bad Request",
},
tags=["Authentication"],
),
)
class PasswordResetAPIView(APIView):
"""API endpoint to request password reset."""
permission_classes = [AllowAny]
serializer_class = PasswordResetInputSerializer
def post(self, request: Request) -> Response:
serializer = PasswordResetInputSerializer(
data=request.data, context={"request": request}
)
if serializer.is_valid():
serializer.save()
response_serializer = PasswordResetOutputSerializer(
{"detail": "Password reset email sent"}
)
return Response(response_serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="Change password",
description="Change the current user's password.",
request=PasswordChangeInputSerializer,
responses={
200: PasswordChangeOutputSerializer,
400: "Bad Request",
401: "Unauthorized",
},
tags=["Authentication"],
),
)
class PasswordChangeAPIView(APIView):
"""API endpoint to change password."""
permission_classes = [IsAuthenticated]
serializer_class = PasswordChangeInputSerializer
def post(self, request: Request) -> Response:
serializer = PasswordChangeInputSerializer(
data=request.data, context={"request": request}
)
if serializer.is_valid():
serializer.save()
response_serializer = PasswordChangeOutputSerializer(
{"detail": "Password changed successfully"}
)
return Response(response_serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
get=extend_schema(
summary="Get social providers",
description="Retrieve available social authentication providers.",
responses={200: "List of social providers"},
tags=["Authentication"],
),
)
class SocialProvidersAPIView(APIView):
"""API endpoint to get available social authentication providers."""
permission_classes = [AllowAny]
serializer_class = SocialProviderOutputSerializer
def get(self, request: Request) -> Response:
from django.core.cache import cache
# get_current_site expects a django HttpRequest; _get_underlying_request now returns HttpRequest
site = get_current_site(_get_underlying_request(request))
# Cache key based on site and request host - use getattr to avoid attribute errors
site_id = getattr(site, "id", getattr(site, "pk", None))
cache_key = f"social_providers:{site_id}:{request.get_host()}"
# Try to get from cache first (cache for 15 minutes)
cached_providers = cache.get(cache_key)
if cached_providers is not None:
return Response(cached_providers)
providers_list = []
# Optimized query: filter by site and order by provider name
from allauth.socialaccount.models import SocialApp
social_apps = SocialApp.objects.filter(sites=site).order_by("provider")
for social_app in social_apps:
try:
provider_name = (
social_app.name or getattr(social_app, "provider", "").title()
)
auth_url = request.build_absolute_uri(
f"/accounts/{social_app.provider}/login/"
)
providers_list.append(
{
"id": social_app.provider,
"name": provider_name,
"authUrl": auth_url,
}
)
except Exception:
continue
serializer = SocialProviderOutputSerializer(providers_list, many=True)
response_data = serializer.data
cache.set(cache_key, response_data, 900)
return Response(response_data)
@extend_schema_view(
post=extend_schema(
summary="Check authentication status",
description="Check if user is authenticated and return user data.",
responses={200: AuthStatusOutputSerializer},
tags=["Authentication"],
),
)
class AuthStatusAPIView(APIView):
"""API endpoint to check authentication status."""
permission_classes = [AllowAny]
serializer_class = AuthStatusOutputSerializer
def post(self, request: Request) -> Response:
if request.user.is_authenticated:
response_data = {
"authenticated": True,
"user": request.user,
}
else:
response_data = {
"authenticated": False,
"user": None,
}
serializer = AuthStatusOutputSerializer(response_data)
return Response(serializer.data)
# === SOCIAL PROVIDER MANAGEMENT API VIEWS ===
@extend_schema_view(
get=extend_schema(
summary="Get available social providers",
description="Retrieve list of available social authentication providers.",
responses={
200: AvailableProviderSerializer(many=True),
},
tags=["Social Authentication"],
),
)
class AvailableProvidersAPIView(APIView):
"""API endpoint to get available social providers."""
permission_classes = [AllowAny]
serializer_class = AvailableProviderSerializer
def get(self, request: Request) -> Response:
providers = [
{
"provider": "google",
"name": "Google",
"login_url": "/auth/social/google/",
"connect_url": "/auth/social/connect/google/",
},
{
"provider": "discord",
"name": "Discord",
"login_url": "/auth/social/discord/",
"connect_url": "/auth/social/connect/discord/",
}
]
serializer = AvailableProviderSerializer(providers, many=True)
return Response(serializer.data)
@extend_schema_view(
get=extend_schema(
summary="Get connected social providers",
description="Retrieve list of social providers connected to the user's account.",
responses={
200: ConnectedProviderSerializer(many=True),
401: "Unauthorized",
},
tags=["Social Authentication"],
),
)
class ConnectedProvidersAPIView(APIView):
"""API endpoint to get user's connected social providers."""
permission_classes = [IsAuthenticated]
serializer_class = ConnectedProviderSerializer
def get(self, request: Request) -> Response:
service = SocialProviderService()
providers = service.get_connected_providers(request.user)
serializer = ConnectedProviderSerializer(providers, many=True)
return Response(serializer.data)
@extend_schema_view(
post=extend_schema(
summary="Connect social provider",
description="Connect a social authentication provider to the user's account.",
request=ConnectProviderInputSerializer,
responses={
200: ConnectProviderOutputSerializer,
400: SocialProviderErrorSerializer,
401: "Unauthorized",
},
tags=["Social Authentication"],
),
)
class ConnectProviderAPIView(APIView):
"""API endpoint to connect a social provider."""
permission_classes = [IsAuthenticated]
serializer_class = ConnectProviderInputSerializer
def post(self, request: Request, provider: str) -> Response:
# Validate provider
if provider not in ['google', 'discord']:
return Response(
{
"success": False,
"error": "INVALID_PROVIDER",
"message": f"Provider '{provider}' is not supported",
"suggestions": ["Use 'google' or 'discord'"]
},
status=status.HTTP_400_BAD_REQUEST
)
serializer = ConnectProviderInputSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{
"success": False,
"error": "VALIDATION_ERROR",
"message": "Invalid request data",
"details": serializer.errors,
"suggestions": ["Provide a valid access_token"]
},
status=status.HTTP_400_BAD_REQUEST
)
access_token = serializer.validated_data['access_token']
try:
service = SocialProviderService()
result = service.connect_provider(request.user, provider, access_token)
response_serializer = ConnectProviderOutputSerializer(result)
return Response(response_serializer.data)
except Exception as e:
return Response(
{
"success": False,
"error": "CONNECTION_FAILED",
"message": str(e),
"suggestions": [
"Verify the access token is valid",
"Ensure the provider account is not already connected to another user"
]
},
status=status.HTTP_400_BAD_REQUEST
)
@extend_schema_view(
post=extend_schema(
summary="Disconnect social provider",
description="Disconnect a social authentication provider from the user's account.",
responses={
200: DisconnectProviderOutputSerializer,
400: SocialProviderErrorSerializer,
401: "Unauthorized",
},
tags=["Social Authentication"],
),
)
class DisconnectProviderAPIView(APIView):
"""API endpoint to disconnect a social provider."""
permission_classes = [IsAuthenticated]
serializer_class = DisconnectProviderOutputSerializer
def post(self, request: Request, provider: str) -> Response:
# Validate provider
if provider not in ['google', 'discord']:
return Response(
{
"success": False,
"error": "INVALID_PROVIDER",
"message": f"Provider '{provider}' is not supported",
"suggestions": ["Use 'google' or 'discord'"]
},
status=status.HTTP_400_BAD_REQUEST
)
try:
service = SocialProviderService()
# Check if disconnection is safe
can_disconnect, reason = service.can_disconnect_provider(
request.user, provider)
if not can_disconnect:
return Response(
{
"success": False,
"error": "UNSAFE_DISCONNECTION",
"message": reason,
"suggestions": [
"Set up email/password authentication before disconnecting",
"Connect another social provider before disconnecting this one"
]
},
status=status.HTTP_400_BAD_REQUEST
)
# Perform disconnection
result = service.disconnect_provider(request.user, provider)
response_serializer = DisconnectProviderOutputSerializer(result)
return Response(response_serializer.data)
except Exception as e:
return Response(
{
"success": False,
"error": "DISCONNECTION_FAILED",
"message": str(e),
"suggestions": [
"Verify the provider is currently connected",
"Ensure you have alternative authentication methods"
]
},
status=status.HTTP_400_BAD_REQUEST
)
@extend_schema_view(
get=extend_schema(
summary="Get social authentication status",
description="Get comprehensive social authentication status for the user.",
responses={
200: SocialAuthStatusSerializer,
401: "Unauthorized",
},
tags=["Social Authentication"],
),
)
class SocialAuthStatusAPIView(APIView):
"""API endpoint to get social authentication status."""
permission_classes = [IsAuthenticated]
serializer_class = SocialAuthStatusSerializer
def get(self, request: Request) -> Response:
service = SocialProviderService()
auth_status = service.get_auth_status(request.user)
serializer = SocialAuthStatusSerializer(auth_status)
return Response(serializer.data)
# Note: User Profile, Top List, and Top List Item ViewSets are now handled
# by the dedicated accounts app at backend/apps/api/v1/accounts/views.py
# to avoid duplication and maintain clean separation of concerns.