mirror of
https://github.com/pacnpal/thrilltrack-explorer.git
synced 2025-12-20 14:51:12 -05:00
- Created a base email template (base.html) for consistent styling across all emails. - Added moderation approval email template (moderation_approved.html) to notify users of approved submissions. - Added moderation rejection email template (moderation_rejected.html) to inform users of required changes for their submissions. - Created password reset email template (password_reset.html) for users requesting to reset their passwords. - Developed a welcome email template (welcome.html) to greet new users and provide account details and tips for using ThrillWiki.
597 lines
18 KiB
Python
597 lines
18 KiB
Python
"""
|
|
Authentication API endpoints.
|
|
|
|
Provides endpoints for:
|
|
- User registration and login
|
|
- JWT token management
|
|
- MFA/2FA
|
|
- Password management
|
|
- User profile and preferences
|
|
- User administration
|
|
"""
|
|
|
|
from typing import List, Optional
|
|
from django.http import HttpRequest
|
|
from django.core.exceptions import ValidationError, PermissionDenied
|
|
from django.db.models import Q
|
|
from ninja import Router
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
from rest_framework_simplejwt.exceptions import TokenError
|
|
import logging
|
|
|
|
from apps.users.models import User, UserRole, UserProfile
|
|
from apps.users.services import (
|
|
AuthenticationService,
|
|
MFAService,
|
|
RoleService,
|
|
UserManagementService
|
|
)
|
|
from apps.users.permissions import (
|
|
jwt_auth,
|
|
require_auth,
|
|
require_admin,
|
|
get_permission_checker
|
|
)
|
|
from api.v1.schemas import (
|
|
UserRegisterRequest,
|
|
UserLoginRequest,
|
|
TokenResponse,
|
|
TokenRefreshRequest,
|
|
UserProfileOut,
|
|
UserProfileUpdate,
|
|
ChangePasswordRequest,
|
|
ResetPasswordRequest,
|
|
TOTPEnableResponse,
|
|
TOTPConfirmRequest,
|
|
TOTPVerifyRequest,
|
|
UserRoleOut,
|
|
UserPermissionsOut,
|
|
UserStatsOut,
|
|
UserProfilePreferencesOut,
|
|
UserProfilePreferencesUpdate,
|
|
BanUserRequest,
|
|
UnbanUserRequest,
|
|
AssignRoleRequest,
|
|
UserListOut,
|
|
MessageSchema,
|
|
ErrorSchema,
|
|
)
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router on which every endpoint in this module is registered; operations
# are grouped under the "Authentication" tag.
router = Router(tags=["Authentication"])
|
|
|
|
|
|
# ============================================================================
|
|
# Public Authentication Endpoints
|
|
# ============================================================================
|
|
|
|
@router.post("/register", response={201: UserProfileOut, 400: ErrorSchema})
def register(request: HttpRequest, data: UserRegisterRequest):
    """
    Register a new user account.

    - **email**: User's email address (required)
    - **password**: Password (min 8 characters, required)
    - **password_confirm**: Password confirmation (required)
    - **username**: Username (optional, auto-generated if not provided)
    - **first_name**: First name (optional)
    - **last_name**: Last name (optional)

    Returns the created user profile with status 201. This endpoint does not
    issue tokens; call the login endpoint afterwards to obtain them.
    Validation failures and unexpected errors are both reported as 400.
    """
    try:
        user = AuthenticationService.register_user(
            email=data.email,
            password=data.password,
            username=data.username,
            # The service receives empty strings rather than None for
            # omitted names.
            first_name=data.first_name or '',
            last_name=data.last_name or '',
        )

        logger.info(f"New user registered: {user.email}")
        return 201, user

    except ValidationError as e:
        # Only dict-style validation errors expose message_dict; fall back to
        # the plain string form for everything else.
        error_msg = str(e.message_dict) if hasattr(e, 'message_dict') else str(e)
        return 400, {"error": "Registration failed", "detail": error_msg}
    except Exception:
        # Keep the full traceback in the server log, but never echo internal
        # exception text back to an unauthenticated caller.
        logger.exception("Registration error")
        return 400, {
            "error": "Registration failed",
            "detail": "An unexpected error occurred",
        }
