Files

618 lines
20 KiB
Python

"""
Notification views.
Provides REST API endpoints for:
- Subscriber management (legacy compatibility)
- Preference updates
- Notification triggering
- Moderator notifications
- System announcements
- User notification list and management
Note: Now using django-notifications-hq for native Django notifications.
The novu_service import provides backward compatibility.
"""
import logging
from django.contrib.auth import get_user_model
from rest_framework import status
from rest_framework.permissions import IsAdminUser, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.core.utils import capture_and_log
from .models import NotificationLog, NotificationPreference, Subscriber, SystemAnnouncement
from .serializers import (
CreateAnnouncementSerializer,
CreateSubscriberSerializer,
ModeratorReportNotificationSerializer,
ModeratorSubmissionNotificationSerializer,
NotificationPreferenceSerializer,
SystemAnnouncementSerializer,
TriggerNotificationSerializer,
UpdatePreferencesSerializer,
UpdateSubscriberSerializer,
)
from .services import novu_service, notification_service
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# User model class resolved through Django's ``get_user_model`` helper.
User = get_user_model()
class CreateSubscriberView(APIView):
    """
    POST /notifications/subscribers/

    Upsert the caller's local ``Subscriber`` row and, when Novu is
    configured, push the same profile to Novu.

    Responds with ``{"subscriberId", "created"}``: 201 when a new local row
    was created, 200 when an existing one was updated. Any exception raised
    while persisting or syncing is passed to ``capture_and_log`` and returned
    as a 500 whose ``detail`` is the exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        serializer = CreateSubscriberSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data
        subscriber_id = payload["subscriber_id"]

        try:
            # Profile values written to the local row; the e-mail falls back
            # to the authenticated user's address when none was supplied.
            profile = {
                "novu_subscriber_id": subscriber_id,
                "first_name": payload.get("first_name", ""),
                "last_name": payload.get("last_name", ""),
                "email": payload.get("email") or request.user.email,
                "phone": payload.get("phone", ""),
                "avatar": payload.get("avatar", ""),
                "locale": payload.get("locale", "en"),
                "data": payload.get("data", {}),
            }
            subscriber, created = Subscriber.objects.update_or_create(
                user=request.user,
                defaults=profile,
            )

            # Mirror the stored profile to Novu only when it is configured.
            if novu_service.is_configured:
                novu_service.create_subscriber(
                    subscriber_id=subscriber_id,
                    email=subscriber.email,
                    first_name=subscriber.first_name,
                    last_name=subscriber.last_name,
                    phone=subscriber.phone,
                    avatar=subscriber.avatar,
                    locale=subscriber.locale,
                    data=subscriber.data,
                )

            if created:
                http_status = status.HTTP_201_CREATED
            else:
                http_status = status.HTTP_200_OK
            body = {"subscriberId": subscriber_id, "created": created}
            return Response(body, status=http_status)
        except Exception as exc:
            capture_and_log(exc, "Create notification subscriber", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class UpdateSubscriberView(APIView):
    """
    POST /notifications/subscribers/update/

    Update the caller's local ``Subscriber`` row and, when Novu is
    configured, forward the same field changes to Novu.

    Responses:
        200 ``{"success": True}`` on success.
        403 when the supplied ``subscriber_id`` is not the one stored on the
            caller's own subscriber row.
        404 when the caller has no local subscriber row.
        500 (``detail`` = exception text) for any other failure, after the
            exception has been passed to ``capture_and_log``.
    """

    permission_classes = [IsAuthenticated]

    # Local Subscriber attributes a request is allowed to overwrite.
    _UPDATABLE_FIELDS = (
        "first_name",
        "last_name",
        "email",
        "phone",
        "avatar",
        "locale",
        "data",
    )

    def post(self, request):
        serializer = UpdateSubscriberSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        subscriber_id = data["subscriber_id"]

        try:
            subscriber = Subscriber.objects.filter(user=request.user).first()
            if not subscriber:
                return Response(
                    {"detail": "Subscriber not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )

            # The local row is looked up by the authenticated user, but the
            # Novu sync is keyed by subscriber id. Without this check a caller
            # could pass someone else's id and overwrite that subscriber's
            # profile in Novu.
            if str(subscriber.novu_subscriber_id) != str(subscriber_id):
                return Response(
                    {"detail": "Subscriber does not belong to the current user"},
                    status=status.HTTP_403_FORBIDDEN,
                )

            # Only touch fields that were actually present in the payload.
            for field in self._UPDATABLE_FIELDS:
                if field in data:
                    setattr(subscriber, field, data[field])
            subscriber.save()

            # Forward every validated field except the identifier to Novu.
            if novu_service.is_configured:
                update_fields = {k: v for k, v in data.items() if k != "subscriber_id"}
                novu_service.update_subscriber(
                    subscriber.novu_subscriber_id, **update_fields
                )

            return Response({"success": True})
        except Exception as e:
            capture_and_log(e, "Update notification subscriber", source="api")
            return Response(
                {"detail": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class UpdatePreferencesView(APIView):
    """
    /notifications/preferences/

    POST stores the caller's notification preferences; GET returns them.
    Both handlers pass unexpected exceptions to ``capture_and_log`` and
    answer with a 500 whose ``detail`` is the exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        """Upsert the caller's preferences and mirror them to Novu if configured."""
        serializer = UpdatePreferencesSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        preferences = serializer.validated_data["preferences"]

        try:
            # camelCase keys from the request map onto snake_case model fields;
            # a missing section is stored as an empty dict.
            stored_values = {
                "channel_preferences": preferences.get("channelPreferences", {}),
                "workflow_preferences": preferences.get("workflowPreferences", {}),
                "frequency_settings": preferences.get("frequencySettings", {}),
            }
            NotificationPreference.objects.update_or_create(
                user=request.user,
                defaults=stored_values,
            )

            # Novu is only updated when the caller already has a subscriber row.
            if novu_service.is_configured:
                subscriber = Subscriber.objects.filter(user=request.user).first()
                if subscriber:
                    novu_service.update_preferences(
                        subscriber.novu_subscriber_id,
                        preferences,
                    )

            return Response({"success": True})
        except Exception as exc:
            capture_and_log(exc, "Update notification preferences", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def get(self, request):
        """Return the caller's stored preferences, or empty sections if none exist."""
        try:
            stored = NotificationPreference.objects.filter(user=request.user).first()
            if stored:
                return Response(NotificationPreferenceSerializer(stored).data)

            empty_preferences = {
                "channelPreferences": {},
                "workflowPreferences": {},
                "frequencySettings": {},
            }
            return Response(empty_preferences)
        except Exception as exc:
            capture_and_log(exc, "Get notification preferences", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class TriggerNotificationView(APIView):
    """
    POST /notifications/trigger/

    Record a ``NotificationLog`` row for the requested workflow and, when
    Novu is configured, trigger that workflow for the given subscriber.

    Responds with ``{"success": True, "transactionId": ...}``. The log row is
    created before the Novu call, so if the trigger raises, the row keeps the
    status it was created with and the caller receives a 500.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        serializer = TriggerNotificationSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        try:
            # Persist the attempt first so there is a row to update afterwards.
            log = NotificationLog.objects.create(
                user=request.user,
                workflow_id=data["workflow_id"],
                notification_type="trigger",
                channel="all",
                payload=data.get("payload", {}),
            )

            if novu_service.is_configured:
                result = novu_service.trigger_notification(
                    workflow_id=data["workflow_id"],
                    subscriber_id=data["subscriber_id"],
                    payload=data.get("payload"),
                    overrides=data.get("overrides"),
                )
                log.novu_transaction_id = result.get("transactionId", "")

            # Marked as sent in both cases; without Novu this is a mock success.
            log.status = NotificationLog.Status.SENT
            log.save()

            body = {"success": True, "transactionId": log.novu_transaction_id}
            return Response(body)
        except Exception as exc:
            capture_and_log(exc, "Trigger notification", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class NotifyModeratorsSubmissionView(APIView):
    """
    POST /notifications/moderators/submission/

    Log a moderator-submission notification and, when Novu is configured,
    publish it to the ``moderators`` topic.

    The log row is written with status ``SENT`` before the topic trigger is
    attempted. Failures are passed to ``capture_and_log`` and returned as a
    500 whose ``detail`` is the exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        serializer = ModeratorSubmissionNotificationSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        workflow_id = "moderator-submission-notification"
        try:
            NotificationLog.objects.create(
                user=request.user,
                workflow_id=workflow_id,
                notification_type="moderator_submission",
                channel="in_app",
                payload=payload,
                status=NotificationLog.Status.SENT,
            )

            # Fan out to everyone subscribed to the moderators topic.
            if novu_service.is_configured:
                novu_service.trigger_topic_notification(
                    workflow_id=workflow_id,
                    topic_key="moderators",
                    payload=payload,
                )

            return Response({"success": True})
        except Exception as exc:
            capture_and_log(exc, "Notify moderators (submission)", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class NotifyModeratorsReportView(APIView):
    """
    POST /notifications/moderators/report/

    Log a moderator-report notification and, when Novu is configured,
    publish it to the ``moderators`` topic.

    The log row is written with status ``SENT`` before the topic trigger is
    attempted. Failures are passed to ``capture_and_log`` and returned as a
    500 whose ``detail`` is the exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        serializer = ModeratorReportNotificationSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        workflow_id = "moderator-report-notification"
        try:
            NotificationLog.objects.create(
                user=request.user,
                workflow_id=workflow_id,
                notification_type="moderator_report",
                channel="in_app",
                payload=payload,
                status=NotificationLog.Status.SENT,
            )

            # Fan out to everyone subscribed to the moderators topic.
            if novu_service.is_configured:
                novu_service.trigger_topic_notification(
                    workflow_id=workflow_id,
                    topic_key="moderators",
                    payload=payload,
                )

            return Response({"success": True})
        except Exception as exc:
            capture_and_log(exc, "Notify moderators (report)", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class NotifyUserSubmissionStatusView(APIView):
    """
    POST /notifications/user/submission-status/

    Log a submission-status notification and, when Novu is configured,
    trigger the ``submission-status-update`` workflow for one subscriber.

    The request body is used as the notification payload as-is (no
    serializer). The target subscriber is the body's ``subscriber_id`` when
    present and truthy, otherwise the authenticated user's id as a string.

    Responses:
        200 ``{"success": True}`` on success.
        400 when the body is not a key/value object (for example a JSON
            array or scalar).
        500 (``detail`` = exception text) for any other failure, after the
            exception has been passed to ``capture_and_log``.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        from collections.abc import Mapping

        data = request.data

        # The body is read with ``.get`` below, so it must be a mapping.
        # A JSON array or scalar body would otherwise raise AttributeError
        # and be reported as a server error instead of a client error.
        if not isinstance(data, Mapping):
            return Response(
                {"detail": "Request body must be a JSON object"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            subscriber_id = data.get("subscriber_id") or str(request.user.id)

            # The log row is written with status SENT before the trigger runs.
            NotificationLog.objects.create(
                user=request.user,
                workflow_id="submission-status-update",
                notification_type="submission_status",
                channel="email",
                payload=data,
                status=NotificationLog.Status.SENT,
            )

            if novu_service.is_configured:
                novu_service.trigger_notification(
                    workflow_id="submission-status-update",
                    subscriber_id=subscriber_id,
                    payload=data,
                )

            return Response({"success": True})
        except Exception as e:
            capture_and_log(e, "Notify user submission status", source="api")
            return Response(
                {"detail": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class SystemAnnouncementView(APIView):
    """
    POST /notifications/system/announcement/

    Create a ``SystemAnnouncement`` row and, when Novu is configured, publish
    it to the ``users`` topic. Restricted by the ``IsAdminUser`` permission.

    Responds 201 with ``{"success": True, "announcementId": <str id>}``.
    Failures are passed to ``capture_and_log`` and returned as a 500 whose
    ``detail`` is the exception text.
    """

    permission_classes = [IsAdminUser]

    def post(self, request):
        serializer = CreateAnnouncementSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        fields = serializer.validated_data

        try:
            announcement = SystemAnnouncement.objects.create(
                title=fields["title"],
                message=fields["message"],
                severity=fields.get("severity", "info"),
                action_url=fields.get("action_url", ""),
                created_by=request.user,
            )

            # The topic payload is built from the stored row, not the raw input.
            if novu_service.is_configured:
                topic_payload = {
                    "title": announcement.title,
                    "message": announcement.message,
                    "severity": announcement.severity,
                    "actionUrl": announcement.action_url,
                }
                novu_service.trigger_topic_notification(
                    workflow_id="system-announcement",
                    topic_key="users",
                    payload=topic_payload,
                )

            body = {
                "success": True,
                "announcementId": str(announcement.id),
            }
            return Response(body, status=status.HTTP_201_CREATED)
        except Exception as exc:
            capture_and_log(exc, "System announcement", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class AdminAlertView(APIView):
    """
    POST /notifications/admin/alert/

    Log an admin alert and, when Novu is configured, publish it to the
    ``admins`` topic. The raw request body is used as the payload without
    serializer validation.

    The log row is written with status ``SENT`` before the topic trigger is
    attempted. Failures are passed to ``capture_and_log`` and returned as a
    500 whose ``detail`` is the exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        alert_payload = request.data
        workflow_id = "admin-alert"

        try:
            NotificationLog.objects.create(
                user=request.user,
                workflow_id=workflow_id,
                notification_type="admin_alert",
                channel="email",
                payload=alert_payload,
                status=NotificationLog.Status.SENT,
            )

            # Fan out to everyone subscribed to the admins topic.
            if novu_service.is_configured:
                novu_service.trigger_topic_notification(
                    workflow_id=workflow_id,
                    topic_key="admins",
                    payload=alert_payload,
                )

            return Response({"success": True})
        except Exception as exc:
            capture_and_log(exc, "Admin alert", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class AdminCriticalErrorView(APIView):
    """
    POST /notifications/admin/critical-error/

    Log a critical-error alert and, when Novu is configured, publish it to
    the ``admins`` topic. The raw request body is used as the payload without
    serializer validation.

    The log row is written with status ``SENT`` before the topic trigger is
    attempted. Failures are passed to ``capture_and_log`` and returned as a
    500 whose ``detail`` is the exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        error_payload = request.data
        workflow_id = "admin-critical-error"

        try:
            NotificationLog.objects.create(
                user=request.user,
                workflow_id=workflow_id,
                notification_type="critical_error",
                channel="email",
                payload=error_payload,
                status=NotificationLog.Status.SENT,
            )

            # Published to the admins topic. No priority or override argument
            # is passed here; only the workflow id distinguishes this from a
            # regular admin alert.
            if novu_service.is_configured:
                novu_service.trigger_topic_notification(
                    workflow_id=workflow_id,
                    topic_key="admins",
                    payload=error_payload,
                )

            return Response({"success": True})
        except Exception as exc:
            capture_and_log(exc, "Admin critical error", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# ============================================================================
# Native Notification Views (django-notifications-hq)
# ============================================================================
class NotificationListView(APIView):
    """
    GET /notifications/

    List notifications for the current user.

    Query parameters:
        unread_only: ``"true"`` (case-insensitive) restricts the list to
            unread notifications; any other value lists all of them.
        limit: non-negative integer, default 50, capped at 100.

    Responses:
        200 ``{"notifications": [...], "unreadCount": ...}`` where
            ``unreadCount`` is whatever
            ``notification_service.get_unread_count`` returns.
        400 when ``limit`` is not an integer or is negative.
        500 (``detail`` = exception text) for any other failure, after the
            exception has been passed to ``capture_and_log``.
    """

    permission_classes = [IsAuthenticated]

    _DEFAULT_LIMIT = 50
    _MAX_LIMIT = 100

    @staticmethod
    def _serialize(notif):
        """Convert one notification object into its JSON-ready dict."""
        return {
            "id": notif.id,
            "actor": str(notif.actor) if notif.actor else None,
            "verb": notif.verb,
            "description": notif.description or "",
            "target": str(notif.target) if notif.target else None,
            "actionObject": str(notif.action_object) if notif.action_object else None,
            "level": notif.level,
            "unread": notif.unread,
            "data": notif.data or {},
            "timestamp": notif.timestamp.isoformat(),
        }

    def get(self, request):
        # ``limit`` is user input: a malformed or negative value is a client
        # error and must not surface as a 500 from the generic handler below.
        raw_limit = request.query_params.get("limit", self._DEFAULT_LIMIT)
        try:
            limit = int(raw_limit)
        except (TypeError, ValueError):
            limit = None
        if limit is None or limit < 0:
            return Response(
                {"detail": "limit must be a non-negative integer"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        limit = min(limit, self._MAX_LIMIT)

        try:
            unread_only = request.query_params.get("unread_only", "false").lower() == "true"

            notifications = notification_service.get_user_notifications(
                user=request.user,
                unread_only=unread_only,
                limit=limit,
            )

            notification_list = [self._serialize(notif) for notif in notifications]

            return Response({
                "notifications": notification_list,
                "unreadCount": notification_service.get_unread_count(request.user),
            })
        except Exception as e:
            capture_and_log(e, "Get notifications", source="api")
            return Response(
                {"detail": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class NotificationMarkReadView(APIView):
    """
    POST /notifications/mark-read/

    Ask ``notification_service.mark_as_read`` to mark notification(s) as read
    for the current user. The body's ``notification_id`` is forwarded
    unchanged and is ``None`` when the key is absent.

    Responds with ``{"success": True, "unreadCount": ...}`` using the unread
    count fetched after the update. Failures are passed to
    ``capture_and_log`` and returned as a 500 whose ``detail`` is the
    exception text.
    """

    permission_classes = [IsAuthenticated]

    def post(self, request):
        try:
            target_id = request.data.get("notification_id")
            notification_service.mark_as_read(
                user=request.user,
                notification_id=target_id,
            )

            remaining = notification_service.get_unread_count(request.user)
            body = {
                "success": True,
                "unreadCount": remaining,
            }
            return Response(body)
        except Exception as exc:
            capture_and_log(exc, "Mark notification read", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class NotificationUnreadCountView(APIView):
    """
    GET /notifications/unread-count/

    Return ``{"unreadCount": ...}`` for the current user, as reported by
    ``notification_service.get_unread_count``. Failures are passed to
    ``capture_and_log`` and returned as a 500 whose ``detail`` is the
    exception text.
    """

    permission_classes = [IsAuthenticated]

    def get(self, request):
        try:
            body = {"unreadCount": notification_service.get_unread_count(request.user)}
            return Response(body)
        except Exception as exc:
            capture_and_log(exc, "Get unread count", source="api")
            error_body = {"detail": str(exc)}
            return Response(error_body, status=status.HTTP_500_INTERNAL_SERVER_ERROR)