feat: Implement a new notifications application, add admin API views for dashboard metrics, introduce scheduled tasks, and update API routing and project configurations.

This commit is contained in:
pacnpal
2026-01-05 09:50:00 -05:00
parent 1c6e219662
commit a801813dcf
27 changed files with 3829 additions and 131 deletions

View File

@@ -3,3 +3,22 @@ Core tasks package for ThrillWiki.
This package contains all Celery tasks for the core application.
"""
from apps.core.tasks.scheduled import (
cleanup_old_versions,
cleanup_orphaned_images,
data_retention_cleanup,
process_closing_entities,
process_expired_bans,
process_scheduled_deletions,
)
__all__ = [
"process_scheduled_deletions",
"process_closing_entities",
"process_expired_bans",
"cleanup_orphaned_images",
"cleanup_old_versions",
"data_retention_cleanup",
]

View File

@@ -0,0 +1,417 @@
"""
Scheduled Celery tasks for ThrillWiki.
These tasks are run on a schedule via Celery Beat for maintenance operations.
"""
import logging
from datetime import timedelta
from celery import shared_task
from django.contrib.auth import get_user_model
from django.db import transaction
from django.utils import timezone
from apps.core.utils import capture_and_log
logger = logging.getLogger(__name__)
User = get_user_model()
@shared_task(name="core.process_scheduled_deletions")
def process_scheduled_deletions() -> dict:
"""
Process scheduled account deletions.
Users who requested account deletion and whose grace period has expired
will have their accounts permanently deleted.
Returns:
dict: Summary with counts of processed, succeeded, and failed deletions
"""
from apps.accounts.models import AccountDeletionRequest
logger.info("Starting scheduled account deletions processing")
cutoff_time = timezone.now()
processed = 0
succeeded = 0
failed = 0
failures = []
try:
# Get deletion requests that are past their scheduled time
pending_deletions = AccountDeletionRequest.objects.filter(
status="pending",
scheduled_deletion_at__lte=cutoff_time,
).select_related("user")
for request in pending_deletions:
processed += 1
try:
with transaction.atomic():
user = request.user
username = user.username
# Mark request as processing
request.status = "processing"
request.save()
# Anonymize user data (keep submissions)
user.username = f"deleted_{user.id}"
user.email = f"deleted_{user.id}@deleted.thrillwiki.com"
user.first_name = ""
user.last_name = ""
user.is_active = False
user.save()
# Mark deletion as complete
request.status = "completed"
request.completed_at = timezone.now()
request.save()
succeeded += 1
logger.info(f"Successfully processed deletion for user {username}")
except Exception as e:
failed += 1
error_msg = f"User {request.user_id}: {str(e)}"
failures.append(error_msg)
capture_and_log(e, f"Process scheduled deletion for user {request.user_id}", source="task")
except Exception as e:
capture_and_log(e, "Process scheduled deletions", source="task")
result = {
"processed": processed,
"succeeded": succeeded,
"failed": failed,
"failures": failures[:10], # Limit failure list
"timestamp": timezone.now().isoformat(),
}
logger.info(
f"Completed scheduled deletions: {processed} processed, {succeeded} succeeded, {failed} failed"
)
return result
@shared_task(name="core.process_closing_entities")
def process_closing_entities() -> dict:
"""
Process parks and rides that have reached their closing date.
Entities in CLOSING status with a closing_date in the past will be
transitioned to their post_closing_status (typically CLOSED or SBNO).
Returns:
dict: Summary with counts
"""
from apps.parks.models import Park
from apps.rides.models import Ride
logger.info("Starting closing entities processing")
today = timezone.now().date()
results = {"parks": {"processed": 0, "succeeded": 0, "failed": 0}, "rides": {"processed": 0, "succeeded": 0, "failed": 0}}
# Get system user for automated transitions
try:
system_user = User.objects.get(username="system")
except User.DoesNotExist:
system_user = User.objects.filter(is_staff=True).first()
# Process parks
try:
closing_parks = Park.objects.filter(
status="CLOSING",
closing_date__lte=today,
)
for park in closing_parks:
results["parks"]["processed"] += 1
try:
with transaction.atomic():
# Transition to closed status
park.status = getattr(park, "post_closing_status", "CLOSED") or "CLOSED"
park.save(update_fields=["status", "updated_at"])
results["parks"]["succeeded"] += 1
logger.info(f"Transitioned park {park.name} to {park.status}")
except Exception as e:
results["parks"]["failed"] += 1
capture_and_log(e, f"Process closing park {park.id}", source="task")
except Exception as e:
capture_and_log(e, "Process closing parks", source="task")
# Process rides (already handled by rides.check_overdue_closings, but included for completeness)
try:
closing_rides = Ride.objects.filter(
status="CLOSING",
closing_date__lte=today,
)
for ride in closing_rides:
results["rides"]["processed"] += 1
try:
with transaction.atomic():
if hasattr(ride, "apply_post_closing_status") and system_user:
ride.apply_post_closing_status(user=system_user)
else:
ride.status = getattr(ride, "post_closing_status", "CLOSED") or "CLOSED"
ride.save(update_fields=["status", "updated_at"])
results["rides"]["succeeded"] += 1
logger.info(f"Transitioned ride {ride.name} to {ride.status}")
except Exception as e:
results["rides"]["failed"] += 1
capture_and_log(e, f"Process closing ride {ride.id}", source="task")
except Exception as e:
capture_and_log(e, "Process closing rides", source="task")
logger.info(f"Completed closing entities: Parks {results['parks']}, Rides {results['rides']}")
return results
@shared_task(name="core.process_expired_bans")
def process_expired_bans() -> dict:
"""
Process expired user bans.
Users with temporary bans that have expired will have their ban lifted.
Returns:
dict: Summary with counts
"""
from apps.accounts.models import UserBan
logger.info("Starting expired bans processing")
now = timezone.now()
processed = 0
succeeded = 0
failed = 0
try:
expired_bans = UserBan.objects.filter(
is_active=True,
expires_at__isnull=False,
expires_at__lte=now,
).select_related("user")
for ban in expired_bans:
processed += 1
try:
with transaction.atomic():
ban.is_active = False
ban.save(update_fields=["is_active", "updated_at"])
# Reactivate user if this was their only active ban
active_bans = UserBan.objects.filter(user=ban.user, is_active=True).count()
if active_bans == 0 and not ban.user.is_active:
ban.user.is_active = True
ban.user.save(update_fields=["is_active"])
succeeded += 1
logger.info(f"Lifted expired ban for user {ban.user.username}")
except Exception as e:
failed += 1
capture_and_log(e, f"Process expired ban {ban.id}", source="task")
except Exception as e:
capture_and_log(e, "Process expired bans", source="task")
# Model may not exist yet
if "UserBan" in str(e):
logger.info("UserBan model not found, skipping expired bans processing")
return {"skipped": True, "reason": "UserBan model not found"}
result = {
"processed": processed,
"succeeded": succeeded,
"failed": failed,
"timestamp": timezone.now().isoformat(),
}
logger.info(f"Completed expired bans: {processed} processed, {succeeded} succeeded, {failed} failed")
return result
@shared_task(name="core.cleanup_orphaned_images")
def cleanup_orphaned_images() -> dict:
"""
Clean up orphaned images.
Images that are not associated with any entity and are older than the
retention period will be deleted.
Returns:
dict: Summary with counts
"""
logger.info("Starting orphaned images cleanup")
# This is a placeholder - actual implementation depends on image storage strategy
# For Cloudflare Images, we would need to:
# 1. Query all images from Cloudflare
# 2. Compare against images referenced in the database
# 3. Delete orphaned images
result = {
"processed": 0,
"deleted": 0,
"skipped": 0,
"timestamp": timezone.now().isoformat(),
"note": "Placeholder implementation - configure based on image storage",
}
logger.info("Completed orphaned images cleanup")
return result
@shared_task(name="core.cleanup_old_versions")
def cleanup_old_versions() -> dict:
"""
Clean up old entity versions from pghistory.
Keeps the most recent N versions and deletes older ones to manage
database size.
Returns:
dict: Summary with counts
"""
logger.info("Starting old versions cleanup")
# Configuration
MAX_VERSIONS_PER_ENTITY = 50
MIN_AGE_DAYS = 90 # Only delete versions older than this
deleted_count = 0
cutoff_date = timezone.now() - timedelta(days=MIN_AGE_DAYS)
try:
# pghistory stores events in pgh_* tables
# We need to identify which models have history tracking
from django.db import connection
with connection.cursor() as cursor:
# Get list of pghistory event tables
cursor.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'pgh_%event'
"""
)
event_tables = [row[0] for row in cursor.fetchall()]
for table_name in event_tables:
try:
# Delete old versions beyond the retention limit
# This is a simplified approach - a more sophisticated one
# would keep the most recent N per entity
cursor.execute(
f"""
DELETE FROM {table_name}
WHERE pgh_created_at < %s
AND pgh_id NOT IN (
SELECT pgh_id FROM (
SELECT pgh_id,
ROW_NUMBER() OVER (PARTITION BY pgh_obj_id ORDER BY pgh_created_at DESC) as rn
FROM {table_name}
) ranked
WHERE rn <= %s
)
""",
[cutoff_date, MAX_VERSIONS_PER_ENTITY],
)
deleted_in_table = cursor.rowcount
deleted_count += deleted_in_table
if deleted_in_table > 0:
logger.info(f"Deleted {deleted_in_table} old versions from {table_name}")
except Exception as e:
logger.warning(f"Error cleaning up {table_name}: {e}")
except Exception as e:
capture_and_log(e, "Cleanup old versions", source="task")
result = {
"deleted": deleted_count,
"cutoff_date": cutoff_date.isoformat(),
"max_versions_per_entity": MAX_VERSIONS_PER_ENTITY,
"timestamp": timezone.now().isoformat(),
}
logger.info(f"Completed old versions cleanup: {deleted_count} versions deleted")
return result
@shared_task(name="core.data_retention_cleanup")
def data_retention_cleanup() -> dict:
"""
Clean up data per retention policy (GDPR compliance).
Handles:
- Session cleanup
- Expired token cleanup
- Old audit log cleanup
- Temporary data cleanup
Returns:
dict: Summary with counts
"""
logger.info("Starting data retention cleanup")
results = {
"sessions": 0,
"tokens": 0,
"audit_logs": 0,
"temp_data": 0,
}
try:
from django.contrib.sessions.models import Session
# Clean up expired sessions
expired_sessions = Session.objects.filter(expire_date__lt=timezone.now())
results["sessions"] = expired_sessions.count()
expired_sessions.delete()
logger.info(f"Deleted {results['sessions']} expired sessions")
except Exception as e:
logger.warning(f"Session cleanup error: {e}")
try:
from rest_framework_simplejwt.token_blacklist.models import OutstandingToken
# Clean up expired tokens (older than 30 days)
cutoff = timezone.now() - timedelta(days=30)
expired_tokens = OutstandingToken.objects.filter(expires_at__lt=cutoff)
results["tokens"] = expired_tokens.count()
expired_tokens.delete()
logger.info(f"Deleted {results['tokens']} expired tokens")
except Exception as e:
logger.warning(f"Token cleanup error: {e}")
try:
from apps.accounts.models import ProfileAuditLog
# Clean up old audit logs (older than 1 year)
cutoff = timezone.now() - timedelta(days=365)
old_logs = ProfileAuditLog.objects.filter(created_at__lt=cutoff)
results["audit_logs"] = old_logs.count()
old_logs.delete()
logger.info(f"Deleted {results['audit_logs']} old audit logs")
except Exception as e:
logger.warning(f"Audit log cleanup error: {e}")
result = {
**results,
"timestamp": timezone.now().isoformat(),
}
logger.info(f"Completed data retention cleanup: {result}")
return result