|
|
|
|
|
|
@router.post("/login", response={200: TokenResponse, 401: ErrorSchema})
def login(request: HttpRequest, data: UserLoginRequest):
    """
    Login with email and password.

    - **email**: User's email address
    - **password**: Password
    - **mfa_token**: MFA token (required if MFA is enabled)

    Returns JWT access and refresh tokens on successful authentication.
    Every failure path (bad credentials, missing or invalid MFA token,
    validation error, unexpected error) is reported as 401.
    """
    try:
        user = AuthenticationService.authenticate_user(data.email, data.password)

        if not user:
            return 401, {"error": "Invalid credentials", "detail": "Email or password is incorrect"}

        # The second factor is enforced only for accounts flagged mfa_enabled.
        if user.mfa_enabled:
            if not data.mfa_token:
                return 401, {"error": "MFA required", "detail": "Please provide MFA token"}

            if not MFAService.verify_totp(user, data.mfa_token):
                return 401, {"error": "Invalid MFA token", "detail": "The MFA token is invalid"}

        # Mint a refresh token for the user; the access token is derived
        # from it.
        refresh = RefreshToken.for_user(user)

        return 200, {
            "access": str(refresh.access_token),
            "refresh": str(refresh),
            "token_type": "Bearer",
        }

    except ValidationError as e:
        return 401, {"error": "Authentication failed", "detail": str(e)}
    except Exception:
        # Log with traceback for operators; return a generic message so that
        # internal error text is not disclosed to unauthenticated callers.
        logger.exception("Login error")
        return 401, {
            "error": "Authentication failed",
            "detail": "An unexpected error occurred",
        }
|
|
|
|
|
|
@router.post("/token/refresh", response={200: TokenResponse, 401: ErrorSchema})
def refresh_token(request: HttpRequest, data: TokenRefreshRequest):
    """
    Refresh JWT access token using refresh token.

    - **refresh**: Refresh token

    Returns a new access token together with the refresh token that was
    presented; this handler does not rotate the refresh token. An invalid
    token or any unexpected error is reported as 401.
    """
    try:
        # Constructing RefreshToken from the raw string raises TokenError
        # when the token is rejected.
        refresh = RefreshToken(data.refresh)

        return 200, {
            "access": str(refresh.access_token),
            "refresh": str(refresh),
            "token_type": "Bearer",
        }

    except TokenError as e:
        return 401, {"error": "Invalid token", "detail": str(e)}
    except Exception:
        # Log with traceback; do not disclose internal error text to an
        # unauthenticated caller.
        logger.exception("Token refresh error")
        return 401, {
            "error": "Token refresh failed",
            "detail": "An unexpected error occurred",
        }
|
|
|
|
|
|
@router.post("/logout", auth=jwt_auth, response={200: MessageSchema})
@require_auth
def logout(request: HttpRequest):
    """
    Log out the current user.

    Requires authentication. This handler only acknowledges the request, so
    the client is responsible for discarding its access and refresh tokens.
    """
    # NOTE(review): no token is revoked or blacklisted in this function.
    # Presumably any server-side blacklisting relies on the simplejwt
    # configuration (e.g. BLACKLIST_AFTER_ROTATION) -- confirm that this is
    # sufficient for logout semantics.
    acknowledgement = {"message": "Logged out successfully", "success": True}
    return 200, acknowledgement
|
|
|
|
|
|
# ============================================================================
|
|
# User Profile Endpoints
|
|
# ============================================================================
|
|
|
|
@router.get("/me", auth=jwt_auth, response={200: UserProfileOut, 401: ErrorSchema})
@require_auth
def get_my_profile(request: HttpRequest):
    """
    Return the authenticated user's profile.

    The object attached to the request as `request.auth` is returned as-is
    and serialized through the UserProfileOut schema.
    """
    return 200, request.auth
|
|
|
|
|
|
@router.patch("/me", auth=jwt_auth, response={200: UserProfileOut, 400: ErrorSchema})
@require_auth
def update_my_profile(request: HttpRequest, data: UserProfileUpdate):
    """
    Partially update the authenticated user's profile.

    - **first_name**: First name (optional)
    - **last_name**: Last name (optional)
    - **username**: Username (optional)
    - **bio**: User biography (optional, max 500 characters)
    - **avatar_url**: Avatar image URL (optional)

    Only fields present in the request body are forwarded to the service.
    Returns the updated user, or 400 if the update fails for any reason.
    """
    try:
        current_user = request.auth
        # exclude_unset drops fields the client did not send, so omitted
        # fields are not passed to the service at all.
        changes = data.dict(exclude_unset=True)
        return 200, UserManagementService.update_profile(current_user, **changes)
    except ValidationError as exc:
        return 400, {"error": "Update failed", "detail": str(exc)}
    except Exception as exc:
        logger.error(f"Profile update error: {exc}")
        return 400, {"error": "Update failed", "detail": str(exc)}
