mirror of
https://github.com/pacnpal/thrilltrack-explorer.git
synced 2025-12-20 09:11:12 -05:00
- Created a base email template (base.html) for consistent styling across all emails. - Added moderation approval email template (moderation_approved.html) to notify users of approved submissions. - Added moderation rejection email template (moderation_rejected.html) to inform users of required changes for their submissions. - Created password reset email template (password_reset.html) for users requesting to reset their passwords. - Developed a welcome email template (welcome.html) to greet new users and provide account details and tips for using ThrillWiki.
311 lines
8.6 KiB
Python
311 lines
8.6 KiB
Python
"""
|
|
Permission utilities and decorators for API endpoints.
|
|
|
|
Provides:
|
|
- Permission checking decorators
|
|
- Role-based access control
|
|
- Object-level permissions
|
|
"""
|
|
|
|
from functools import wraps
|
|
from typing import Optional, Callable
|
|
from django.http import HttpRequest
|
|
from ninja import Router
|
|
from ninja.security import HttpBearer
|
|
from rest_framework_simplejwt.tokens import AccessToken
|
|
from rest_framework_simplejwt.exceptions import TokenError
|
|
from django.core.exceptions import PermissionDenied
|
|
import logging
|
|
|
|
from .models import User, UserRole
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class JWTAuth(HttpBearer):
|
|
"""JWT authentication for django-ninja"""
|
|
|
|
def authenticate(self, request: HttpRequest, token: str) -> Optional[User]:
|
|
"""
|
|
Authenticate user from JWT token.
|
|
|
|
Args:
|
|
request: HTTP request
|
|
token: JWT access token
|
|
|
|
Returns:
|
|
User instance if valid, None otherwise
|
|
"""
|
|
try:
|
|
# Decode token
|
|
access_token = AccessToken(token)
|
|
user_id = access_token['user_id']
|
|
|
|
# Get user
|
|
user = User.objects.get(id=user_id)
|
|
|
|
# Check if banned
|
|
if user.banned:
|
|
logger.warning(f"Banned user attempted API access: {user.email}")
|
|
return None
|
|
|
|
return user
|
|
|
|
except TokenError as e:
|
|
logger.debug(f"Invalid token: {e}")
|
|
return None
|
|
except User.DoesNotExist:
|
|
logger.warning(f"Token for non-existent user: {user_id}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Authentication error: {e}")
|
|
return None
|
|
|
|
|
|
# Global JWT auth instance
|
|
jwt_auth = JWTAuth()
|
|
|
|
|
|
def require_auth(func: Callable) -> Callable:
|
|
"""
|
|
Decorator to require authentication.
|
|
|
|
Usage:
|
|
@api.get("/protected")
|
|
@require_auth
|
|
def protected_endpoint(request):
|
|
return {"user": request.auth.email}
|
|
"""
|
|
@wraps(func)
|
|
def wrapper(request: HttpRequest, *args, **kwargs):
|
|
if not request.auth or not isinstance(request.auth, User):
|
|
raise PermissionDenied("Authentication required")
|
|
return func(request, *args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
def require_role(role: str) -> Callable:
|
|
"""
|
|
Decorator to require specific role.
|
|
|
|
Args:
|
|
role: Required role (user, moderator, admin)
|
|
|
|
Usage:
|
|
@api.post("/moderate")
|
|
@require_role("moderator")
|
|
def moderate_endpoint(request):
|
|
return {"message": "Access granted"}
|
|
"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(request: HttpRequest, *args, **kwargs):
|
|
if not request.auth or not isinstance(request.auth, User):
|
|
raise PermissionDenied("Authentication required")
|
|
|
|
user = request.auth
|
|
|
|
try:
|
|
user_role = user.role
|
|
|
|
# Admin has access to everything
|
|
if user_role.is_admin:
|
|
return func(request, *args, **kwargs)
|
|
|
|
# Check specific role
|
|
if role == 'moderator' and user_role.is_moderator:
|
|
return func(request, *args, **kwargs)
|
|
elif role == 'user':
|
|
return func(request, *args, **kwargs)
|
|
|
|
raise PermissionDenied(f"Role '{role}' required")
|
|
|
|
except UserRole.DoesNotExist:
|
|
raise PermissionDenied("User role not assigned")
|
|
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_moderator(func: Callable) -> Callable:
|
|
"""
|
|
Decorator to require moderator or admin role.
|
|
|
|
Usage:
|
|
@api.post("/approve")
|
|
@require_moderator
|
|
def approve_endpoint(request):
|
|
return {"message": "Access granted"}
|
|
"""
|
|
return require_role("moderator")(func)
|
|
|
|
|
|
def require_admin(func: Callable) -> Callable:
|
|
"""
|
|
Decorator to require admin role.
|
|
|
|
Usage:
|
|
@api.delete("/delete-user")
|
|
@require_admin
|
|
def delete_user_endpoint(request):
|
|
return {"message": "Access granted"}
|
|
"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(request: HttpRequest, *args, **kwargs):
|
|
if not request.auth or not isinstance(request.auth, User):
|
|
raise PermissionDenied("Authentication required")
|
|
|
|
user = request.auth
|
|
|
|
try:
|
|
user_role = user.role
|
|
|
|
if not user_role.is_admin:
|
|
raise PermissionDenied("Admin role required")
|
|
|
|
return func(request, *args, **kwargs)
|
|
|
|
except UserRole.DoesNotExist:
|
|
raise PermissionDenied("User role not assigned")
|
|
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def is_owner_or_moderator(user: User, obj_user_id) -> bool:
|
|
"""
|
|
Check if user is the owner of an object or a moderator.
|
|
|
|
Args:
|
|
user: User to check
|
|
obj_user_id: User ID of the object owner
|
|
|
|
Returns:
|
|
True if user is owner or moderator
|
|
"""
|
|
if str(user.id) == str(obj_user_id):
|
|
return True
|
|
|
|
try:
|
|
return user.role.is_moderator
|
|
except UserRole.DoesNotExist:
|
|
return False
|
|
|
|
|
|
def can_moderate(user: User) -> bool:
|
|
"""
|
|
Check if user can moderate content.
|
|
|
|
Args:
|
|
user: User to check
|
|
|
|
Returns:
|
|
True if user is moderator or admin
|
|
"""
|
|
if user.banned:
|
|
return False
|
|
|
|
try:
|
|
return user.role.is_moderator
|
|
except UserRole.DoesNotExist:
|
|
return False
|
|
|
|
|
|
def can_submit(user: User) -> bool:
|
|
"""
|
|
Check if user can submit content.
|
|
|
|
Args:
|
|
user: User to check
|
|
|
|
Returns:
|
|
True if user is not banned
|
|
"""
|
|
return not user.banned
|
|
|
|
|
|
class PermissionChecker:
|
|
"""Helper class for checking permissions"""
|
|
|
|
def __init__(self, user: User):
|
|
self.user = user
|
|
try:
|
|
self.user_role = user.role
|
|
except UserRole.DoesNotExist:
|
|
self.user_role = None
|
|
|
|
@property
|
|
def is_authenticated(self) -> bool:
|
|
"""Check if user is authenticated"""
|
|
return self.user is not None
|
|
|
|
@property
|
|
def is_moderator(self) -> bool:
|
|
"""Check if user is moderator or admin"""
|
|
if self.user.banned:
|
|
return False
|
|
return self.user_role and self.user_role.is_moderator
|
|
|
|
@property
|
|
def is_admin(self) -> bool:
|
|
"""Check if user is admin"""
|
|
if self.user.banned:
|
|
return False
|
|
return self.user_role and self.user_role.is_admin
|
|
|
|
@property
|
|
def can_submit(self) -> bool:
|
|
"""Check if user can submit content"""
|
|
return not self.user.banned
|
|
|
|
@property
|
|
def can_moderate(self) -> bool:
|
|
"""Check if user can moderate content"""
|
|
return self.is_moderator
|
|
|
|
def can_edit(self, obj_user_id) -> bool:
|
|
"""Check if user can edit an object"""
|
|
if self.user.banned:
|
|
return False
|
|
return str(self.user.id) == str(obj_user_id) or self.is_moderator
|
|
|
|
def can_delete(self, obj_user_id) -> bool:
|
|
"""Check if user can delete an object"""
|
|
if self.user.banned:
|
|
return False
|
|
return str(self.user.id) == str(obj_user_id) or self.is_admin
|
|
|
|
def require_permission(self, permission: str) -> None:
|
|
"""
|
|
Raise PermissionDenied if user doesn't have permission.
|
|
|
|
Args:
|
|
permission: Permission to check (submit, moderate, admin)
|
|
|
|
Raises:
|
|
PermissionDenied: If user doesn't have permission
|
|
"""
|
|
if permission == 'submit' and not self.can_submit:
|
|
raise PermissionDenied("You are banned from submitting content")
|
|
elif permission == 'moderate' and not self.can_moderate:
|
|
raise PermissionDenied("Moderator role required")
|
|
elif permission == 'admin' and not self.is_admin:
|
|
raise PermissionDenied("Admin role required")
|
|
|
|
|
|
def get_permission_checker(request: HttpRequest) -> Optional[PermissionChecker]:
|
|
"""
|
|
Get permission checker for request user.
|
|
|
|
Args:
|
|
request: HTTP request
|
|
|
|
Returns:
|
|
PermissionChecker instance or None if not authenticated
|
|
"""
|
|
if not request.auth or not isinstance(request.auth, User):
|
|
return None
|
|
|
|
return PermissionChecker(request.auth)
|