mirror of
https://github.com/pacnpal/thrilltrack-explorer.git
synced 2025-12-22 17:31:15 -05:00
Refactor code structure and remove redundant changes
This commit is contained in:
310
django-backend/apps/users/permissions.py
Normal file
310
django-backend/apps/users/permissions.py
Normal file
@@ -0,0 +1,310 @@
|
||||
"""
|
||||
Permission utilities and decorators for API endpoints.
|
||||
|
||||
Provides:
|
||||
- Permission checking decorators
|
||||
- Role-based access control
|
||||
- Object-level permissions
|
||||
"""
|
||||
|
||||
from functools import wraps
|
||||
from typing import Optional, Callable
|
||||
from django.http import HttpRequest
|
||||
from ninja import Router
|
||||
from ninja.security import HttpBearer
|
||||
from rest_framework_simplejwt.tokens import AccessToken
|
||||
from rest_framework_simplejwt.exceptions import TokenError
|
||||
from django.core.exceptions import PermissionDenied
|
||||
import logging
|
||||
|
||||
from .models import User, UserRole
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JWTAuth(HttpBearer):
|
||||
"""JWT authentication for django-ninja"""
|
||||
|
||||
def authenticate(self, request: HttpRequest, token: str) -> Optional[User]:
|
||||
"""
|
||||
Authenticate user from JWT token.
|
||||
|
||||
Args:
|
||||
request: HTTP request
|
||||
token: JWT access token
|
||||
|
||||
Returns:
|
||||
User instance if valid, None otherwise
|
||||
"""
|
||||
try:
|
||||
# Decode token
|
||||
access_token = AccessToken(token)
|
||||
user_id = access_token['user_id']
|
||||
|
||||
# Get user
|
||||
user = User.objects.get(id=user_id)
|
||||
|
||||
# Check if banned
|
||||
if user.banned:
|
||||
logger.warning(f"Banned user attempted API access: {user.email}")
|
||||
return None
|
||||
|
||||
return user
|
||||
|
||||
except TokenError as e:
|
||||
logger.debug(f"Invalid token: {e}")
|
||||
return None
|
||||
except User.DoesNotExist:
|
||||
logger.warning(f"Token for non-existent user: {user_id}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Authentication error: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# Global JWT auth instance
|
||||
jwt_auth = JWTAuth()
|
||||
|
||||
|
||||
def require_auth(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to require authentication.
|
||||
|
||||
Usage:
|
||||
@api.get("/protected")
|
||||
@require_auth
|
||||
def protected_endpoint(request):
|
||||
return {"user": request.auth.email}
|
||||
"""
|
||||
@wraps(func)
|
||||
def wrapper(request: HttpRequest, *args, **kwargs):
|
||||
if not request.auth or not isinstance(request.auth, User):
|
||||
raise PermissionDenied("Authentication required")
|
||||
return func(request, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def require_role(role: str) -> Callable:
|
||||
"""
|
||||
Decorator to require specific role.
|
||||
|
||||
Args:
|
||||
role: Required role (user, moderator, admin)
|
||||
|
||||
Usage:
|
||||
@api.post("/moderate")
|
||||
@require_role("moderator")
|
||||
def moderate_endpoint(request):
|
||||
return {"message": "Access granted"}
|
||||
"""
|
||||
def decorator(func: Callable) -> Callable:
|
||||
@wraps(func)
|
||||
def wrapper(request: HttpRequest, *args, **kwargs):
|
||||
if not request.auth or not isinstance(request.auth, User):
|
||||
raise PermissionDenied("Authentication required")
|
||||
|
||||
user = request.auth
|
||||
|
||||
try:
|
||||
user_role = user.role
|
||||
|
||||
# Admin has access to everything
|
||||
if user_role.is_admin:
|
||||
return func(request, *args, **kwargs)
|
||||
|
||||
# Check specific role
|
||||
if role == 'moderator' and user_role.is_moderator:
|
||||
return func(request, *args, **kwargs)
|
||||
elif role == 'user':
|
||||
return func(request, *args, **kwargs)
|
||||
|
||||
raise PermissionDenied(f"Role '{role}' required")
|
||||
|
||||
except UserRole.DoesNotExist:
|
||||
raise PermissionDenied("User role not assigned")
|
||||
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
def require_moderator(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to require moderator or admin role.
|
||||
|
||||
Usage:
|
||||
@api.post("/approve")
|
||||
@require_moderator
|
||||
def approve_endpoint(request):
|
||||
return {"message": "Access granted"}
|
||||
"""
|
||||
return require_role("moderator")(func)
|
||||
|
||||
|
||||
def require_admin(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to require admin role.
|
||||
|
||||
Usage:
|
||||
@api.delete("/delete-user")
|
||||
@require_admin
|
||||
def delete_user_endpoint(request):
|
||||
return {"message": "Access granted"}
|
||||
"""
|
||||
def decorator(func: Callable) -> Callable:
|
||||
@wraps(func)
|
||||
def wrapper(request: HttpRequest, *args, **kwargs):
|
||||
if not request.auth or not isinstance(request.auth, User):
|
||||
raise PermissionDenied("Authentication required")
|
||||
|
||||
user = request.auth
|
||||
|
||||
try:
|
||||
user_role = user.role
|
||||
|
||||
if not user_role.is_admin:
|
||||
raise PermissionDenied("Admin role required")
|
||||
|
||||
return func(request, *args, **kwargs)
|
||||
|
||||
except UserRole.DoesNotExist:
|
||||
raise PermissionDenied("User role not assigned")
|
||||
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
def is_owner_or_moderator(user: User, obj_user_id) -> bool:
|
||||
"""
|
||||
Check if user is the owner of an object or a moderator.
|
||||
|
||||
Args:
|
||||
user: User to check
|
||||
obj_user_id: User ID of the object owner
|
||||
|
||||
Returns:
|
||||
True if user is owner or moderator
|
||||
"""
|
||||
if str(user.id) == str(obj_user_id):
|
||||
return True
|
||||
|
||||
try:
|
||||
return user.role.is_moderator
|
||||
except UserRole.DoesNotExist:
|
||||
return False
|
||||
|
||||
|
||||
def can_moderate(user: User) -> bool:
|
||||
"""
|
||||
Check if user can moderate content.
|
||||
|
||||
Args:
|
||||
user: User to check
|
||||
|
||||
Returns:
|
||||
True if user is moderator or admin
|
||||
"""
|
||||
if user.banned:
|
||||
return False
|
||||
|
||||
try:
|
||||
return user.role.is_moderator
|
||||
except UserRole.DoesNotExist:
|
||||
return False
|
||||
|
||||
|
||||
def can_submit(user: User) -> bool:
|
||||
"""
|
||||
Check if user can submit content.
|
||||
|
||||
Args:
|
||||
user: User to check
|
||||
|
||||
Returns:
|
||||
True if user is not banned
|
||||
"""
|
||||
return not user.banned
|
||||
|
||||
|
||||
class PermissionChecker:
|
||||
"""Helper class for checking permissions"""
|
||||
|
||||
def __init__(self, user: User):
|
||||
self.user = user
|
||||
try:
|
||||
self.user_role = user.role
|
||||
except UserRole.DoesNotExist:
|
||||
self.user_role = None
|
||||
|
||||
@property
|
||||
def is_authenticated(self) -> bool:
|
||||
"""Check if user is authenticated"""
|
||||
return self.user is not None
|
||||
|
||||
@property
|
||||
def is_moderator(self) -> bool:
|
||||
"""Check if user is moderator or admin"""
|
||||
if self.user.banned:
|
||||
return False
|
||||
return self.user_role and self.user_role.is_moderator
|
||||
|
||||
@property
|
||||
def is_admin(self) -> bool:
|
||||
"""Check if user is admin"""
|
||||
if self.user.banned:
|
||||
return False
|
||||
return self.user_role and self.user_role.is_admin
|
||||
|
||||
@property
|
||||
def can_submit(self) -> bool:
|
||||
"""Check if user can submit content"""
|
||||
return not self.user.banned
|
||||
|
||||
@property
|
||||
def can_moderate(self) -> bool:
|
||||
"""Check if user can moderate content"""
|
||||
return self.is_moderator
|
||||
|
||||
def can_edit(self, obj_user_id) -> bool:
|
||||
"""Check if user can edit an object"""
|
||||
if self.user.banned:
|
||||
return False
|
||||
return str(self.user.id) == str(obj_user_id) or self.is_moderator
|
||||
|
||||
def can_delete(self, obj_user_id) -> bool:
|
||||
"""Check if user can delete an object"""
|
||||
if self.user.banned:
|
||||
return False
|
||||
return str(self.user.id) == str(obj_user_id) or self.is_admin
|
||||
|
||||
def require_permission(self, permission: str) -> None:
|
||||
"""
|
||||
Raise PermissionDenied if user doesn't have permission.
|
||||
|
||||
Args:
|
||||
permission: Permission to check (submit, moderate, admin)
|
||||
|
||||
Raises:
|
||||
PermissionDenied: If user doesn't have permission
|
||||
"""
|
||||
if permission == 'submit' and not self.can_submit:
|
||||
raise PermissionDenied("You are banned from submitting content")
|
||||
elif permission == 'moderate' and not self.can_moderate:
|
||||
raise PermissionDenied("Moderator role required")
|
||||
elif permission == 'admin' and not self.is_admin:
|
||||
raise PermissionDenied("Admin role required")
|
||||
|
||||
|
||||
def get_permission_checker(request: HttpRequest) -> Optional[PermissionChecker]:
|
||||
"""
|
||||
Get permission checker for request user.
|
||||
|
||||
Args:
|
||||
request: HTTP request
|
||||
|
||||
Returns:
|
||||
PermissionChecker instance or None if not authenticated
|
||||
"""
|
||||
if not request.auth or not isinstance(request.auth, User):
|
||||
return None
|
||||
|
||||
return PermissionChecker(request.auth)
|
||||
Reference in New Issue
Block a user