mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2026-02-05 09:25:18 -05:00
feat: add security event taxonomy and optimize park queryset - Add comprehensive security_event_types ChoiceGroup with categories for authentication, MFA, password, account, session, and API key events - Include severity levels, icons, and CSS classes for each event type - Fix park queryset optimization by using select_related for OneToOne location relationship - Remove location property fields (latitude/longitude) from values() call as they are not actual DB columns - Add proper location fields (city, state, country) to values() for map display This change enhances security event tracking capabilities and resolves a queryset optimization issue where property decorators were incorrectly used in values() queries.
403 lines
13 KiB
Python
403 lines
13 KiB
Python
"""
|
|
Security Service for ThrillWiki
|
|
|
|
Provides centralized security event logging, notifications, and helper functions
|
|
for all authentication-related operations.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from django.conf import settings
|
|
from django.core.mail import send_mail
|
|
from django.template.loader import render_to_string
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_client_ip(request) -> str:
    """
    Extract the client IP address from a request.

    Lookup order:
        1. The first non-empty entry of the X-Forwarded-For header.
        2. Cloudflare's CF-Connecting-IP header.
        3. REMOTE_ADDR.

    NOTE: the header values are used exactly as received; nothing here checks
    that they were set by a trusted proxy, so callers should not treat the
    result as authenticated.

    Args:
        request: Django/DRF request object (only ``request.META`` is read)

    Returns:
        Client IP address as string, or "0.0.0.0" when none is available
    """
    meta = request.META

    # X-Forwarded-For may hold a comma-separated chain of addresses. Skip
    # blank entries so a malformed header such as " , 10.0.0.1" cannot
    # produce an empty "IP address".
    forwarded_for = meta.get("HTTP_X_FORWARDED_FOR")
    if forwarded_for:
        for candidate in forwarded_for.split(","):
            candidate = candidate.strip()
            if candidate:
                return candidate

    # Check for Cloudflare's CF-Connecting-IP header
    cf_connecting_ip = (meta.get("HTTP_CF_CONNECTING_IP") or "").strip()
    if cf_connecting_ip:
        return cf_connecting_ip

    # Fallback to REMOTE_ADDR; `or` also covers a present-but-empty value.
    return meta.get("REMOTE_ADDR") or "0.0.0.0"
|
|
|
|
|
|
def log_security_event(
    event_type: str,
    request,
    user=None,
    metadata: dict = None
) -> Any:
    """
    Record a security event, deriving IP and user agent from the request.

    Args:
        event_type: One of SecurityLog.EventType choices
        request: Django/DRF request object
        user: User instance (optional for failed logins)
        metadata: Additional event-specific data; an empty dict is stored
            when omitted

    Returns:
        Whatever ``SecurityLog.log_event`` returns (presumably the created
        SecurityLog instance), or None if recording the event raised.
    """
    # Imported lazily, inside the function, as in the original code.
    from apps.accounts.models import SecurityLog

    try:
        client_ip = get_client_ip(request)
        agent = request.META.get("HTTP_USER_AGENT", "")
        details = metadata or {}
        entry = SecurityLog.log_event(
            event_type=event_type,
            ip_address=client_ip,
            user=user,
            user_agent=agent,
            metadata=details,
        )
    except Exception as e:
        # Best effort: a logging failure must not break the calling flow.
        logger.error(f"Failed to log security event {event_type}: {e}")
        return None
    return entry
|
|
|
|
|
|
def log_security_event_simple(
    event_type: str,
    ip_address: str,
    user=None,
    user_agent: str = "",
    metadata: dict = None
) -> Any:
    """
    Record a security event from explicitly supplied values.

    Intended for call sites that have no request object at hand.

    Args:
        event_type: One of SecurityLog.EventType choices
        ip_address: Client IP address
        user: User instance (optional)
        user_agent: Browser user agent string
        metadata: Additional event-specific data; an empty dict is stored
            when omitted

    Returns:
        Whatever ``SecurityLog.log_event`` returns (presumably the created
        SecurityLog instance), or None if recording the event raised.
    """
    # Imported lazily, inside the function, as in the original code.
    from apps.accounts.models import SecurityLog

    event_fields = {
        "event_type": event_type,
        "ip_address": ip_address,
        "user": user,
        "user_agent": user_agent,
        "metadata": metadata or {},
    }
    try:
        entry = SecurityLog.log_event(**event_fields)
    except Exception as e:
        # Best effort: a logging failure must not break the calling flow.
        logger.error(f"Failed to log security event {event_type}: {e}")
        return None
    return entry
|
|
|
|
|
|
# Maps a security event type to the human-readable label used in the
# notification email subject and body. Event types missing from this table
# are rendered as "Account Activity" by send_security_notification().
SECURITY_NOTIFICATION_SUBJECTS = {
    # Multi-factor authentication
    "mfa_enrolled": "Two-Factor Authentication Enabled",
    "mfa_disabled": "Two-Factor Authentication Disabled",
    # Passkeys
    "passkey_registered": "New Passkey Added to Your Account",
    "passkey_removed": "Passkey Removed from Your Account",
    # Passwords
    "password_changed": "Your Password Was Changed",
    "password_reset_completed": "Your Password Has Been Reset",
    # Social accounts
    "social_linked": "Social Account Connected",
    "social_unlinked": "Social Account Disconnected",
    # Sessions and recovery
    "session_invalidated": "Session Security Update",
    "recovery_codes_regenerated": "Recovery Codes Regenerated",
}
|
|
|
|
|
|
def send_security_notification(
    user,
    event_type: str,
    metadata: dict = None
) -> bool:
    """
    Email a user about a security-sensitive event on their account.

    The message is sent as plain text plus an HTML alternative. The HTML part
    comes from the ``accounts/email/security_notification.html`` template,
    falling back to a built-in layout if rendering that template raises.

    Args:
        user: User instance to notify
        event_type: Type of security event (selects the subject line)
        metadata: Additional context for the email template

    Returns:
        True if the email was handed to ``send_mail`` without error. False if
        the user has no email address, has opted out of security emails, or
        building/sending the message raised.
    """
    # Nothing to do without a recipient address.
    if not (user and user.email):
        logger.warning("Cannot send security notification: no email for user")
        return False

    # Honour the opt-out flag when the user carries a preference object;
    # a missing flag counts as "enabled".
    if hasattr(user, "notification_preference"):
        wants_email = getattr(
            user.notification_preference, "account_security_email", True
        )
        if not wants_email:
            logger.debug(f"User {user.username} has security emails disabled")
            return False

    try:
        event_display = SECURITY_NOTIFICATION_SUBJECTS.get(event_type, "Account Activity")
        subject = f"ThrillWiki Security Alert: {event_display}"

        context = {
            "user": user,
            "event_type": event_type,
            "event_display": event_display,
            "metadata": metadata or {},
            "site_name": "ThrillWiki",
            "support_email": getattr(settings, "DEFAULT_SUPPORT_EMAIL", "support@thrillwiki.com"),
        }

        # Prefer the project template; any rendering failure switches to the
        # inline HTML fallback.
        try:
            html_body = render_to_string("accounts/email/security_notification.html", context)
        except Exception as template_error:
            logger.debug(f"HTML template not found, using fallback: {template_error}")
            html_body = _get_fallback_security_email(context)

        text_body = _get_plain_text_security_email(context)

        send_mail(
            subject=subject,
            message=text_body,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=[user.email],
            html_message=html_body,
            fail_silently=False,
        )

        logger.info(f"Security notification sent to {user.email} for event: {event_type}")
        return True

    except Exception as e:
        # Notification is best effort; report the failure through the return value.
        logger.error(f"Failed to send security notification to {user.email}: {e}")
        return False
|
|
|
|
|
|
def _get_plain_text_security_email(context: dict) -> str:
|
|
"""Generate plain text email for security notifications."""
|
|
event_display = context.get("event_display", "Account Activity")
|
|
user = context.get("user")
|
|
metadata = context.get("metadata", {})
|
|
|
|
lines = [
|
|
f"Hello {user.get_display_name() if user else 'User'},",
|
|
"",
|
|
f"This is a security notification from ThrillWiki.",
|
|
"",
|
|
f"Event: {event_display}",
|
|
]
|
|
|
|
# Add metadata details
|
|
if metadata:
|
|
lines.append("")
|
|
lines.append("Details:")
|
|
for key, value in metadata.items():
|
|
if key not in ("user_id", "internal"):
|
|
lines.append(f" - {key.replace('_', ' ').title()}: {value}")
|
|
|
|
lines.extend([
|
|
"",
|
|
"If you did not perform this action, please secure your account immediately:",
|
|
"1. Change your password",
|
|
"2. Review your connected devices and sign out any you don't recognize",
|
|
"3. Contact support if you need assistance",
|
|
"",
|
|
"Best regards,",
|
|
"The ThrillWiki Team",
|
|
])
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _get_fallback_security_email(context: dict) -> str:
|
|
"""Generate HTML email for security notifications when template not found."""
|
|
event_display = context.get("event_display", "Account Activity")
|
|
user = context.get("user")
|
|
metadata = context.get("metadata", {})
|
|
|
|
metadata_html = ""
|
|
if metadata:
|
|
items = []
|
|
for key, value in metadata.items():
|
|
if key not in ("user_id", "internal"):
|
|
items.append(f"<li><strong>{key.replace('_', ' ').title()}:</strong> {value}</li>")
|
|
if items:
|
|
metadata_html = f"<h3>Details:</h3><ul>{''.join(items)}</ul>"
|
|
|
|
return f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<style>
|
|
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; }}
|
|
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
|
|
.header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 8px 8px 0 0; }}
|
|
.header h1 {{ color: white; margin: 0; font-size: 24px; }}
|
|
.content {{ background: #f9f9f9; padding: 30px; border-radius: 0 0 8px 8px; }}
|
|
.alert {{ background: #fff3cd; border-left: 4px solid #ffc107; padding: 15px; margin: 20px 0; }}
|
|
.footer {{ text-align: center; color: #666; font-size: 12px; margin-top: 20px; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div class="container">
|
|
<div class="header">
|
|
<h1>🔒 Security Alert</h1>
|
|
</div>
|
|
<div class="content">
|
|
<p>Hello {user.get_display_name() if user else 'User'},</p>
|
|
<p>This is a security notification from ThrillWiki.</p>
|
|
<h2>{event_display}</h2>
|
|
{metadata_html}
|
|
<div class="alert">
|
|
<strong>Didn't do this?</strong><br>
|
|
If you did not perform this action, please secure your account immediately by changing your password and reviewing your connected devices.
|
|
</div>
|
|
</div>
|
|
<div class="footer">
|
|
<p>This is an automated security notification from ThrillWiki.</p>
|
|
</div>
|
|
</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
|
|
def check_auth_method_availability(user) -> dict:
    """
    Summarise which authentication methods a user currently has.

    Callers use the ``can_remove_*`` flags to stop a user from removing
    their last way of signing in.

    Args:
        user: User instance to check

    Returns:
        Dictionary with auth method availability:
        {
            "has_password": bool,
            "has_totp": bool,
            "has_passkey": bool,
            "passkey_count": int,
            "has_social": bool,
            "social_providers": list[str],
            "total_methods": int,
            "can_remove_mfa": bool,
            "can_remove_passkey": bool,
            "can_remove_social": bool,
        }
        ``total_methods`` counts password, passkey and social login as one
        method each; TOTP is not included in that count.
    """
    # The MFA app is optional; without it the TOTP/passkey fields stay at
    # their "nothing configured" defaults.
    try:
        from allauth.mfa.models import Authenticator
    except ImportError:
        Authenticator = None

    has_password = user.has_usable_password()

    # MFA authenticators
    has_totp = False
    passkey_count = 0
    if Authenticator is not None:
        authenticators = Authenticator.objects
        has_totp = authenticators.filter(
            user=user, type=Authenticator.Type.TOTP
        ).exists()
        passkey_count = authenticators.filter(
            user=user, type=Authenticator.Type.WEBAUTHN
        ).count()
    has_passkey = passkey_count > 0

    # Linked social accounts
    has_social = False
    social_providers = []
    if hasattr(user, "socialaccount_set"):
        linked = user.socialaccount_set.all()
        has_social = linked.exists()
        social_providers = list(linked.values_list("provider", flat=True))

    # Several passkeys still count as a single method here.
    total_methods = sum((has_password, has_passkey, has_social))

    # A removal is allowed only while some other primary method would remain.
    can_remove_mfa = total_methods >= 1
    can_remove_passkey = (
        total_methods > 1
        or passkey_count > 1
        or has_password
        or has_social
    )
    can_remove_social = total_methods > 1 or has_password or has_passkey

    return {
        "has_password": has_password,
        "has_totp": has_totp,
        "has_passkey": has_passkey,
        "passkey_count": passkey_count,
        "has_social": has_social,
        "social_providers": social_providers,
        "total_methods": total_methods,
        "can_remove_mfa": can_remove_mfa,
        "can_remove_passkey": can_remove_passkey,
        "can_remove_social": can_remove_social,
    }
|
|
|
|
|
|
def invalidate_user_sessions(user, exclude_current: bool = False, request=None) -> int:
    """
    Blacklist every outstanding JWT token recorded for a user.

    This is used after security-sensitive operations like password reset.

    Args:
        user: User whose sessions to invalidate
        exclude_current: Intended to keep the caller's current session.
            NOTE: not implemented - all outstanding tokens are blacklisted
            regardless of this flag; a warning is logged when it is set.
        request: Current request, meant to be used together with
            ``exclude_current``. Currently unused.

    Returns:
        Number of tokens newly blacklisted by this call. Tokens that were
        already blacklisted, or that failed to blacklist, are not counted.
        Returns 0 when the SimpleJWT blacklist app cannot be imported.
    """
    try:
        from rest_framework_simplejwt.token_blacklist.models import (
            BlacklistedToken,
            OutstandingToken,
        )
    except ImportError:
        logger.warning("JWT token blacklist not available")
        return 0

    if exclude_current:
        # Surface the gap instead of silently ignoring the caller's request.
        logger.warning(
            "invalidate_user_sessions: exclude_current is not supported; "
            "all outstanding tokens will be invalidated"
        )

    count = 0
    for token in OutstandingToken.objects.filter(user=user):
        try:
            # get_or_create reports whether a new row was inserted; only
            # those represent tokens invalidated by this call.
            _, created = BlacklistedToken.objects.get_or_create(token=token)
        except Exception as e:
            # Best effort: keep going so one bad token does not protect the rest.
            logger.debug(f"Could not blacklist token: {e}")
            continue
        if created:
            count += 1

    logger.info(f"Invalidated {count} tokens for user {user.username}")
    return count
|