|
|
|
|
|
|
@router.get("/me/role", auth=jwt_auth, response={200: UserRoleOut, 404: ErrorSchema})
@require_auth
def get_my_role(request: HttpRequest):
    """
    Return the authenticated user's role.

    The response carries the role name, the moderator/admin flags, when the
    role was granted, and the granting user's email (null when no grantor is
    recorded). Responds with 404 when the user has no role assigned.
    """
    try:
        assignment = request.auth.role
        grantor = assignment.granted_by

        payload = {
            "role": assignment.role,
            "is_moderator": assignment.is_moderator,
            "is_admin": assignment.is_admin,
            "granted_at": assignment.granted_at,
            "granted_by_email": grantor.email if grantor else None,
        }
        return 200, payload
    except UserRole.DoesNotExist:
        return 404, {"error": "Role not found", "detail": "User role not assigned"}
|
|
|
|
|
|
@router.get("/me/permissions", auth=jwt_auth, response={200: UserPermissionsOut})
@require_auth
def get_my_permissions(request: HttpRequest):
    """
    Return the authenticated user's permissions.

    Delegates to RoleService.get_user_permissions and returns its result,
    serialized through the UserPermissionsOut schema.
    """
    return 200, RoleService.get_user_permissions(request.auth)
|
|
|
|
|
|
@router.get("/me/stats", auth=jwt_auth, response={200: UserStatsOut})
@require_auth
def get_my_stats(request: HttpRequest):
    """
    Return statistics for the authenticated user.

    Delegates to UserManagementService.get_user_stats and returns its
    result, serialized through the UserStatsOut schema.
    """
    return 200, UserManagementService.get_user_stats(request.auth)
|
|
|
|
|
|
# ============================================================================
|
|
# User Preferences Endpoints
|
|
# ============================================================================
|
|
|
|
@router.get("/me/preferences", auth=jwt_auth, response={200: UserProfilePreferencesOut})
@require_auth
def get_my_preferences(request: HttpRequest):
    """
    Return the authenticated user's preferences.

    The user's related `profile` object is returned and serialized through
    the UserProfilePreferencesOut schema.
    """
    return 200, request.auth.profile
|
|
|
|
|
|
@router.patch("/me/preferences", auth=jwt_auth, response={200: UserProfilePreferencesOut, 400: ErrorSchema})
@require_auth
def update_my_preferences(request: HttpRequest, data: UserProfilePreferencesUpdate):
    """
    Partially update the authenticated user's preferences.

    - **email_notifications**: Receive email notifications
    - **email_on_submission_approved**: Email when submissions approved
    - **email_on_submission_rejected**: Email when submissions rejected
    - **profile_public**: Make profile publicly visible
    - **show_email**: Show email on public profile

    Only fields present in the request body are forwarded to the service.
    Returns the updated profile, or 400 if the update fails for any reason.
    """
    try:
        current_user = request.auth
        # Fields the client left out are not passed to the service.
        changes = data.dict(exclude_unset=True)
        return 200, UserManagementService.update_preferences(current_user, **changes)
    except Exception as exc:
        logger.error(f"Preferences update error: {exc}")
        return 400, {"error": "Update failed", "detail": str(exc)}
|
|
|
|
|
|
# ============================================================================
|
|
# Password Management Endpoints
|
|
# ============================================================================
|
|
|
|
@router.post("/password/change", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema})
@require_auth
def change_password(request: HttpRequest, data: ChangePasswordRequest):
    """
    Change the authenticated user's password.

    - **old_password**: Current password (required)
    - **new_password**: New password (min 8 characters, required)
    - **new_password_confirm**: New password confirmation (required)

    Responds with 400 when the service rejects the change with a
    ValidationError; other exceptions are not handled here.
    """
    try:
        AuthenticationService.change_password(
            user=request.auth,
            old_password=data.old_password,
            new_password=data.new_password,
        )
    except ValidationError as exc:
        # Dict-style validation errors expose message_dict; others do not.
        if hasattr(exc, 'message_dict'):
            detail = str(exc.message_dict)
        else:
            detail = str(exc)
        return 400, {"error": "Password change failed", "detail": detail}

    return 200, {"message": "Password changed successfully", "success": True}
