""" Permission utilities and decorators for API endpoints. Provides: - Permission checking decorators - Role-based access control - Object-level permissions """ from functools import wraps from typing import Optional, Callable from django.http import HttpRequest from ninja import Router from ninja.security import HttpBearer from rest_framework_simplejwt.tokens import AccessToken from rest_framework_simplejwt.exceptions import TokenError from django.core.exceptions import PermissionDenied import logging from .models import User, UserRole logger = logging.getLogger(__name__) class JWTAuth(HttpBearer): """JWT authentication for django-ninja""" def authenticate(self, request: HttpRequest, token: str) -> Optional[User]: """ Authenticate user from JWT token. Args: request: HTTP request token: JWT access token Returns: User instance if valid, None otherwise """ try: # Decode token access_token = AccessToken(token) user_id = access_token['user_id'] # Get user user = User.objects.get(id=user_id) # Check if banned if user.banned: logger.warning(f"Banned user attempted API access: {user.email}") return None return user except TokenError as e: logger.debug(f"Invalid token: {e}") return None except User.DoesNotExist: logger.warning(f"Token for non-existent user: {user_id}") return None except Exception as e: logger.error(f"Authentication error: {e}") return None # Global JWT auth instance jwt_auth = JWTAuth() def require_auth(func: Callable) -> Callable: """ Decorator to require authentication. Usage: @api.get("/protected") @require_auth def protected_endpoint(request): return {"user": request.auth.email} """ @wraps(func) def wrapper(request: HttpRequest, *args, **kwargs): if not request.auth or not isinstance(request.auth, User): raise PermissionDenied("Authentication required") return func(request, *args, **kwargs) return wrapper def require_role(role: str) -> Callable: """ Decorator to require specific role. Args: role: Required role (user, moderator, admin) Usage: @api.post("/moderate") @require_role("moderator") def moderate_endpoint(request): return {"message": "Access granted"} """ def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(request: HttpRequest, *args, **kwargs): if not request.auth or not isinstance(request.auth, User): raise PermissionDenied("Authentication required") user = request.auth try: user_role = user.role # Admin has access to everything if user_role.is_admin: return func(request, *args, **kwargs) # Check specific role if role == 'moderator' and user_role.is_moderator: return func(request, *args, **kwargs) elif role == 'user': return func(request, *args, **kwargs) raise PermissionDenied(f"Role '{role}' required") except UserRole.DoesNotExist: raise PermissionDenied("User role not assigned") return wrapper return decorator def require_moderator(func: Callable) -> Callable: """ Decorator to require moderator or admin role. Usage: @api.post("/approve") @require_moderator def approve_endpoint(request): return {"message": "Access granted"} """ return require_role("moderator")(func) def require_admin(func: Callable) -> Callable: """ Decorator to require admin role. Usage: @api.delete("/delete-user") @require_admin def delete_user_endpoint(request): return {"message": "Access granted"} """ def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(request: HttpRequest, *args, **kwargs): if not request.auth or not isinstance(request.auth, User): raise PermissionDenied("Authentication required") user = request.auth try: user_role = user.role if not user_role.is_admin: raise PermissionDenied("Admin role required") return func(request, *args, **kwargs) except UserRole.DoesNotExist: raise PermissionDenied("User role not assigned") return wrapper return decorator def is_owner_or_moderator(user: User, obj_user_id) -> bool: """ Check if user is the owner of an object or a moderator. Args: user: User to check obj_user_id: User ID of the object owner Returns: True if user is owner or moderator """ if str(user.id) == str(obj_user_id): return True try: return user.role.is_moderator except UserRole.DoesNotExist: return False def can_moderate(user: User) -> bool: """ Check if user can moderate content. Args: user: User to check Returns: True if user is moderator or admin """ if user.banned: return False try: return user.role.is_moderator except UserRole.DoesNotExist: return False def can_submit(user: User) -> bool: """ Check if user can submit content. Args: user: User to check Returns: True if user is not banned """ return not user.banned class PermissionChecker: """Helper class for checking permissions""" def __init__(self, user: User): self.user = user try: self.user_role = user.role except UserRole.DoesNotExist: self.user_role = None @property def is_authenticated(self) -> bool: """Check if user is authenticated""" return self.user is not None @property def is_moderator(self) -> bool: """Check if user is moderator or admin""" if self.user.banned: return False return self.user_role and self.user_role.is_moderator @property def is_admin(self) -> bool: """Check if user is admin""" if self.user.banned: return False return self.user_role and self.user_role.is_admin @property def can_submit(self) -> bool: """Check if user can submit content""" return not self.user.banned @property def can_moderate(self) -> bool: """Check if user can moderate content""" return self.is_moderator def can_edit(self, obj_user_id) -> bool: """Check if user can edit an object""" if self.user.banned: return False return str(self.user.id) == str(obj_user_id) or self.is_moderator def can_delete(self, obj_user_id) -> bool: """Check if user can delete an object""" if self.user.banned: return False return str(self.user.id) == str(obj_user_id) or self.is_admin def require_permission(self, permission: str) -> None: """ Raise PermissionDenied if user doesn't have permission. Args: permission: Permission to check (submit, moderate, admin) Raises: PermissionDenied: If user doesn't have permission """ if permission == 'submit' and not self.can_submit: raise PermissionDenied("You are banned from submitting content") elif permission == 'moderate' and not self.can_moderate: raise PermissionDenied("Moderator role required") elif permission == 'admin' and not self.is_admin: raise PermissionDenied("Admin role required") def get_permission_checker(request: HttpRequest) -> Optional[PermissionChecker]: """ Get permission checker for request user. Args: request: HTTP request Returns: PermissionChecker instance or None if not authenticated """ if not request.auth or not isinstance(request.auth, User): return None return PermissionChecker(request.auth)