""" Authentication API endpoints. Provides endpoints for: - User registration and login - JWT token management - MFA/2FA - Password management - User profile and preferences - User administration """ from typing import List, Optional from django.http import HttpRequest from django.core.exceptions import ValidationError, PermissionDenied from django.db.models import Q from ninja import Router from rest_framework_simplejwt.tokens import RefreshToken from rest_framework_simplejwt.exceptions import TokenError import logging from apps.users.models import User, UserRole, UserProfile from apps.users.services import ( AuthenticationService, MFAService, RoleService, UserManagementService ) from apps.users.permissions import ( jwt_auth, require_auth, require_admin, get_permission_checker ) from api.v1.schemas import ( UserRegisterRequest, UserLoginRequest, TokenResponse, TokenRefreshRequest, UserProfileOut, UserProfileUpdate, ChangePasswordRequest, ResetPasswordRequest, TOTPEnableResponse, TOTPConfirmRequest, TOTPVerifyRequest, UserRoleOut, UserPermissionsOut, UserStatsOut, UserProfilePreferencesOut, UserProfilePreferencesUpdate, BanUserRequest, UnbanUserRequest, AssignRoleRequest, UserListOut, MessageSchema, ErrorSchema, ) router = Router(tags=["Authentication"]) logger = logging.getLogger(__name__) # ============================================================================ # Public Authentication Endpoints # ============================================================================ @router.post("/register", response={201: UserProfileOut, 400: ErrorSchema}) def register(request: HttpRequest, data: UserRegisterRequest): """ Register a new user account. - **email**: User's email address (required) - **password**: Password (min 8 characters, required) - **password_confirm**: Password confirmation (required) - **username**: Username (optional, auto-generated if not provided) - **first_name**: First name (optional) - **last_name**: Last name (optional) Returns the created user profile and automatically logs in the user. """ try: # Register user user = AuthenticationService.register_user( email=data.email, password=data.password, username=data.username, first_name=data.first_name or '', last_name=data.last_name or '' ) logger.info(f"New user registered: {user.email}") return 201, user except ValidationError as e: error_msg = str(e.message_dict) if hasattr(e, 'message_dict') else str(e) return 400, {"error": "Registration failed", "detail": error_msg} except Exception as e: logger.error(f"Registration error: {e}") return 400, {"error": "Registration failed", "detail": str(e)} @router.post("/login", response={200: TokenResponse, 401: ErrorSchema}) def login(request: HttpRequest, data: UserLoginRequest): """ Login with email and password. - **email**: User's email address - **password**: Password - **mfa_token**: MFA token (required if MFA is enabled) Returns JWT access and refresh tokens on successful authentication. """ try: # Authenticate user user = AuthenticationService.authenticate_user(data.email, data.password) if not user: return 401, {"error": "Invalid credentials", "detail": "Email or password is incorrect"} # Check MFA if enabled if user.mfa_enabled: if not data.mfa_token: return 401, {"error": "MFA required", "detail": "Please provide MFA token"} if not MFAService.verify_totp(user, data.mfa_token): return 401, {"error": "Invalid MFA token", "detail": "The MFA token is invalid"} # Generate tokens refresh = RefreshToken.for_user(user) return 200, { "access": str(refresh.access_token), "refresh": str(refresh), "token_type": "Bearer" } except ValidationError as e: return 401, {"error": "Authentication failed", "detail": str(e)} except Exception as e: logger.error(f"Login error: {e}") return 401, {"error": "Authentication failed", "detail": str(e)} @router.post("/token/refresh", response={200: TokenResponse, 401: ErrorSchema}) def refresh_token(request: HttpRequest, data: TokenRefreshRequest): """ Refresh JWT access token using refresh token. - **refresh**: Refresh token Returns new access token and optionally a new refresh token. """ try: refresh = RefreshToken(data.refresh) return 200, { "access": str(refresh.access_token), "refresh": str(refresh), "token_type": "Bearer" } except TokenError as e: return 401, {"error": "Invalid token", "detail": str(e)} except Exception as e: logger.error(f"Token refresh error: {e}") return 401, {"error": "Token refresh failed", "detail": str(e)} @router.post("/logout", auth=jwt_auth, response={200: MessageSchema}) @require_auth def logout(request: HttpRequest): """ Logout (blacklist refresh token). Note: Requires authentication. The client should also discard the access token. """ # Note: Token blacklisting is handled by djangorestframework-simplejwt # when BLACKLIST_AFTER_ROTATION is True in settings return 200, {"message": "Logged out successfully", "success": True} # ============================================================================ # User Profile Endpoints # ============================================================================ @router.get("/me", auth=jwt_auth, response={200: UserProfileOut, 401: ErrorSchema}) @require_auth def get_my_profile(request: HttpRequest): """ Get current user's profile. Returns detailed profile information for the authenticated user. """ user = request.auth return 200, user @router.patch("/me", auth=jwt_auth, response={200: UserProfileOut, 400: ErrorSchema}) @require_auth def update_my_profile(request: HttpRequest, data: UserProfileUpdate): """ Update current user's profile. - **first_name**: First name (optional) - **last_name**: Last name (optional) - **username**: Username (optional) - **bio**: User biography (optional, max 500 characters) - **avatar_url**: Avatar image URL (optional) """ try: user = request.auth # Prepare update data update_data = data.dict(exclude_unset=True) # Update profile updated_user = UserManagementService.update_profile(user, **update_data) return 200, updated_user except ValidationError as e: return 400, {"error": "Update failed", "detail": str(e)} except Exception as e: logger.error(f"Profile update error: {e}") return 400, {"error": "Update failed", "detail": str(e)} @router.get("/me/role", auth=jwt_auth, response={200: UserRoleOut, 404: ErrorSchema}) @require_auth def get_my_role(request: HttpRequest): """ Get current user's role. Returns role information including permissions. """ try: user = request.auth role = user.role response_data = { "role": role.role, "is_moderator": role.is_moderator, "is_admin": role.is_admin, "granted_at": role.granted_at, "granted_by_email": role.granted_by.email if role.granted_by else None } return 200, response_data except UserRole.DoesNotExist: return 404, {"error": "Role not found", "detail": "User role not assigned"} @router.get("/me/permissions", auth=jwt_auth, response={200: UserPermissionsOut}) @require_auth def get_my_permissions(request: HttpRequest): """ Get current user's permissions. Returns a summary of what the user can do. """ user = request.auth permissions = RoleService.get_user_permissions(user) return 200, permissions @router.get("/me/stats", auth=jwt_auth, response={200: UserStatsOut}) @require_auth def get_my_stats(request: HttpRequest): """ Get current user's statistics. Returns submission stats, reputation score, and activity information. """ user = request.auth stats = UserManagementService.get_user_stats(user) return 200, stats # ============================================================================ # User Preferences Endpoints # ============================================================================ @router.get("/me/preferences", auth=jwt_auth, response={200: UserProfilePreferencesOut}) @require_auth def get_my_preferences(request: HttpRequest): """ Get current user's preferences. Returns notification and privacy preferences. """ user = request.auth profile = user.profile return 200, profile @router.patch("/me/preferences", auth=jwt_auth, response={200: UserProfilePreferencesOut, 400: ErrorSchema}) @require_auth def update_my_preferences(request: HttpRequest, data: UserProfilePreferencesUpdate): """ Update current user's preferences. - **email_notifications**: Receive email notifications - **email_on_submission_approved**: Email when submissions approved - **email_on_submission_rejected**: Email when submissions rejected - **profile_public**: Make profile publicly visible - **show_email**: Show email on public profile """ try: user = request.auth # Prepare update data update_data = data.dict(exclude_unset=True) # Update preferences updated_profile = UserManagementService.update_preferences(user, **update_data) return 200, updated_profile except Exception as e: logger.error(f"Preferences update error: {e}") return 400, {"error": "Update failed", "detail": str(e)} # ============================================================================ # Password Management Endpoints # ============================================================================ @router.post("/password/change", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema}) @require_auth def change_password(request: HttpRequest, data: ChangePasswordRequest): """ Change current user's password. - **old_password**: Current password (required) - **new_password**: New password (min 8 characters, required) - **new_password_confirm**: New password confirmation (required) """ try: user = request.auth AuthenticationService.change_password( user=user, old_password=data.old_password, new_password=data.new_password ) return 200, {"message": "Password changed successfully", "success": True} except ValidationError as e: error_msg = str(e.message_dict) if hasattr(e, 'message_dict') else str(e) return 400, {"error": "Password change failed", "detail": error_msg} @router.post("/password/reset", response={200: MessageSchema}) def request_password_reset(request: HttpRequest, data: ResetPasswordRequest): """ Request password reset email. - **email**: User's email address Note: This is a placeholder. In production, this should send a reset email. For now, it returns success regardless of whether the email exists. """ # TODO: Implement email sending with password reset token # For security, always return success even if email doesn't exist return 200, { "message": "If the email exists, a password reset link has been sent", "success": True } # ============================================================================ # MFA/2FA Endpoints # ============================================================================ @router.post("/mfa/enable", auth=jwt_auth, response={200: TOTPEnableResponse, 400: ErrorSchema}) @require_auth def enable_mfa(request: HttpRequest): """ Enable MFA/2FA for current user. Returns TOTP secret and QR code URL for authenticator apps. User must confirm with a valid token to complete setup. """ try: user = request.auth # Create TOTP device device = MFAService.enable_totp(user) # Generate QR code URL issuer = "ThrillWiki" qr_url = device.config_url return 200, { "secret": device.key, "qr_code_url": qr_url, "backup_codes": [] # TODO: Generate backup codes } except Exception as e: logger.error(f"MFA enable error: {e}") return 400, {"error": "MFA setup failed", "detail": str(e)} @router.post("/mfa/confirm", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema}) @require_auth def confirm_mfa(request: HttpRequest, data: TOTPConfirmRequest): """ Confirm MFA setup with verification token. - **token**: 6-digit TOTP token from authenticator app Completes MFA setup after verifying the token is valid. """ try: user = request.auth MFAService.confirm_totp(user, data.token) return 200, {"message": "MFA enabled successfully", "success": True} except ValidationError as e: return 400, {"error": "Confirmation failed", "detail": str(e)} @router.post("/mfa/disable", auth=jwt_auth, response={200: MessageSchema}) @require_auth def disable_mfa(request: HttpRequest): """ Disable MFA/2FA for current user. Removes all TOTP devices and disables MFA requirement. """ user = request.auth MFAService.disable_totp(user) return 200, {"message": "MFA disabled successfully", "success": True} @router.post("/mfa/verify", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema}) @require_auth def verify_mfa_token(request: HttpRequest, data: TOTPVerifyRequest): """ Verify MFA token (for testing). - **token**: 6-digit TOTP token Returns whether the token is valid. """ user = request.auth if MFAService.verify_totp(user, data.token): return 200, {"message": "Token is valid", "success": True} else: return 400, {"error": "Invalid token", "detail": "The token is not valid"} # ============================================================================ # User Management Endpoints (Admin Only) # ============================================================================ @router.get("/users", auth=jwt_auth, response={200: UserListOut, 403: ErrorSchema}) @require_admin def list_users( request: HttpRequest, page: int = 1, page_size: int = 50, search: Optional[str] = None, role: Optional[str] = None, banned: Optional[bool] = None ): """ List all users (admin only). - **page**: Page number (default: 1) - **page_size**: Items per page (default: 50, max: 100) - **search**: Search by email or username - **role**: Filter by role (user, moderator, admin) - **banned**: Filter by banned status """ # Build query queryset = User.objects.select_related('role').all() # Apply filters if search: queryset = queryset.filter( Q(email__icontains=search) | Q(username__icontains=search) | Q(first_name__icontains=search) | Q(last_name__icontains=search) ) if role: queryset = queryset.filter(role__role=role) if banned is not None: queryset = queryset.filter(banned=banned) # Pagination page_size = min(page_size, 100) # Max 100 items per page total = queryset.count() total_pages = (total + page_size - 1) // page_size start = (page - 1) * page_size end = start + page_size users = list(queryset[start:end]) return 200, { "items": users, "total": total, "page": page, "page_size": page_size, "total_pages": total_pages } @router.get("/users/{user_id}", auth=jwt_auth, response={200: UserProfileOut, 404: ErrorSchema}) @require_admin def get_user(request: HttpRequest, user_id: str): """ Get user by ID (admin only). Returns detailed profile information for the specified user. """ try: user = User.objects.get(id=user_id) return 200, user except User.DoesNotExist: return 404, {"error": "User not found", "detail": f"No user with ID {user_id}"} @router.post("/users/ban", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema}) @require_admin def ban_user(request: HttpRequest, data: BanUserRequest): """ Ban a user (admin only). - **user_id**: User ID to ban - **reason**: Reason for ban """ try: user = User.objects.get(id=data.user_id) admin = request.auth UserManagementService.ban_user(user, data.reason, admin) return 200, {"message": f"User {user.email} has been banned", "success": True} except User.DoesNotExist: return 400, {"error": "User not found", "detail": f"No user with ID {data.user_id}"} @router.post("/users/unban", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema}) @require_admin def unban_user(request: HttpRequest, data: UnbanUserRequest): """ Unban a user (admin only). - **user_id**: User ID to unban """ try: user = User.objects.get(id=data.user_id) UserManagementService.unban_user(user) return 200, {"message": f"User {user.email} has been unbanned", "success": True} except User.DoesNotExist: return 400, {"error": "User not found", "detail": f"No user with ID {data.user_id}"} @router.post("/users/assign-role", auth=jwt_auth, response={200: MessageSchema, 400: ErrorSchema}) @require_admin def assign_role(request: HttpRequest, data: AssignRoleRequest): """ Assign role to user (admin only). - **user_id**: User ID - **role**: Role to assign (user, moderator, admin) """ try: user = User.objects.get(id=data.user_id) admin = request.auth RoleService.assign_role(user, data.role, admin) return 200, {"message": f"Role '{data.role}' assigned to {user.email}", "success": True} except User.DoesNotExist: return 400, {"error": "User not found", "detail": f"No user with ID {data.user_id}"} except ValidationError as e: return 400, {"error": "Invalid role", "detail": str(e)}