Files
thrilltrack-explorer/django/apps/monitoring/tasks_retention.py
gpt-engineer-app[bot] 915a9fe2df Add automated data retention cleanup
Implements edge function, Django tasks, and UI hooks/panels for automatic retention of old metrics, anomalies, alerts, and incidents, plus updates to query keys and monitoring dashboard to reflect data-retention workflows.
2025-11-11 02:21:27 +00:00

169 lines
5.7 KiB
Python

"""
Celery tasks for data retention and cleanup.
"""
import logging
import requests
import os
from celery import shared_task
# Module-level logger shared by every task in this file.
logger = logging.getLogger(__name__)

# Base URL of the Supabase REST API; falls back to the production host
# when SUPABASE_URL is not set in the environment.
SUPABASE_URL = os.getenv('SUPABASE_URL', 'https://api.thrillwiki.com')

# Service-role key used for both the `apikey` and `Authorization` headers.
# None when the variable is unset; each task checks for that before calling out.
SUPABASE_SERVICE_KEY = os.getenv('SUPABASE_SERVICE_ROLE_KEY')
@shared_task(bind=True, name='monitoring.run_data_retention_cleanup')
def run_data_retention_cleanup(self):
    """
    Trigger the comprehensive data retention cleanup through Supabase.

    Sends a POST to the ``run_data_retention_cleanup`` RPC endpoint,
    authenticated with the service role key. The cleanup itself (old
    metrics, anomaly detections, alerts, and incidents) is performed by the
    remote function; this task only invokes it and relays the outcome.
    Intended to run daily at 3 AM (the schedule is configured elsewhere).

    Returns:
        The decoded JSON body of the RPC response on HTTP 200. Otherwise a
        dict of the form ``{'success': False, 'error': <message>}``, used
        both when the service key is missing and when the endpoint answers
        with a non-200 status.

    Raises:
        Exception: Any error raised while sending the request or decoding
            the response is logged with its traceback and re-raised.
    """
    logger.info("Starting data retention cleanup")

    # Without the service role key the request cannot be authenticated.
    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    try:
        # Call the Supabase RPC function
        endpoint = f'{SUPABASE_URL}/rest/v1/rpc/run_data_retention_cleanup'
        auth_headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }
        reply = requests.post(endpoint, headers=auth_headers, timeout=60)

        # Anything other than 200 is reported back as a failure dict.
        if reply.status_code != 200:
            logger.error(f"Data retention cleanup failed: {reply.status_code} - {reply.text}")
            return {'success': False, 'error': reply.text}

        outcome = reply.json()
        logger.info(f"Data retention cleanup completed: {outcome}")
        return outcome
    except Exception as exc:
        # Log with traceback, then let Celery see the failure.
        logger.exception(f"Error in data retention cleanup: {exc}")
        raise
@shared_task(bind=True, name='monitoring.cleanup_old_metrics')
def cleanup_old_metrics(self, retention_days: int = 30):
    """
    Clean up old metric time series data through the Supabase RPC endpoint.

    Sends a POST to the ``cleanup_old_metrics`` RPC with ``retention_days``
    as its JSON argument. The deletion itself is performed by the remote
    function.

    Args:
        retention_days: Retention period in days, forwarded to the RPC.
            Negative numbers are rejected locally, because a negative
            window would presumably place the cutoff in the future and
            could wipe the whole table.

    Returns:
        ``{'success': True, 'deleted_count': <decoded RPC response>}`` on
        HTTP 200. Otherwise ``{'success': False, 'error': <message>}``,
        used when the service key is missing, when ``retention_days`` is
        negative, and when the endpoint answers with a non-200 status.

    Raises:
        Exception: Any error raised while sending the request or decoding
            the response is logged with its traceback and re-raised.
    """
    logger.info(f"Cleaning up metrics older than {retention_days} days")

    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    # Guard the destructive call: only numeric negatives are rejected here,
    # every other value is still left for the RPC itself to validate.
    if isinstance(retention_days, (int, float)) and retention_days < 0:
        logger.error(f"Invalid retention_days for metrics cleanup: {retention_days}")
        return {'success': False, 'error': 'retention_days must not be negative'}

    try:
        headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }
        response = requests.post(
            f'{SUPABASE_URL}/rest/v1/rpc/cleanup_old_metrics',
            headers=headers,
            json={'retention_days': retention_days},
            timeout=30,
        )

        if response.status_code != 200:
            logger.error(f"Metrics cleanup failed: {response.status_code} - {response.text}")
            return {'success': False, 'error': response.text}

        # The RPC response body is treated as the number of deleted rows.
        deleted_count = response.json()
        logger.info(f"Cleaned up {deleted_count} old metrics")
        return {'success': True, 'deleted_count': deleted_count}
    except Exception as e:
        # Log with traceback, then let Celery see the failure.
        logger.exception(f"Error in metrics cleanup: {e}")
        raise
