Files
thrillwiki_django_no_react/backend/apps/api/v1/auth/views.py
pac7 821c94bc76 Remove unused social authentication endpoints and update frontend
Remove deprecated social provider API endpoints from the backend and update the frontend JavaScript to hardcode Google and Discord as the available social login providers.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 9bc9dd7a-5328-4cb7-91de-b3cb33a0c48c
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
2025-09-22 00:15:15 +00:00

590 lines
20 KiB
Python

"""
Auth domain views for ThrillWiki API v1.
This module contains all authentication-related API endpoints including
login, signup, logout, password management, social authentication,
user profiles, and top lists.
"""
from django.contrib.auth import authenticate, login, logout, get_user_model
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.db.models import Q
from typing import Optional, cast # added 'cast'
from django.http import HttpRequest # new import
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from drf_spectacular.utils import extend_schema, extend_schema_view
# Import directly from the auth serializers.py file (not the serializers package)
from .serializers import (
# Authentication serializers
LoginInputSerializer,
LoginOutputSerializer,
SignupInputSerializer,
SignupOutputSerializer,
LogoutOutputSerializer,
UserOutputSerializer,
PasswordResetInputSerializer,
PasswordResetOutputSerializer,
PasswordChangeInputSerializer,
PasswordChangeOutputSerializer,
AuthStatusOutputSerializer,
)
# Handle optional dependencies with fallback classes
class FallbackTurnstileMixin:
"""Fallback mixin if TurnstileMixin is not available."""
def validate_turnstile(self, request):
pass
# Try to import the real class, use fallback if not available and ensure it's a class/type
try:
from apps.accounts.mixins import TurnstileMixin as _ImportedTurnstileMixin
# Ensure the imported object is a class/type that can be used as a base class.
# If it's not a type for any reason, fall back to the safe mixin.
if isinstance(_ImportedTurnstileMixin, type):
TurnstileMixin = _ImportedTurnstileMixin
else:
TurnstileMixin = FallbackTurnstileMixin
except Exception:
# Catch any import errors or unexpected exceptions and use the fallback mixin.
TurnstileMixin = FallbackTurnstileMixin
UserModel = get_user_model()
# Helper: safely obtain underlying HttpRequest (used by Django auth)
def _get_underlying_request(request: Request) -> HttpRequest:
"""
Return a django HttpRequest for use with Django auth and site utilities.
DRF's Request wraps the underlying HttpRequest in ._request; cast() tells the
typechecker that the returned object is indeed an HttpRequest.
"""
return cast(HttpRequest, getattr(request, "_request", request))
# Helper: encapsulate user lookup + authenticate to reduce complexity in view
def _authenticate_user_by_lookup(
email_or_username: str, password: str, request: Request
) -> Optional[UserModel]:
"""
Try a single optimized query to find a user by email OR username then authenticate.
Returns authenticated user or None.
"""
try:
# Single query to find user by email OR username
if "@" in (email_or_username or ""):
user_obj = (
UserModel.objects.select_related()
.filter(Q(email=email_or_username) | Q(username=email_or_username))
.first()
)
else:
user_obj = (
UserModel.objects.select_related()
.filter(Q(username=email_or_username) | Q(email=email_or_username))
.first()
)
if user_obj:
username_val = getattr(user_obj, "username", None)
return authenticate(
# type: ignore[arg-type]
_get_underlying_request(request),
username=username_val,
password=password,
)
except Exception:
# Fallback to authenticate directly with provided identifier
return authenticate(
# type: ignore[arg-type]
_get_underlying_request(request),
username=email_or_username,
password=password,
)
return None
# === AUTHENTICATION API VIEWS ===
@extend_schema_view(
post=extend_schema(
summary="User login",
description="Authenticate user with username/email and password.",
request=LoginInputSerializer,
responses={
200: LoginOutputSerializer,
400: "Bad Request",
},
tags=["Authentication"],
),
)
class LoginAPIView(APIView):
"""API endpoint for user login."""
permission_classes = [AllowAny]
authentication_classes = []
serializer_class = LoginInputSerializer
def post(self, request: Request) -> Response:
try:
# instantiate mixin before calling to avoid type-mismatch in static analysis
TurnstileMixin().validate_turnstile(request)
except ValidationError as e:
return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
except Exception:
# If mixin doesn't do anything, continue
pass
serializer = LoginInputSerializer(data=request.data)
if serializer.is_valid():
validated = serializer.validated_data
# Use .get to satisfy static analyzers
email_or_username = validated.get("username") # type: ignore[assignment]
password = validated.get("password") # type: ignore[assignment]
if not email_or_username or not password:
return Response(
{"error": "username and password are required"},
status=status.HTTP_400_BAD_REQUEST,
)
user = _authenticate_user_by_lookup(email_or_username, password, request)
if user:
if getattr(user, "is_active", False):
# pass a real HttpRequest to Django login with backend specified
login(_get_underlying_request(request), user,
backend='django.contrib.auth.backends.ModelBackend')
# Generate JWT tokens
from rest_framework_simplejwt.tokens import RefreshToken
refresh = RefreshToken.for_user(user)
access_token = refresh.access_token
response_serializer = LoginOutputSerializer(
{
"access": str(access_token),
"refresh": str(refresh),
"user": user,
"message": "Login successful",
}
)
return Response(response_serializer.data)
else:
return Response(
{
"error": "Email verification required",
"message": "Please verify your email address before logging in. Check your email for a verification link.",
"email_verification_required": True
},
status=status.HTTP_400_BAD_REQUEST,
)
else:
return Response(
{"error": "Invalid credentials"},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="User registration",
description="Register a new user account. Email verification required.",
request=SignupInputSerializer,
responses={
201: SignupOutputSerializer,
400: "Bad Request",
},
tags=["Authentication"],
),
)
class SignupAPIView(APIView):
"""API endpoint for user registration."""
permission_classes = [AllowAny]
authentication_classes = []
serializer_class = SignupInputSerializer
def post(self, request: Request) -> Response:
try:
# instantiate mixin before calling to avoid type-mismatch in static analysis
TurnstileMixin().validate_turnstile(request)
except ValidationError as e:
return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
except Exception:
# If mixin doesn't do anything, continue
pass
serializer = SignupInputSerializer(data=request.data, context={"request": request})
if serializer.is_valid():
user = serializer.save()
# Don't log in the user immediately - they need to verify their email first
response_serializer = SignupOutputSerializer(
{
"access": None,
"refresh": None,
"user": user,
"message": "Registration successful. Please check your email to verify your account.",
"email_verification_required": True,
}
)
return Response(response_serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="User logout",
description="Logout the current user and blacklist their refresh token.",
responses={
200: LogoutOutputSerializer,
401: "Unauthorized",
},
tags=["Authentication"],
),
)
class LogoutAPIView(APIView):
"""API endpoint for user logout."""
permission_classes = [IsAuthenticated]
serializer_class = LogoutOutputSerializer
def post(self, request: Request) -> Response:
try:
# Get refresh token from request data with proper type handling
refresh_token = None
if hasattr(request, 'data') and request.data is not None:
data = getattr(request, 'data', {})
if hasattr(data, 'get'):
refresh_token = data.get("refresh")
if refresh_token and isinstance(refresh_token, str):
# Blacklist the refresh token
from rest_framework_simplejwt.tokens import RefreshToken
try:
# Create RefreshToken from string and blacklist it
refresh_token_obj = RefreshToken(
refresh_token) # type: ignore[arg-type]
refresh_token_obj.blacklist()
except Exception:
# Token might be invalid or already blacklisted
pass
# Also delete the old token for backward compatibility
if hasattr(request.user, "auth_token"):
request.user.auth_token.delete()
# Logout from session using the underlying HttpRequest
logout(_get_underlying_request(request))
response_serializer = LogoutOutputSerializer(
{"message": "Logout successful"}
)
return Response(response_serializer.data)
except Exception:
return Response(
{"error": "Logout failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@extend_schema_view(
get=extend_schema(
summary="Get current user",
description="Retrieve information about the currently authenticated user.",
responses={
200: UserOutputSerializer,
401: "Unauthorized",
},
tags=["Authentication"],
),
)
class CurrentUserAPIView(APIView):
"""API endpoint to get current user information."""
permission_classes = [IsAuthenticated]
serializer_class = UserOutputSerializer
def get(self, request: Request) -> Response:
serializer = UserOutputSerializer(request.user)
return Response(serializer.data)
@extend_schema_view(
post=extend_schema(
summary="Request password reset",
description="Send a password reset email to the user.",
request=PasswordResetInputSerializer,
responses={
200: PasswordResetOutputSerializer,
400: "Bad Request",
},
tags=["Authentication"],
),
)
class PasswordResetAPIView(APIView):
"""API endpoint to request password reset."""
permission_classes = [AllowAny]
serializer_class = PasswordResetInputSerializer
def post(self, request: Request) -> Response:
serializer = PasswordResetInputSerializer(
data=request.data, context={"request": request}
)
if serializer.is_valid():
serializer.save()
response_serializer = PasswordResetOutputSerializer(
{"detail": "Password reset email sent"}
)
return Response(response_serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="Change password",
description="Change the current user's password.",
request=PasswordChangeInputSerializer,
responses={
200: PasswordChangeOutputSerializer,
400: "Bad Request",
401: "Unauthorized",
},
tags=["Authentication"],
),
)
class PasswordChangeAPIView(APIView):
"""API endpoint to change password."""
permission_classes = [IsAuthenticated]
serializer_class = PasswordChangeInputSerializer
def post(self, request: Request) -> Response:
serializer = PasswordChangeInputSerializer(
data=request.data, context={"request": request}
)
if serializer.is_valid():
serializer.save()
response_serializer = PasswordChangeOutputSerializer(
{"detail": "Password changed successfully"}
)
return Response(response_serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@extend_schema_view(
post=extend_schema(
summary="Check authentication status",
description="Check if user is authenticated and return user data.",
responses={200: AuthStatusOutputSerializer},
tags=["Authentication"],
),
)
class AuthStatusAPIView(APIView):
"""API endpoint to check authentication status."""
permission_classes = [AllowAny]
serializer_class = AuthStatusOutputSerializer
def post(self, request: Request) -> Response:
if request.user.is_authenticated:
response_data = {
"authenticated": True,
"user": request.user,
}
else:
response_data = {
"authenticated": False,
"user": None,
}
serializer = AuthStatusOutputSerializer(response_data)
return Response(serializer.data)
# === EMAIL VERIFICATION API VIEWS ===
@extend_schema_view(
get=extend_schema(
summary="Verify email address",
description="Verify user's email address using verification token.",
responses={
200: {"type": "object", "properties": {"message": {"type": "string"}}},
400: "Bad Request",
404: "Token not found",
},
tags=["Authentication"],
),
)
class EmailVerificationAPIView(APIView):
"""API endpoint for email verification."""
permission_classes = [AllowAny]
authentication_classes = []
def get(self, request: Request, token: str) -> Response:
from apps.accounts.models import EmailVerification
try:
verification = EmailVerification.objects.select_related('user').get(token=token)
user = verification.user
# Activate the user
user.is_active = True
user.save()
# Delete the verification record
verification.delete()
return Response({
"message": "Email verified successfully. You can now log in.",
"success": True
})
except EmailVerification.DoesNotExist:
return Response(
{"error": "Invalid or expired verification token"},
status=status.HTTP_404_NOT_FOUND
)
@extend_schema_view(
post=extend_schema(
summary="Resend verification email",
description="Resend email verification to user's email address.",
request={"type": "object", "properties": {"email": {"type": "string", "format": "email"}}},
responses={
200: {"type": "object", "properties": {"message": {"type": "string"}}},
400: "Bad Request",
404: "User not found",
},
tags=["Authentication"],
),
)
class ResendVerificationAPIView(APIView):
"""API endpoint to resend email verification."""
permission_classes = [AllowAny]
authentication_classes = []
def post(self, request: Request) -> Response:
from apps.accounts.models import EmailVerification
from django.utils.crypto import get_random_string
from django_forwardemail.services import EmailService
from django.contrib.sites.shortcuts import get_current_site
email = request.data.get('email')
if not email:
return Response(
{"error": "Email address is required"},
status=status.HTTP_400_BAD_REQUEST
)
try:
user = UserModel.objects.get(email__iexact=email.strip().lower())
# Don't resend if user is already active
if user.is_active:
return Response(
{"error": "Email is already verified"},
status=status.HTTP_400_BAD_REQUEST
)
# Create or update verification record
verification, created = EmailVerification.objects.get_or_create(
user=user,
defaults={'token': get_random_string(64)}
)
if not created:
# Update existing token and timestamp
verification.token = get_random_string(64)
verification.save()
# Send verification email
site = get_current_site(_get_underlying_request(request))
verification_url = request.build_absolute_uri(
f"/api/v1/auth/verify-email/{verification.token}/"
)
try:
EmailService.send_email(
to=user.email,
subject="Verify your ThrillWiki account",
text=f"""
Welcome to ThrillWiki!
Please verify your email address by clicking the link below:
{verification_url}
If you didn't create an account, you can safely ignore this email.
Thanks,
The ThrillWiki Team
""".strip(),
site=site,
)
return Response({
"message": "Verification email sent successfully",
"success": True
})
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to send verification email to {user.email}: {e}")
return Response(
{"error": "Failed to send verification email"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
except UserModel.DoesNotExist:
# Don't reveal whether email exists
return Response({
"message": "If the email exists, a verification email has been sent",
"success": True
})
# Note: User Profile, Top List, and Top List Item ViewSets are now handled
# by the dedicated accounts app at backend/apps/api/v1/accounts/views.py
# to avoid duplication and maintain clean separation of concerns.