mirror of
https://github.com/pacnpal/thrilltrack-explorer.git
synced 2025-12-21 17:51:14 -05:00
Add automated data retention cleanup
Implements edge function, Django tasks, and UI hooks/panels for automatic retention of old metrics, anomalies, alerts, and incidents, plus updates to query keys and monitoring dashboard to reflect data-retention workflows.
This commit is contained in:
168
django/apps/monitoring/tasks_retention.py
Normal file
168
django/apps/monitoring/tasks_retention.py
Normal file
@@ -0,0 +1,168 @@
|
||||
"""
|
||||
Celery tasks for data retention and cleanup.
|
||||
"""
|
||||
import logging
|
||||
import requests
|
||||
import os
|
||||
from celery import shared_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SUPABASE_URL = os.environ.get('SUPABASE_URL', 'https://api.thrillwiki.com')
|
||||
SUPABASE_SERVICE_KEY = os.environ.get('SUPABASE_SERVICE_ROLE_KEY')
|
||||
|
||||
|
||||
@shared_task(bind=True, name='monitoring.run_data_retention_cleanup')
|
||||
def run_data_retention_cleanup(self):
|
||||
"""
|
||||
Run comprehensive data retention cleanup.
|
||||
Cleans up old metrics, anomaly detections, alerts, and incidents.
|
||||
Runs daily at 3 AM.
|
||||
"""
|
||||
logger.info("Starting data retention cleanup")
|
||||
|
||||
if not SUPABASE_SERVICE_KEY:
|
||||
logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
|
||||
return {'success': False, 'error': 'Missing service key'}
|
||||
|
||||
try:
|
||||
# Call the Supabase RPC function
|
||||
headers = {
|
||||
'apikey': SUPABASE_SERVICE_KEY,
|
||||
'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f'{SUPABASE_URL}/rest/v1/rpc/run_data_retention_cleanup',
|
||||
headers=headers,
|
||||
timeout=60
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
logger.info(f"Data retention cleanup completed: {result}")
|
||||
return result
|
||||
else:
|
||||
logger.error(f"Data retention cleanup failed: {response.status_code} - {response.text}")
|
||||
return {'success': False, 'error': response.text}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in data retention cleanup: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
@shared_task(bind=True, name='monitoring.cleanup_old_metrics')
|
||||
def cleanup_old_metrics(self, retention_days: int = 30):
|
||||
"""
|
||||
Clean up old metric time series data.
|
||||
Runs daily to remove metrics older than retention period.
|
||||
"""
|
||||
logger.info(f"Cleaning up metrics older than {retention_days} days")
|
||||
|
||||
if not SUPABASE_SERVICE_KEY:
|
||||
logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
|
||||
return {'success': False, 'error': 'Missing service key'}
|
||||
|
||||
try:
|
||||
headers = {
|
||||
'apikey': SUPABASE_SERVICE_KEY,
|
||||
'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f'{SUPABASE_URL}/rest/v1/rpc/cleanup_old_metrics',
|
||||
headers=headers,
|
||||
json={'retention_days': retention_days},
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
deleted_count = response.json()
|
||||
logger.info(f"Cleaned up {deleted_count} old metrics")
|
||||
return {'success': True, 'deleted_count': deleted_count}
|
||||
else:
|
||||
logger.error(f"Metrics cleanup failed: {response.status_code} - {response.text}")
|
||||
return {'success': False, 'error': response.text}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in metrics cleanup: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
@shared_task(bind=True, name='monitoring.cleanup_old_anomalies')
|
||||
def cleanup_old_anomalies(self, retention_days: int = 30):
|
||||
"""
|
||||
Clean up old anomaly detections.
|
||||
Archives resolved anomalies and deletes very old unresolved ones.
|
||||
"""
|
||||
logger.info(f"Cleaning up anomalies older than {retention_days} days")
|
||||
|
||||
if not SUPABASE_SERVICE_KEY:
|
||||
logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
|
||||
return {'success': False, 'error': 'Missing service key'}
|
||||
|
||||
try:
|
||||
headers = {
|
||||
'apikey': SUPABASE_SERVICE_KEY,
|
||||
'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f'{SUPABASE_URL}/rest/v1/rpc/cleanup_old_anomalies',
|
||||
headers=headers,
|
||||
json={'retention_days': retention_days},
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
logger.info(f"Cleaned up anomalies: {result}")
|
||||
return {'success': True, 'result': result}
|
||||
else:
|
||||
logger.error(f"Anomalies cleanup failed: {response.status_code} - {response.text}")
|
||||
return {'success': False, 'error': response.text}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in anomalies cleanup: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
@shared_task(bind=True, name='monitoring.get_retention_stats')
|
||||
def get_retention_stats(self):
|
||||
"""
|
||||
Get current data retention statistics.
|
||||
Shows record counts and storage size for monitored tables.
|
||||
"""
|
||||
logger.info("Fetching data retention statistics")
|
||||
|
||||
if not SUPABASE_SERVICE_KEY:
|
||||
logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
|
||||
return {'success': False, 'error': 'Missing service key'}
|
||||
|
||||
try:
|
||||
headers = {
|
||||
'apikey': SUPABASE_SERVICE_KEY,
|
||||
'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
response = requests.get(
|
||||
f'{SUPABASE_URL}/rest/v1/data_retention_stats',
|
||||
headers=headers,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
stats = response.json()
|
||||
logger.info(f"Retrieved retention stats for {len(stats)} tables")
|
||||
return {'success': True, 'stats': stats}
|
||||
else:
|
||||
logger.error(f"Failed to get retention stats: {response.status_code} - {response.text}")
|
||||
return {'success': False, 'error': response.text}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting retention stats: {e}", exc_info=True)
|
||||
raise
|
||||
Reference in New Issue
Block a user