"""
Server-Sent Events (SSE) endpoint for real-time moderation dashboard updates.

This module provides a streaming HTTP response that broadcasts submission
status changes to connected moderators in real-time.

Subscribers are tracked in a module-level, in-memory registry, so an event is
only delivered to clients whose connection is served by the same process that
broadcast it.
"""

import json
import logging
import queue
import threading
import time
from typing import Generator

from django.http import StreamingHttpResponse, JsonResponse
from django.views import View
from django.contrib.auth.mixins import LoginRequiredMixin
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated

from apps.moderation.permissions import CanViewModerationData
from apps.moderation.signals import submission_status_changed

logger = logging.getLogger(__name__)

# Upper bound on events buffered for a single client. Without a bound a
# stalled connection would accumulate events forever.
DEFAULT_MAX_QUEUE_SIZE = 1000

# Seconds to wait for an event before emitting a keep-alive comment.
HEARTBEAT_INTERVAL_SECONDS = 30


# Thread-safe queue for broadcasting events to all connected clients
class SSEBroadcaster:
    """
    Manages SSE connections and broadcasts events to all clients.

    Uses a simple subscriber pattern where each connected client gets its own
    bounded queue of events to consume. All access to the subscriber list is
    serialized through an internal lock.
    """

    def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE):
        """
        Create an empty broadcaster.

        Args:
            max_queue_size: Maximum number of undelivered events buffered per
                client. Once a client's queue is full, further events for that
                client are dropped (with a warning) until it catches up.
                A value <= 0 makes the queues unbounded, per ``queue.Queue``.
        """
        self._subscribers: list[queue.Queue] = []
        self._lock = threading.Lock()
        self._max_queue_size = max_queue_size

    @property
    def subscriber_count(self) -> int:
        """Number of currently registered client queues."""
        with self._lock:
            return len(self._subscribers)

    def subscribe(self) -> queue.Queue:
        """
        Create a new subscriber queue and register it.

        Returns:
            The queue from which the caller should consume broadcast events.
            The caller must pass it to ``unsubscribe`` when done.
        """
        client_queue = queue.Queue(maxsize=self._max_queue_size)
        with self._lock:
            self._subscribers.append(client_queue)
            # Read the count under the lock so the logged value is consistent.
            total = len(self._subscribers)
        logger.debug("SSE client subscribed. Total clients: %d", total)
        return client_queue

    def unsubscribe(self, client_queue: queue.Queue):
        """
        Remove a subscriber queue.

        Safe to call more than once for the same queue; calls after the first
        are no-ops.
        """
        with self._lock:
            try:
                self._subscribers.remove(client_queue)
            except ValueError:
                # Already removed (or never registered): nothing to do.
                return
            total = len(self._subscribers)
        logger.debug("SSE client unsubscribed. Total clients: %d", total)

    def broadcast(self, event_data: dict):
        """
        Send an event to all connected clients.

        Never blocks on a slow client: if a client's queue is full the event
        is dropped for that client only and a warning is logged.
        """
        with self._lock:
            for client_queue in self._subscribers:
                try:
                    client_queue.put_nowait(event_data)
                except queue.Full:
                    logger.warning("SSE client queue full, dropping event")


# Global broadcaster instance
sse_broadcaster = SSEBroadcaster()


def handle_submission_status_changed(sender, payload, **kwargs):
    """
    Signal handler that broadcasts submission status changes to SSE clients.

    Connected to the submission_status_changed signal from signals.py.

    Args:
        sender: The signal sender (unused).
        payload: Mapping describing the change; forwarded unchanged to every
            subscriber. Its ``submission_type`` and ``submission_id`` keys are
            read for the debug log only.
        **kwargs: Extra signal arguments (ignored).
    """
    sse_broadcaster.broadcast(payload)
    logger.debug(
        "Broadcast SSE event: %s#%s",
        payload.get('submission_type'),
        payload.get('submission_id'),
    )


