""" Django-native notification service. This service provides a fully Django-native notification system. Supports: - In-app notifications - Email notifications (via Django email backend) - Real-time notifications (ready for Django Channels integration) """ import logging from typing import Any from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType from django.core.mail import send_mail from django.db.models import QuerySet from django.template.loader import render_to_string from django.utils import timezone from django.utils.html import strip_tags from .models import Notification, NotificationLog, NotificationPreference, SystemAnnouncement logger = logging.getLogger(__name__) User = get_user_model() class NotificationService: """ Django-native notification service using django-notifications-hq. This replaces the Novu-based service with a fully Django-native approach. """ # Notification workflow types WORKFLOW_SUBMISSION_STATUS = "submission_status" WORKFLOW_MODERATION_ALERT = "moderation_alert" WORKFLOW_SYSTEM_ANNOUNCEMENT = "system_announcement" WORKFLOW_ADMIN_ALERT = "admin_alert" WORKFLOW_WELCOME = "welcome" WORKFLOW_COMMENT_REPLY = "comment_reply" WORKFLOW_MENTION = "mention" WORKFLOW_FOLLOW = "follow" def __init__(self): self.from_email = getattr( settings, "DEFAULT_FROM_EMAIL", "noreply@thrillwiki.com" ) self.site_name = getattr(settings, "SITE_NAME", "ThrillWiki") self.site_url = getattr(settings, "SITE_URL", "https://thrillwiki.com") def send_notification( self, recipient: User, actor: User | None, verb: str, action_object: Any = None, target: Any = None, description: str = "", level: str = "info", data: dict | None = None, send_email: bool = True, email_template: str | None = None, ) -> bool: """ Send a notification to a user. Args: recipient: The user to notify actor: The user who performed the action (can be None for system notifications) verb: Description of the action (e.g., "approved your submission") action_object: The object that was acted upon target: The target of the action description: Additional description text level: Notification level (info, success, warning, error) data: Additional data to store with the notification send_email: Whether to also send an email notification email_template: Template path for email (optional) Returns: True if notification was sent successfully """ try: # Check user preferences if self._is_user_opted_out(recipient): logger.debug(f"User {recipient.id} opted out of notifications") return False # Create in-app notification using our native model notification_data = { "recipient": recipient, "actor": actor, "verb": verb, "description": description, "level": level, "data": data or {}, } # Add generic foreign key for action_object if provided if action_object: notification_data["action_object_content_type"] = ContentType.objects.get_for_model(action_object) notification_data["action_object_id"] = action_object.pk # Add generic foreign key for target if provided if target: notification_data["target_content_type"] = ContentType.objects.get_for_model(target) notification_data["target_id"] = target.pk Notification.objects.create(**notification_data) # Log the notification self._log_notification( user=recipient, workflow_id=data.get("workflow_id", "general") if data else "general", notification_type=level, channel="in_app", status=NotificationLog.Status.SENT, payload=data or {}, ) # Optionally send email if send_email and self._should_send_email(recipient, data): self._send_email_notification( recipient=recipient, verb=verb, actor=actor, action_object=action_object, target=target, description=description, template=email_template, data=data, ) return True except Exception as e: logger.exception(f"Failed to send notification to {recipient.id}: {e}") self._log_notification( user=recipient, workflow_id=data.get("workflow_id", "general") if data else "general", notification_type=level, channel="in_app", status=NotificationLog.Status.FAILED, payload=data or {}, error_message=str(e), ) return False def send_to_group( self, recipients: QuerySet | list, actor: User | None, verb: str, action_object: Any = None, target: Any = None, description: str = "", level: str = "info", data: dict | None = None, send_email: bool = False, ) -> dict: """ Send a notification to multiple users. Returns: Dict with success/failure counts """ results = {"success": 0, "failed": 0, "skipped": 0} for recipient in recipients: if self._is_user_opted_out(recipient): results["skipped"] += 1 continue success = self.send_notification( recipient=recipient, actor=actor, verb=verb, action_object=action_object, target=target, description=description, level=level, data=data, send_email=send_email, ) if success: results["success"] += 1 else: results["failed"] += 1 return results def notify_moderators( self, verb: str, action_object: Any = None, description: str = "", data: dict | None = None, ) -> dict: """ Send a notification to all moderators. """ from django.contrib.auth import get_user_model User = get_user_model() # Get users with moderator permissions moderators = User.objects.filter( is_active=True, is_staff=True, # Or use a specific permission check ).exclude( novu_notification_prefs__is_opted_out=True ) return self.send_to_group( recipients=moderators, actor=None, verb=verb, action_object=action_object, description=description, level="info", data={**(data or {}), "workflow_id": self.WORKFLOW_MODERATION_ALERT}, send_email=True, ) def notify_admins( self, verb: str, description: str = "", level: str = "warning", data: dict | None = None, ) -> dict: """ Send a notification to all admins. """ admins = User.objects.filter(is_superuser=True, is_active=True) return self.send_to_group( recipients=admins, actor=None, verb=verb, description=description, level=level, data={**(data or {}), "workflow_id": self.WORKFLOW_ADMIN_ALERT}, send_email=True, ) def send_system_announcement( self, title: str, message: str, severity: str = "info", action_url: str = "", target_users: QuerySet | None = None, created_by: User | None = None, ) -> SystemAnnouncement: """ Create and broadcast a system announcement. """ # Create the announcement announcement = SystemAnnouncement.objects.create( title=title, message=message, severity=severity, action_url=action_url, created_by=created_by, is_active=True, ) # Notify users recipients = target_users or User.objects.filter(is_active=True) self.send_to_group( recipients=recipients, actor=created_by, verb=f"System announcement: {title}", action_object=announcement, description=message, level=severity, data={ "workflow_id": self.WORKFLOW_SYSTEM_ANNOUNCEMENT, "announcement_id": str(announcement.id), "action_url": action_url, }, send_email=severity in ["warning", "critical"], ) return announcement def get_user_notifications( self, user: User, unread_only: bool = False, limit: int = 50, ): """ Get notifications for a user. """ qs = Notification.objects.filter(recipient=user) if unread_only: qs = qs.unread() return qs[:limit] def mark_as_read(self, user: User, notification_id: int | None = None): """ Mark notification(s) as read. """ if notification_id: try: notification = Notification.objects.get(recipient=user, id=notification_id) notification.mark_as_read() except Notification.DoesNotExist: pass else: # Mark all as read Notification.objects.filter(recipient=user).mark_all_as_read() def get_unread_count(self, user: User) -> int: """ Get count of unread notifications. """ return Notification.objects.filter(recipient=user, unread=True).count() def _is_user_opted_out(self, user: User) -> bool: """Check if user has opted out of notifications.""" try: prefs = NotificationPreference.objects.get(user=user) return prefs.is_opted_out except NotificationPreference.DoesNotExist: return False def _should_send_email(self, user: User, data: dict | None) -> bool: """Check if email should be sent based on user preferences.""" try: prefs = NotificationPreference.objects.get(user=user) # Check channel preferences channel_prefs = prefs.channel_preferences or {} email_enabled = channel_prefs.get("email", True) if not email_enabled: return False # Check workflow-specific preferences if data and "workflow_id" in data: workflow_prefs = prefs.workflow_preferences or {} workflow_email = workflow_prefs.get(data["workflow_id"], {}).get("email", True) return workflow_email return True except NotificationPreference.DoesNotExist: # Default to sending email if no preferences set return True def _send_email_notification( self, recipient: User, verb: str, actor: User | None, action_object: Any, target: Any, description: str, template: str | None, data: dict | None, ): """Send an email notification.""" try: # Build context context = { "recipient": recipient, "actor": actor, "verb": verb, "action_object": action_object, "target": target, "description": description, "site_name": self.site_name, "site_url": self.site_url, "data": data or {}, } # Render email if template: html_content = render_to_string(template, context) text_content = strip_tags(html_content) else: # Default simple email actor_name = actor.username if actor else self.site_name subject = f"{actor_name} {verb}" text_content = description or f"{actor_name} {verb}" html_content = f"
{text_content}
" if data and data.get("action_url"): html_content += f'' subject = f"[{self.site_name}] {verb[:50]}" send_mail( subject=subject, message=text_content, from_email=self.from_email, recipient_list=[recipient.email], html_message=html_content, fail_silently=True, ) # Log email notification self._log_notification( user=recipient, workflow_id=data.get("workflow_id", "general") if data else "general", notification_type="email", channel="email", status=NotificationLog.Status.SENT, payload=data or {}, ) except Exception as e: logger.exception(f"Failed to send email to {recipient.email}: {e}") self._log_notification( user=recipient, workflow_id=data.get("workflow_id", "general") if data else "general", notification_type="email", channel="email", status=NotificationLog.Status.FAILED, payload=data or {}, error_message=str(e), ) def _log_notification( self, user: User, workflow_id: str, notification_type: str, channel: str, status: str, payload: dict, error_message: str = "", ): """Log a notification to the audit trail.""" NotificationLog.objects.create( user=user, workflow_id=workflow_id, notification_type=notification_type, channel=channel, status=status, payload=payload, error_message=error_message, ) # Singleton instance notification_service = NotificationService() # ============================================================================ # Backward compatibility - keep old NovuService interface but delegate to native # ============================================================================ class NovuServiceSync: """ Backward-compatible wrapper that delegates to the new notification service. This maintains the old API signature for existing code while using the new Django-native implementation. """ def __init__(self): self._service = notification_service @property def is_configured(self) -> bool: """Always configured since we're using Django-native system.""" return True def create_subscriber(self, subscriber_id: str, **kwargs) -> dict[str, Any]: """Create subscriber - now a no-op as django-notifications-hq uses User directly.""" logger.info(f"Subscriber creation not needed for django-notifications-hq: {subscriber_id}") return {"subscriberId": subscriber_id, "status": "native"} def update_subscriber(self, subscriber_id: str, **kwargs) -> dict[str, Any]: """Update subscriber - now a no-op.""" logger.info(f"Subscriber update not needed for django-notifications-hq: {subscriber_id}") return {"subscriberId": subscriber_id, "status": "native"} def trigger_notification( self, workflow_id: str, subscriber_id: str, payload: dict | None = None, overrides: dict | None = None, ) -> dict[str, Any]: """Trigger a notification using the new native service.""" try: user = User.objects.get(pk=subscriber_id) verb = payload.get("message", f"Notification: {workflow_id}") if payload else f"Notification: {workflow_id}" description = payload.get("description", "") if payload else "" success = self._service.send_notification( recipient=user, actor=None, verb=verb, description=description, data={**(payload or {}), "workflow_id": workflow_id}, ) return { "status": "sent" if success else "failed", "workflow_id": workflow_id, } except User.DoesNotExist: logger.error(f"User not found for notification: {subscriber_id}") return {"status": "failed", "error": "User not found"} def trigger_topic_notification( self, workflow_id: str, topic_key: str, payload: dict | None = None, ) -> dict[str, Any]: """Trigger topic notification - maps to group notification.""" logger.info(f"Topic notification: {workflow_id} -> {topic_key}") # Map topic keys to user groups if topic_key == "moderators": result = self._service.notify_moderators( verb=payload.get("message", "New moderation task") if payload else "New moderation task", data={**(payload or {}), "workflow_id": workflow_id}, ) elif topic_key == "admins": result = self._service.notify_admins( verb=payload.get("message", "Admin notification") if payload else "Admin notification", data={**(payload or {}), "workflow_id": workflow_id}, ) else: logger.warning(f"Unknown topic key: {topic_key}") result = {"success": 0, "failed": 0, "skipped": 0} return { "status": "sent", "workflow_id": workflow_id, "result": result, } def update_preferences( self, subscriber_id: str, preferences: dict[str, Any], ) -> dict[str, Any]: """Update notification preferences.""" try: user = User.objects.get(pk=subscriber_id) prefs, _ = NotificationPreference.objects.get_or_create(user=user) if "channel_preferences" in preferences: prefs.channel_preferences = preferences["channel_preferences"] if "workflow_preferences" in preferences: prefs.workflow_preferences = preferences["workflow_preferences"] if "is_opted_out" in preferences: prefs.is_opted_out = preferences["is_opted_out"] prefs.save() return {"status": "updated"} except User.DoesNotExist: return {"status": "failed", "error": "User not found"} # Keep old name for backward compatibility novu_service = NovuServiceSync()