Files
pacnpal c2c26cfd1d Add comprehensive API documentation for ThrillWiki integration and features
- Introduced Next.js integration guide for ThrillWiki API, detailing authentication, core domain APIs, data structures, and implementation patterns.
- Documented the migration to Rich Choice Objects, highlighting changes for frontend developers and enhanced metadata availability.
- Fixed the missing `get_by_slug` method in the Ride model, ensuring proper functionality of ride detail endpoints.
- Created a test script to verify manufacturer syncing with ride models, ensuring data integrity across related models.
2025-09-16 11:29:17 -04:00

738 lines
26 KiB
Python

"""
Moderation API Views
This module contains DRF viewsets for the moderation system, including:
- ModerationReport views for content reporting
- ModerationQueue views for moderation workflow
- ModerationAction views for tracking moderation actions
- BulkOperation views for administrative bulk operations
All views include comprehensive permissions, filtering, and pagination.
"""
from rest_framework import viewsets, status, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.filters import SearchFilter, OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
from django.contrib.auth import get_user_model
from django.utils import timezone
from django.db.models import Q, Count
from datetime import timedelta
from .models import (
ModerationReport,
ModerationQueue,
ModerationAction,
BulkOperation,
)
from .serializers import (
ModerationReportSerializer,
CreateModerationReportSerializer,
UpdateModerationReportSerializer,
ModerationQueueSerializer,
AssignQueueItemSerializer,
CompleteQueueItemSerializer,
ModerationActionSerializer,
CreateModerationActionSerializer,
BulkOperationSerializer,
CreateBulkOperationSerializer,
UserModerationProfileSerializer,
)
from .filters import (
ModerationReportFilter,
ModerationQueueFilter,
ModerationActionFilter,
BulkOperationFilter,
)
from .permissions import (
IsModeratorOrAdmin,
IsAdminOrSuperuser,
CanViewModerationData,
)
# Resolve the project's active user model once at import time; every viewset
# below looks users up through this name.
User = get_user_model()
# ============================================================================
# Moderation Report ViewSet
# ============================================================================
class ModerationReportViewSet(viewsets.ModelViewSet):
    """
    ViewSet for managing moderation reports.

    Provides CRUD operations for moderation reports with comprehensive
    filtering, search, and permission controls, plus three extra actions:

    - ``assign``  (POST, detail): hand a report to a moderator.
    - ``resolve`` (POST, detail): close a report with a resolution.
    - ``stats``   (GET, list): aggregate counts and timings.
    """

    queryset = ModerationReport.objects.select_related(
        "reported_by", "assigned_moderator", "content_type"
    ).all()
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_class = ModerationReportFilter
    search_fields = ["reason", "description", "resolution_notes"]
    ordering_fields = ["created_at", "updated_at", "priority", "status"]
    ordering = ["-created_at"]

    # Roles a user must hold before a report can be assigned to them.
    _ASSIGNABLE_ROLES = ("MODERATOR", "ADMIN", "SUPERUSER")
    # Hours an open report may wait, per priority, before it counts as overdue.
    _SLA_HOURS = {"URGENT": 2, "HIGH": 8, "MEDIUM": 24, "LOW": 72}

    def get_serializer_class(self):
        """Return appropriate serializer based on action."""
        if self.action == "create":
            return CreateModerationReportSerializer
        elif self.action in ["update", "partial_update"]:
            return UpdateModerationReportSerializer
        return ModerationReportSerializer

    def get_permissions(self):
        """
        Return appropriate permissions based on action.

        NOTE(review): this override never consults ``self.permission_classes``,
        so the ``permission_classes`` passed to ``@action`` below are not what
        is enforced; every extra action falls through to the final branch.
        ``stats`` therefore requires ``IsModeratorOrAdmin`` even though its
        decorator names ``CanViewModerationData`` -- confirm which is intended.
        """
        if self.action == "create":
            # Any authenticated user can create reports
            permission_classes = [permissions.IsAuthenticated]
        elif self.action in ["list", "retrieve"]:
            # Moderators and above can view reports
            permission_classes = [CanViewModerationData]
        else:
            # Only moderators and above can modify reports
            permission_classes = [IsModeratorOrAdmin]
        return [permission() for permission in permission_classes]

    def get_queryset(self):
        """
        Filter queryset based on user permissions.

        Unauthenticated requests get an empty queryset; users whose ``role``
        is ``"USER"`` (the default when the attribute is absent) only see
        reports they filed themselves; every other role sees all reports.
        """
        queryset = super().get_queryset()
        if not self.request.user.is_authenticated:
            return queryset.none()
        user_role = getattr(self.request.user, "role", "USER")
        if user_role == "USER":
            # Regular users can only see their own reports
            queryset = queryset.filter(reported_by=self.request.user)
        return queryset

    @action(detail=True, methods=["post"], permission_classes=[IsModeratorOrAdmin])
    def assign(self, request, pk=None):
        """
        Assign a report to a moderator.

        Expects ``moderator_id`` in the request body. Responds with:

        - 200 and the serialized report on success (status becomes
          ``UNDER_REVIEW``);
        - 400 when ``moderator_id`` is missing or the user's role is not
          moderator/admin/superuser;
        - 404 when no user matches ``moderator_id``.
        """
        report = self.get_object()
        moderator_id = request.data.get("moderator_id")
        if moderator_id is None or moderator_id == "":
            return Response(
                {"error": "moderator_id is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Keep the try body to the lookup alone. ValueError/TypeError cover a
        # malformed id (e.g. "abc" or a list for an integer key), which would
        # otherwise escape as an unhandled server error.
        try:
            moderator = User.objects.get(id=moderator_id)
        except (User.DoesNotExist, ValueError, TypeError):
            return Response(
                {"error": "Moderator not found"}, status=status.HTTP_404_NOT_FOUND
            )

        if getattr(moderator, "role", "USER") not in self._ASSIGNABLE_ROLES:
            return Response(
                {"error": "User must be a moderator, admin, or superuser"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        report.assigned_moderator = moderator
        report.status = "UNDER_REVIEW"
        report.save()
        serializer = self.get_serializer(report)
        return Response(serializer.data)

    @action(detail=True, methods=["post"], permission_classes=[IsModeratorOrAdmin])
    def resolve(self, request, pk=None):
        """
        Resolve a moderation report.

        Requires ``resolution_action`` in the request body (400 otherwise);
        ``resolution_notes`` is optional and defaults to an empty string.
        Marks the report ``RESOLVED`` and stamps ``resolved_at`` with now.
        """
        report = self.get_object()
        resolution_action = request.data.get("resolution_action")
        resolution_notes = request.data.get("resolution_notes", "")
        if not resolution_action:
            return Response(
                {"error": "resolution_action is required"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        report.status = "RESOLVED"
        report.resolution_action = resolution_action
        report.resolution_notes = resolution_notes
        report.resolved_at = timezone.now()
        report.save()
        serializer = self.get_serializer(report)
        return Response(serializer.data)

    @action(detail=False, methods=["get"], permission_classes=[CanViewModerationData])
    def stats(self, request):
        """
        Get moderation report statistics.

        All figures are computed over ``get_queryset()``, i.e. only over the
        reports the requesting user is allowed to see.

        Raises:
            ValueError: if an open report carries a priority that has no
                entry in the SLA table.
        """
        queryset = self.get_queryset()

        # Basic counts
        total_reports = queryset.count()
        pending_reports = queryset.filter(status="PENDING").count()
        resolved_reports = queryset.filter(status="RESOLVED").count()

        # Overdue reports (based on priority SLA). Only the two columns that
        # are actually needed are fetched, rather than full model instances.
        now = timezone.now()
        overdue_reports = 0
        open_reports = queryset.filter(
            status__in=["PENDING", "UNDER_REVIEW"]
        ).values_list("priority", "created_at")
        for priority, created_at in open_reports:
            if priority not in self._SLA_HOURS:
                raise ValueError(f"Unknown priority level: {priority}")
            hours_since_created = (now - created_at).total_seconds() / 3600
            if hours_since_created > self._SLA_HOURS[priority]:
                overdue_reports += 1

        # Reports by priority and type. Ordering is cleared first so that no
        # ordering column can end up in the GROUP BY and split the groups.
        unordered = queryset.order_by()
        reports_by_priority = dict(
            unordered.values_list("priority").annotate(count=Count("id"))
        )
        reports_by_type = dict(
            unordered.values_list("report_type").annotate(count=Count("id"))
        )

        # Average resolution time, in hours, over resolved reports that have a
        # resolution timestamp. One query feeds both the sum and the count.
        resolution_hours = [
            (resolved_at - created_at).total_seconds() / 3600
            for created_at, resolved_at in queryset.filter(
                status="RESOLVED", resolved_at__isnull=False
            ).values_list("created_at", "resolved_at")
        ]
        avg_resolution_time = (
            sum(resolution_hours) / len(resolution_hours) if resolution_hours else 0
        )

        stats_data = {
            "total_reports": total_reports,
            "pending_reports": pending_reports,
            "resolved_reports": resolved_reports,
            "overdue_reports": overdue_reports,
            "reports_by_priority": reports_by_priority,
            "reports_by_type": reports_by_type,
            "average_resolution_time_hours": round(avg_resolution_time, 2),
        }
        return Response(stats_data)
# ============================================================================
# Moderation Queue ViewSet
# ============================================================================
class ModerationQueueViewSet(viewsets.ModelViewSet):
    """
    ViewSet for managing moderation queue items.

    Provides workflow management for moderation tasks with assignment,
    completion, and progress tracking. Extra actions:

    - ``assign``   (POST, detail): give the item to a moderator.
    - ``unassign`` (POST, detail): return the item to ``PENDING``.
    - ``complete`` (POST, detail): mark the item ``COMPLETED``.
    - ``my_queue`` (GET, list): items assigned to the requesting user.
    """

    queryset = ModerationQueue.objects.select_related(
        "assigned_to", "related_report", "content_type"
    ).all()
    serializer_class = ModerationQueueSerializer
    permission_classes = [CanViewModerationData]
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_class = ModerationQueueFilter
    search_fields = ["title", "description"]
    ordering_fields = ["created_at", "updated_at", "priority", "status"]
    ordering = ["-created_at"]

    @action(detail=True, methods=["post"], permission_classes=[IsModeratorOrAdmin])
    def assign(self, request, pk=None):
        """
        Assign a queue item to a moderator.

        The body is validated with ``AssignQueueItemSerializer``. Responds
        with 400 and the serializer errors when validation fails, 404 when no
        user matches ``moderator_id``, and otherwise 200 with the updated
        item (status ``IN_PROGRESS``, ``assigned_at`` set to now).
        """
        queue_item = self.get_object()
        serializer = AssignQueueItemSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        # An unknown or malformed id is answered with 404, the same way
        # report assignment does, instead of escaping as a server error.
        try:
            moderator = User.objects.get(id=serializer.validated_data["moderator_id"])
        except (User.DoesNotExist, ValueError, TypeError):
            return Response(
                {"error": "Moderator not found"}, status=status.HTTP_404_NOT_FOUND
            )

        queue_item.assigned_to = moderator
        queue_item.assigned_at = timezone.now()
        queue_item.status = "IN_PROGRESS"
        queue_item.save()
        response_serializer = self.get_serializer(queue_item)
        return Response(response_serializer.data)

    @action(detail=True, methods=["post"], permission_classes=[IsModeratorOrAdmin])
    def unassign(self, request, pk=None):
        """Unassign a queue item: clear assignee and timestamp, reset to PENDING."""
        queue_item = self.get_object()
        queue_item.assigned_to = None
        queue_item.assigned_at = None
        queue_item.status = "PENDING"
        queue_item.save()
        serializer = self.get_serializer(queue_item)
        return Response(serializer.data)

    @action(detail=True, methods=["post"], permission_classes=[IsModeratorOrAdmin])
    def complete(self, request, pk=None):
        """
        Complete a queue item.

        The body is validated with ``CompleteQueueItemSerializer`` (400 with
        its errors on failure). The item is saved as ``COMPLETED``; when the
        chosen action is anything other than ``NO_ACTION`` and the item has a
        related report, a ``ModerationAction`` is recorded as well.
        """
        queue_item = self.get_object()
        serializer = CompleteQueueItemSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        action_taken = serializer.validated_data["action"]
        notes = serializer.validated_data.get("notes", "")
        queue_item.status = "COMPLETED"
        queue_item.save()

        # Create moderation action if needed
        related_report = queue_item.related_report
        if action_taken != "NO_ACTION" and related_report:
            # NOTE(review): target_user is the user who FILED the report
            # (reported_by), not the party that was reported -- confirm this
            # is the intended target of the action.
            ModerationAction.objects.create(
                action_type=action_taken,
                reason=f"Queue item completion: {action_taken}",
                details=notes,
                moderator=request.user,
                target_user=related_report.reported_by,
                related_report=related_report,
                is_active=True,
            )

        response_serializer = self.get_serializer(queue_item)
        return Response(response_serializer.data)

    @action(detail=False, methods=["get"], permission_classes=[CanViewModerationData])
    def my_queue(self, request):
        """Get queue items assigned to the current user (paginated when enabled)."""
        queryset = self.get_queryset().filter(assigned_to=request.user)
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)
# ============================================================================
# Moderation Action ViewSet
# ============================================================================
class ModerationActionViewSet(viewsets.ModelViewSet):
    """
    ViewSet for managing moderation actions.

    Tracks actions taken against users and content with expiration
    and status management. Extra actions:

    - ``deactivate`` (POST, detail): switch an action off.
    - ``active``     (GET, list): actions currently in force.
    - ``expired``    (GET, list): actions past their expiry that are still
      flagged active.
    """

    queryset = ModerationAction.objects.select_related(
        "moderator", "target_user", "related_report"
    ).all()
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_class = ModerationActionFilter
    search_fields = ["reason", "details"]
    ordering_fields = ["created_at", "expires_at", "action_type"]
    ordering = ["-created_at"]

    # Every action that creates, changes or removes a moderation action.
    _WRITE_ACTIONS = frozenset(
        {"create", "update", "partial_update", "destroy", "deactivate"}
    )

    def get_serializer_class(self):
        """Return appropriate serializer based on action."""
        if self.action == "create":
            return CreateModerationActionSerializer
        return ModerationActionSerializer

    def get_permissions(self):
        """
        Return appropriate permissions based on action.

        This override is the single place permissions are decided: because it
        does not read ``self.permission_classes``, the ``permission_classes``
        given to ``@action`` are informational only. All write actions
        (including ``deactivate``, update and delete) therefore have to be
        listed in ``_WRITE_ACTIONS`` to require ``IsModeratorOrAdmin``;
        everything else is read-only and requires ``CanViewModerationData``.
        """
        if self.action in self._WRITE_ACTIONS:
            permission_classes = [IsModeratorOrAdmin]
        else:
            permission_classes = [CanViewModerationData]
        return [permission() for permission in permission_classes]

    def _list_response(self, queryset):
        """Serialize ``queryset`` as a list response, paginated when enabled."""
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=["post"], permission_classes=[IsModeratorOrAdmin])
    def deactivate(self, request, pk=None):
        """Deactivate a moderation action by clearing its ``is_active`` flag."""
        action_obj = self.get_object()
        action_obj.is_active = False
        action_obj.save()
        serializer = self.get_serializer(action_obj)
        return Response(serializer.data)

    @action(detail=False, methods=["get"], permission_classes=[CanViewModerationData])
    def active(self, request):
        """
        Get all active moderation actions.

        An action is in force when it is flagged active and either has no
        expiry at all or expires in the future. Actions created without a
        duration are stored with ``expires_at=None``; comparing only with
        ``expires_at > now`` would silently drop those.
        """
        not_expired = Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now())
        queryset = self.get_queryset().filter(not_expired, is_active=True)
        return self._list_response(queryset)

    @action(detail=False, methods=["get"], permission_classes=[CanViewModerationData])
    def expired(self, request):
        """Get all expired moderation actions that are still flagged active."""
        queryset = self.get_queryset().filter(
            expires_at__lte=timezone.now(), is_active=True
        )
        return self._list_response(queryset)
# ============================================================================
# Bulk Operation ViewSet
# ============================================================================
class BulkOperationViewSet(viewsets.ModelViewSet):
    """
    ViewSet for managing bulk operations.

    Administrative CRUD over bulk operations, restricted to admins and
    superusers, with extra actions to cancel, retry, inspect logs and list
    the operations that are currently running.
    """

    queryset = BulkOperation.objects.select_related("created_by").all()
    permission_classes = [IsAdminOrSuperuser]
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_class = BulkOperationFilter
    search_fields = ["description"]
    ordering_fields = ["created_at", "started_at", "completed_at", "priority"]
    ordering = ["-created_at"]

    def get_serializer_class(self):
        """Pick the write serializer for ``create``, the read serializer otherwise."""
        creating = self.action == "create"
        return CreateBulkOperationSerializer if creating else BulkOperationSerializer

    @action(detail=True, methods=["post"])
    def cancel(self, request, pk=None):
        """
        Cancel a bulk operation.

        Only operations in ``PENDING`` or ``RUNNING`` state whose
        ``can_cancel`` flag is truthy may be cancelled; anything else is
        answered with 400. On success the operation is saved as ``CANCELLED``
        with ``completed_at`` set to now.
        """
        op = self.get_object()

        rejection = None
        if op.status not in ["PENDING", "RUNNING"]:
            rejection = "Operation cannot be cancelled"
        elif not op.can_cancel:
            rejection = "Operation is not cancellable"
        if rejection is not None:
            return Response({"error": rejection}, status=status.HTTP_400_BAD_REQUEST)

        op.status = "CANCELLED"
        op.completed_at = timezone.now()
        op.save()
        return Response(self.get_serializer(op).data)

    @action(detail=True, methods=["post"])
    def retry(self, request, pk=None):
        """
        Retry a failed bulk operation.

        Answers 400 unless the operation is ``FAILED``. Otherwise the
        operation is put back to ``PENDING`` with its timestamps, counters
        and results cleared, then saved.
        """
        op = self.get_object()
        if op.status != "FAILED":
            return Response(
                {"error": "Only failed operations can be retried"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Return the operation to its pristine, not-yet-started state.
        pristine = (
            ("status", "PENDING"),
            ("started_at", None),
            ("completed_at", None),
            ("processed_items", 0),
            ("failed_items", 0),
            ("results", {}),
        )
        for field_name, value in pristine:
            setattr(op, field_name, value)
        op.save()
        return Response(self.get_serializer(op).data)

    @action(detail=True, methods=["get"])
    def logs(self, request, pk=None):
        """
        Get logs for a bulk operation.

        Placeholder: no logging backend is consulted; the response always
        contains a single synthetic "created" entry built from the operation.
        """
        op = self.get_object()
        creation_entry = {
            "timestamp": op.created_at.isoformat(),
            "level": "INFO",
            "message": f"Operation {op.id} created",
            "details": op.parameters,
        }
        return Response({"logs": [creation_entry], "count": 1})

    @action(detail=False, methods=["get"])
    def running(self, request):
        """Get all bulk operations whose status is ``RUNNING`` (paginated when enabled)."""
        in_flight = self.get_queryset().filter(status="RUNNING")
        page = self.paginate_queryset(in_flight)
        if page is None:
            return Response(self.get_serializer(in_flight, many=True).data)
        return self.get_paginated_response(self.get_serializer(page, many=True).data)
# ============================================================================
# User Moderation ViewSet
# ============================================================================
class UserModerationViewSet(viewsets.GenericViewSet):
    """
    ViewSet for user moderation operations.

    Provides user-specific moderation data, statistics, and actions.

    The base class is ``GenericViewSet`` because ``search`` relies on
    ``paginate_queryset`` / ``get_paginated_response``, which the plain
    ``ViewSet`` base does not provide.
    """

    permission_classes = [IsModeratorOrAdmin]
    # Default serializer for schema generation
    serializer_class = UserModerationProfileSerializer

    @staticmethod
    def _get_user_or_none(pk):
        """Return the user with primary key ``pk``, or None if unknown or malformed."""
        try:
            return User.objects.get(pk=pk)
        except (User.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: the URL segment is not a usable key
            # (e.g. "abc" for an integer primary key).
            return None

    @staticmethod
    def _in_force():
        """
        Build the filter for moderation actions that are currently in force.

        An action counts when it is flagged active and either never expires
        (``expires_at`` is NULL, as stored by ``moderate`` when no duration is
        given) or expires in the future.
        """
        return Q(is_active=True) & (
            Q(expires_at__isnull=True) | Q(expires_at__gt=timezone.now())
        )

    def retrieve(self, request, pk=None):
        """
        Get moderation profile for a specific user.

        Responds with 404 when the user does not exist; otherwise with a
        dictionary of counts, a simplified risk assessment, the five most
        recent reports made by and actions against the user, and the
        account status.
        """
        user = self._get_user_or_none(pk)
        if user is None:
            return Response(
                {"error": "User not found"}, status=status.HTTP_404_NOT_FOUND
            )

        # Gather user moderation data
        reports_made = ModerationReport.objects.filter(reported_by=user).count()
        reports_against = ModerationReport.objects.filter(
            reported_entity_type="user", reported_entity_id=user.id
        ).count()
        actions_against = ModerationAction.objects.filter(target_user=user)
        warnings_received = actions_against.filter(action_type="WARNING").count()
        suspensions_received = actions_against.filter(
            action_type="USER_SUSPENSION"
        ).count()
        active_restrictions = actions_against.filter(self._in_force()).count()

        # Risk assessment (simplified). Later checks deliberately overwrite
        # earlier ones, so the highest applicable level wins.
        risk_factors = []
        risk_level = "LOW"
        if reports_against > 5:
            risk_factors.append("Multiple reports against user")
            risk_level = "MEDIUM"
        if suspensions_received > 0:
            risk_factors.append("Previous suspensions")
            risk_level = "HIGH"
        if active_restrictions > 0:
            risk_factors.append("Active restrictions")
            risk_level = "HIGH"

        # Recent activity
        recent_reports = ModerationReport.objects.filter(reported_by=user).order_by(
            "-created_at"
        )[:5]
        recent_actions = actions_against.order_by("-created_at")[:5]

        # Account status
        account_status = "ACTIVE"
        if getattr(user, "is_banned", False):
            account_status = "BANNED"
        elif active_restrictions > 0:
            account_status = "RESTRICTED"

        last_violation = (
            actions_against.filter(
                action_type__in=["WARNING", "USER_SUSPENSION", "USER_BAN"]
            )
            .order_by("-created_at")
            .first()
        )

        profile_data = {
            "user": {
                "id": user.id,
                "username": user.username,
                "display_name": user.get_display_name(),
                "email": user.email,
                "role": getattr(user, "role", "USER"),
            },
            "reports_made": reports_made,
            "reports_against": reports_against,
            "warnings_received": warnings_received,
            "suspensions_received": suspensions_received,
            "active_restrictions": active_restrictions,
            "risk_level": risk_level,
            "risk_factors": risk_factors,
            "recent_reports": ModerationReportSerializer(
                recent_reports, many=True
            ).data,
            "recent_actions": ModerationActionSerializer(
                recent_actions, many=True
            ).data,
            "account_status": account_status,
            "last_violation_date": (
                last_violation.created_at if last_violation else None
            ),
            "next_review_date": None,  # Would be calculated based on business rules
        }
        return Response(profile_data)

    @action(detail=True, methods=["post"])
    def moderate(self, request, pk=None):
        """
        Take moderation action against a user.

        The target is always the user named in the URL. The body is validated
        with ``CreateModerationActionSerializer``; on success a
        ``ModerationAction`` is created with the requester as moderator and
        returned with 201. ``expires_at`` is now + ``duration_hours`` when a
        duration is given, otherwise None (no expiry).

        Responds with 404 for an unknown user and 400 with the serializer
        errors for an invalid body.
        """
        user = self._get_user_or_none(pk)
        if user is None:
            return Response(
                {"error": "User not found"}, status=status.HTTP_404_NOT_FOUND
            )

        # NOTE(review): if the serializer requires a target user id in the
        # body, callers must still send one even though the URL user is what
        # gets used -- confirm against the serializer.
        serializer = CreateModerationActionSerializer(
            data=request.data, context={"request": request}
        )
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        validated_data = serializer.validated_data
        duration_hours = validated_data.get("duration_hours")
        expires_at = (
            timezone.now() + timedelta(hours=duration_hours)
            if duration_hours
            else None
        )
        created_action = ModerationAction.objects.create(
            action_type=validated_data["action_type"],
            reason=validated_data["reason"],
            # Tolerate an omitted "details" rather than failing with KeyError.
            details=validated_data.get("details", ""),
            duration_hours=duration_hours,
            moderator=request.user,
            target_user=user,
            related_report_id=validated_data.get("related_report_id"),
            is_active=True,
            expires_at=expires_at,
        )
        response_serializer = ModerationActionSerializer(created_action)
        return Response(response_serializer.data, status=status.HTTP_201_CREATED)

    @action(detail=False, methods=["get"])
    def search(self, request):
        """
        Search users for moderation purposes.

        Query parameters:

        - ``query``: case-insensitive substring of username or email.
        - ``role``: exact role to match.
        - ``has_restrictions``: the literal ``"true"`` keeps only users with
          at least one moderation action in force.

        The result is paginated when pagination is configured; otherwise the
        full list of matches is returned.
        """
        query = request.query_params.get("query", "")
        role = request.query_params.get("role")
        has_restrictions = request.query_params.get("has_restrictions")

        queryset = User.objects.all()
        if query:
            queryset = queryset.filter(
                Q(username__icontains=query) | Q(email__icontains=query)
            )
        if role:
            queryset = queryset.filter(role=role)
        if has_restrictions == "true":
            active_action_users = ModerationAction.objects.filter(
                self._in_force()
            ).values_list("target_user_id", flat=True)
            queryset = queryset.filter(id__in=active_action_users)

        # Paginate results
        page = self.paginate_queryset(queryset)
        users = page if page is not None else list(queryset)

        # One grouped query for the whole page instead of a COUNT per user.
        # Ordering is cleared so nothing but the user id ends up in GROUP BY.
        restriction_counts = dict(
            ModerationAction.objects.filter(
                self._in_force(), target_user__in=[user.id for user in users]
            )
            .order_by()
            .values_list("target_user_id")
            .annotate(count=Count("id"))
        )

        users_data = []
        for user in users:
            restriction_count = restriction_counts.get(user.id, 0)
            users_data.append(
                {
                    "id": user.id,
                    "username": user.username,
                    "display_name": user.get_display_name(),
                    "email": user.email,
                    "role": getattr(user, "role", "USER"),
                    "date_joined": user.date_joined,
                    "last_login": user.last_login,
                    "is_active": user.is_active,
                    "restriction_count": restriction_count,
                    "risk_level": "HIGH" if restriction_count > 0 else "LOW",
                }
            )

        if page is not None:
            return self.get_paginated_response(users_data)
        return Response(users_data)

    @action(detail=False, methods=["get"])
    def stats(self, request):
        """
        Get overall user moderation statistics.

        Returns the total number of moderation actions, how many are in force
        right now, and how many have an expiry in the past.
        """
        total_actions = ModerationAction.objects.count()
        active_actions = ModerationAction.objects.filter(self._in_force()).count()
        expired_actions = ModerationAction.objects.filter(
            expires_at__lte=timezone.now()
        ).count()
        stats_data = {
            "total_actions": total_actions,
            "active_actions": active_actions,
            "expired_actions": expired_actions,
        }
        return Response(stats_data)