|
|
|
|
|
|
@router.post("/password/reset", response={200: MessageSchema})
def request_password_reset(request: HttpRequest, data: ResetPasswordRequest):
    """
    Request a password reset email.

    - **email**: User's email address

    Note: this is a placeholder. No email is sent and the submitted address
    is not looked up; the same success message is returned for every
    request.
    """
    # TODO: Implement email sending with password reset token.
    # The response must stay identical whether or not the email exists, so
    # the endpoint cannot be used to probe for registered addresses.
    acknowledgement = {
        "message": "If the email exists, a password reset link has been sent",
        "success": True,
    }
    return 200, acknowledgement
|
|
|
|
|
|
# ============================================================================
|
|
# MFA/2FA Endpoints
|
|
# ============================================================================
|
|
|
|
@router.post("/mfa/enable", auth=jwt_auth, response={200: TOTPEnableResponse, 400: ErrorSchema})
@require_auth
def enable_mfa(request: HttpRequest):
    """
    Enable MFA/2FA for current user.

    Returns the TOTP secret and the device's configuration URL for
    authenticator apps. User must confirm with a valid token to complete
    setup. Any failure while creating the device is reported as 400.
    """
    try:
        user = request.auth

        # Create the TOTP device for this user.
        device = MFAService.enable_totp(user)

        return 200, {
            "secret": device.key,
            # NOTE(review): presumably config_url is the otpauth://
            # provisioning URI that clients render as a QR code -- confirm.
            "qr_code_url": device.config_url,
            "backup_codes": [],  # TODO: Generate backup codes
        }

    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"MFA enable error: {e}")
        return 400, {"error": "MFA setup failed", "detail": str(e)}
|
|
|
|
|
|
@router.post("/mfa/confirm", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema})
@require_auth
def confirm_mfa(request: HttpRequest, data: TOTPConfirmRequest):
    """
    Confirm MFA setup with a verification token.

    - **token**: 6-digit TOTP token from authenticator app

    Passes the token to MFAService.confirm_totp. Responds with 400 when the
    service raises a ValidationError; other exceptions are not handled here.
    """
    try:
        MFAService.confirm_totp(request.auth, data.token)
    except ValidationError as exc:
        return 400, {"error": "Confirmation failed", "detail": str(exc)}

    return 200, {"message": "MFA enabled successfully", "success": True}
|
|
|
|
|
|
@router.post("/mfa/disable", auth=jwt_auth, response={200: MessageSchema})
@require_auth
def disable_mfa(request: HttpRequest):
    """
    Disable MFA/2FA for the authenticated user.

    Delegates to MFAService.disable_totp and then reports success. No
    password or token re-confirmation is performed by this handler.
    """
    MFAService.disable_totp(request.auth)

    confirmation = {"message": "MFA disabled successfully", "success": True}
    return 200, confirmation
|
|
|
|
|
|
@router.post("/mfa/verify", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema})
@require_auth
def verify_mfa_token(request: HttpRequest, data: TOTPVerifyRequest):
    """
    Verify an MFA token (for testing).

    - **token**: 6-digit TOTP token

    Responds with 200 when MFAService.verify_totp accepts the token for the
    authenticated user, and with 400 otherwise.
    """
    token_ok = MFAService.verify_totp(request.auth, data.token)

    # Guard clause: reject first, so the success path reads straight through.
    if not token_ok:
        return 400, {"error": "Invalid token", "detail": "The token is not valid"}

    return 200, {"message": "Token is valid", "success": True}