@shared_task(bind=True, name='monitoring.cleanup_old_anomalies')
def cleanup_old_anomalies(self, retention_days: int = 30):
    """
    Clean up old anomaly detections through the Supabase RPC endpoint.

    Sends a POST to the ``cleanup_old_anomalies`` RPC with
    ``retention_days`` as its JSON argument. The remote function is
    expected to archive resolved anomalies and delete very old unresolved
    ones; that logic lives in the database, not in this task.

    Args:
        retention_days: Retention period in days, forwarded to the RPC.
            Negative numbers are rejected locally, because a negative
            window would presumably place the cutoff in the future and
            could affect every stored anomaly.

    Returns:
        ``{'success': True, 'result': <decoded RPC response>}`` on HTTP
        200. Otherwise ``{'success': False, 'error': <message>}``, used
        when the service key is missing, when ``retention_days`` is
        negative, and when the endpoint answers with a non-200 status.

    Raises:
        Exception: Any error raised while sending the request or decoding
            the response is logged with its traceback and re-raised.
    """
    logger.info(f"Cleaning up anomalies older than {retention_days} days")

    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    # Guard the destructive call: only numeric negatives are rejected here,
    # every other value is still left for the RPC itself to validate.
    if isinstance(retention_days, (int, float)) and retention_days < 0:
        logger.error(f"Invalid retention_days for anomalies cleanup: {retention_days}")
        return {'success': False, 'error': 'retention_days must not be negative'}

    try:
        headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }
        response = requests.post(
            f'{SUPABASE_URL}/rest/v1/rpc/cleanup_old_anomalies',
            headers=headers,
            json={'retention_days': retention_days},
            timeout=30,
        )

        if response.status_code != 200:
            logger.error(f"Anomalies cleanup failed: {response.status_code} - {response.text}")
            return {'success': False, 'error': response.text}

        result = response.json()
        logger.info(f"Cleaned up anomalies: {result}")
        return {'success': True, 'result': result}
    except Exception as e:
        # Log with traceback, then let Celery see the failure.
        logger.exception(f"Error in anomalies cleanup: {e}")
        raise
@shared_task(bind=True, name='monitoring.get_retention_stats')
def get_retention_stats(self):
    """
    Fetch the current data retention statistics from Supabase.

    Issues a GET against the ``data_retention_stats`` REST resource,
    authenticated with the service role key. The resource is expected to
    report record counts and storage size for the monitored tables.

    Returns:
        ``{'success': True, 'stats': <decoded response body>}`` on HTTP
        200. Otherwise ``{'success': False, 'error': <message>}``, used
        both when the service key is missing and when the endpoint answers
        with a non-200 status.

    Raises:
        Exception: Any error raised while sending the request, decoding
            the response, or measuring its length is logged with its
            traceback and re-raised.
    """
    logger.info("Fetching data retention statistics")

    # Without the service role key the request cannot be authenticated.
    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    try:
        stats_url = f'{SUPABASE_URL}/rest/v1/data_retention_stats'
        auth_headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }
        reply = requests.get(stats_url, headers=auth_headers, timeout=10)

        # Anything other than 200 is reported back as a failure dict.
        if reply.status_code != 200:
            logger.error(f"Failed to get retention stats: {reply.status_code} - {reply.text}")
            return {'success': False, 'error': reply.text}

        table_stats = reply.json()
        logger.info(f"Retrieved retention stats for {len(table_stats)} tables")
        return {'success': True, 'stats': table_stats}
    except Exception as exc:
        # Log with traceback, then let Celery see the failure.
        logger.exception(f"Error getting retention stats: {exc}")
        raise