# Connect the signal handler
submission_status_changed.connect(handle_submission_status_changed)


def _format_sse_data(payload: dict) -> str:
    """Serialize *payload* as a single SSE ``data:`` message."""
    return f"data: {json.dumps(payload)}\n\n"


class ModerationSSEView(APIView):
    """
    Server-Sent Events endpoint for real-time moderation updates.

    Provides a streaming response that sends submission status changes
    as they occur. Clients should connect to this endpoint and keep
    the connection open to receive real-time updates.

    Response format (SSE):
        data: {"submission_id": 1, "new_status": "CLAIMED", ...}

    Usage:
        const eventSource = new EventSource('/api/moderation/sse/')
        eventSource.onmessage = (event) => {
            const data = JSON.parse(event.data)
            // Handle update
        }
    """

    permission_classes = [IsAuthenticated, CanViewModerationData]

    def get(self, request):
        """
        Establish SSE connection and stream events.

        Sends a heartbeat every 30 seconds to keep the connection alive.

        Returns:
            A ``StreamingHttpResponse`` with content type
            ``text/event-stream`` that stays open until the client
            disconnects.
        """

        def event_stream() -> Generator[str, None, None]:
            client_queue = sse_broadcaster.subscribe()
            try:
                # Send initial connection event
                yield _format_sse_data(
                    {'type': 'connected', 'message': 'SSE connection established'}
                )

                while True:
                    try:
                        # Wait for event with timeout for heartbeat
                        event = client_queue.get(timeout=HEARTBEAT_INTERVAL_SECONDS)
                    except queue.Empty:
                        # SSE comment line: ignored by clients, keeps the
                        # connection alive through idle periods.
                        yield ": heartbeat\n\n"
                    else:
                        yield _format_sse_data(event)
            finally:
                # Runs on client disconnect (GeneratorExit) as well as on any
                # error, so a single cleanup point is sufficient.
                sse_broadcaster.unsubscribe(client_queue)

        response = StreamingHttpResponse(
            event_stream(),
            content_type='text/event-stream',
        )
        response['Cache-Control'] = 'no-cache'
        response['X-Accel-Buffering'] = 'no'  # Disable nginx buffering
        # NOTE: no 'Connection: keep-alive' header. It is a hop-by-hop header,
        # which WSGI applications must not set (PEP 3333); wsgiref-based
        # servers reject the response with an AssertionError if it is present.
        return response


class ModerationSSETestView(APIView):
    """
    Test endpoint to manually trigger an SSE event.

    This is useful for testing the SSE connection without making
    actual state transitions.

    POST /api/moderation/sse/test/
    {
        "submission_id": 1,
        "submission_type": "edit",
        "new_status": "CLAIMED",
        "previous_status": "PENDING"
    }
    """

    permission_classes = [IsAuthenticated, CanViewModerationData]

    def post(self, request):
        """
        Broadcast a test event.

        Builds a payload from the request body (falling back to placeholder
        values for missing fields), marks it with ``"test": True`` and sends
        it to every connected client.

        Returns:
            A ``JsonResponse`` echoing the payload and the number of clients
            registered when the response was built.
        """
        test_payload = {
            "submission_id": request.data.get("submission_id", 999),
            "submission_type": request.data.get("submission_type", "edit"),
            "new_status": request.data.get("new_status", "CLAIMED"),
            "previous_status": request.data.get("previous_status", "PENDING"),
            "locked_by": request.user.username,
            "locked_at": None,
            "changed_by": request.user.username,
            "test": True,
        }

        sse_broadcaster.broadcast(test_payload)

        return JsonResponse({
            "status": "ok",
            "message": f"Test event broadcast to {sse_broadcaster.subscriber_count} clients",
            "payload": test_payload,
        })


__all__ = [
    'ModerationSSEView',
    'ModerationSSETestView',
    'sse_broadcaster',
]