|
|
|
|
|
|
# ============================================================================
|
|
# User Management Endpoints (Admin Only)
|
|
# ============================================================================
|
|
|
|
@router.get("/users", auth=jwt_auth, response={200: UserListOut, 403: ErrorSchema})
@require_admin
def list_users(
    request: HttpRequest,
    page: int = 1,
    page_size: int = 50,
    search: Optional[str] = None,
    role: Optional[str] = None,
    banned: Optional[bool] = None
):
    """
    List all users (admin only).

    - **page**: Page number (default: 1; values below 1 are treated as 1)
    - **page_size**: Items per page (default: 50, clamped to 1-100)
    - **search**: Case-insensitive substring match on email, username,
      first name or last name
    - **role**: Filter by role (user, moderator, admin)
    - **banned**: Filter by banned status

    Returns the requested page of users together with the total match count
    and the effective page, page size and page count.
    """
    queryset = User.objects.select_related('role')

    if search:
        queryset = queryset.filter(
            Q(email__icontains=search) |
            Q(username__icontains=search) |
            Q(first_name__icontains=search) |
            Q(last_name__icontains=search)
        )

    if role:
        queryset = queryset.filter(role__role=role)

    if banned is not None:
        queryset = queryset.filter(banned=banned)

    # Slicing an unordered queryset gives no guarantee that consecutive
    # pages are disjoint. Apply a primary-key ordering only when the model
    # does not already define one, so any existing ordering is preserved.
    if not queryset.ordered:
        queryset = queryset.order_by('pk')

    # Clamp the paging inputs: page_size == 0 would divide by zero below,
    # and a non-positive page or negative page_size would produce a negative
    # slice, which Django querysets reject.
    page = max(page, 1)
    page_size = max(1, min(page_size, 100))

    total = queryset.count()
    total_pages = (total + page_size - 1) // page_size  # ceiling division

    start = (page - 1) * page_size
    users = list(queryset[start:start + page_size])

    return 200, {
        "items": users,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }
|
|
|
|
|
|
@router.get("/users/{user_id}", auth=jwt_auth, response={200: UserProfileOut, 404: ErrorSchema})
@require_admin
def get_user(request: HttpRequest, user_id: str):
    """
    Get user by ID (admin only).

    Returns detailed profile information for the specified user, or 404
    when no user matches. An ID that is malformed for the primary-key type
    is also reported as 404 rather than surfacing as a server error.
    """
    try:
        user = User.objects.get(id=user_id)
    except (User.DoesNotExist, ValidationError, ValueError):
        # user_id is an arbitrary path string. When it cannot be coerced to
        # the primary-key type the ORM raises ValueError (integer keys) or
        # ValidationError (UUID keys) instead of DoesNotExist.
        return 404, {"error": "User not found", "detail": f"No user with ID {user_id}"}

    return 200, user
|
|
|
|
|
|
@router.post("/users/ban", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema})
@require_admin
def ban_user(request: HttpRequest, data: BanUserRequest):
    """
    Ban a user (admin only).

    - **user_id**: User ID to ban
    - **reason**: Reason for ban

    The authenticated caller is passed to the service as the acting admin.
    Responds with 400 when no user has the given ID.
    """
    try:
        target = User.objects.get(id=data.user_id)
        acting_admin = request.auth
        UserManagementService.ban_user(target, data.reason, acting_admin)

        confirmation = f"User {target.email} has been banned"
        return 200, {"message": confirmation, "success": True}
    except User.DoesNotExist:
        return 400, {"error": "User not found", "detail": f"No user with ID {data.user_id}"}
|
|
|
|
|
|
@router.post("/users/unban", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema})
@require_admin
def unban_user(request: HttpRequest, data: UnbanUserRequest):
    """
    Unban a user (admin only).

    - **user_id**: User ID to unban

    Responds with 400 when no user has the given ID.
    """
    try:
        target = User.objects.get(id=data.user_id)
        UserManagementService.unban_user(target)

        confirmation = f"User {target.email} has been unbanned"
        return 200, {"message": confirmation, "success": True}
    except User.DoesNotExist:
        return 400, {"error": "User not found", "detail": f"No user with ID {data.user_id}"}
|
|
|
|
|
|
@router.post("/users/assign-role", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema})
@require_admin
def assign_role(request: HttpRequest, data: AssignRoleRequest):
    """
    Assign a role to a user (admin only).

    - **user_id**: User ID
    - **role**: Role to assign (user, moderator, admin)

    The authenticated caller is passed to the service as the granting
    admin. Responds with 400 when no user has the given ID or when the
    service raises a ValidationError.
    """
    try:
        target = User.objects.get(id=data.user_id)
        granting_admin = request.auth
        RoleService.assign_role(target, data.role, granting_admin)

        confirmation = f"Role '{data.role}' assigned to {target.email}"
        return 200, {"message": confirmation, "success": True}
    except User.DoesNotExist:
        return 400, {"error": "User not found", "detail": f"No user with ID {data.user_id}"}
    except ValidationError as exc:
        return 400, {"error": "Invalid role", "detail": str(exc)}
|