Mirror of https://github.com/pacnpal/thrilltrack-explorer.git · synced 2025-12-20 11:31:11 -05:00
Implements an edge function, Django tasks, and UI hooks/panels for automatic retention cleanup of old metrics, anomalies, alerts, and incidents, plus updates to the query keys and monitoring dashboard to reflect the data-retention workflows.
169 lines · 5.7 KiB · Python
"""
|
|
Celery tasks for data retention and cleanup.
|
|
"""
|
|
import logging
|
|
import requests
|
|
import os
|
|
from celery import shared_task
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SUPABASE_URL = os.environ.get('SUPABASE_URL', 'https://api.thrillwiki.com')
|
|
SUPABASE_SERVICE_KEY = os.environ.get('SUPABASE_SERVICE_ROLE_KEY')
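
# The daily scheduling referenced in the task docstrings is assumed to live in
# the project's Celery beat configuration rather than in this module. A
# hypothetical settings sketch (names are illustrative, not confirmed here):
#
#     from celery.schedules import crontab
#
#     CELERY_BEAT_SCHEDULE = {
#         'run-data-retention-cleanup': {
#             'task': 'monitoring.run_data_retention_cleanup',
#             'schedule': crontab(hour=3, minute=0),
#         },
#     }
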
@shared_task(bind=True, name='monitoring.run_data_retention_cleanup')
def run_data_retention_cleanup(self):
    """
    Run comprehensive data retention cleanup.

    Cleans up old metrics, anomaly detections, alerts, and incidents.
    Runs daily at 3 AM.
    """
    logger.info("Starting data retention cleanup")

    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    try:
        # Call the Supabase RPC function
        headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }

        response = requests.post(
            f'{SUPABASE_URL}/rest/v1/rpc/run_data_retention_cleanup',
            headers=headers,
            timeout=60
        )

        if response.status_code == 200:
            result = response.json()
            logger.info(f"Data retention cleanup completed: {result}")
            return result
        else:
            logger.error(f"Data retention cleanup failed: {response.status_code} - {response.text}")
            return {'success': False, 'error': response.text}

    except Exception as e:
        logger.error(f"Error in data retention cleanup: {e}", exc_info=True)
        raise
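
# POSTing to f'{SUPABASE_URL}/rest/v1/rpc/<name>' follows Supabase's PostgREST
# convention for invoking a Postgres function; the run_data_retention_cleanup
# function on the database side is expected to exist as part of this change.
# A minimal manual trigger, assuming a running worker and broker:
#
#     run_data_retention_cleanup.delay()
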
@shared_task(bind=True, name='monitoring.cleanup_old_metrics')
def cleanup_old_metrics(self, retention_days: int = 30):
    """
    Clean up old metric time series data.

    Runs daily to remove metrics older than the retention period.
    """
    logger.info(f"Cleaning up metrics older than {retention_days} days")

    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    try:
        headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }

        response = requests.post(
            f'{SUPABASE_URL}/rest/v1/rpc/cleanup_old_metrics',
            headers=headers,
            json={'retention_days': retention_days},
            timeout=30
        )

        if response.status_code == 200:
            deleted_count = response.json()
            logger.info(f"Cleaned up {deleted_count} old metrics")
            return {'success': True, 'deleted_count': deleted_count}
        else:
            logger.error(f"Metrics cleanup failed: {response.status_code} - {response.text}")
            return {'success': False, 'error': response.text}

    except Exception as e:
        logger.error(f"Error in metrics cleanup: {e}", exc_info=True)
        raise
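
# The retention window is a per-call parameter, so a one-off cleanup with a
# shorter window can be enqueued directly (14 is an illustrative value):
#
#     cleanup_old_metrics.delay(retention_days=14)
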
@shared_task(bind=True, name='monitoring.cleanup_old_anomalies')
def cleanup_old_anomalies(self, retention_days: int = 30):
    """
    Clean up old anomaly detections.

    Archives resolved anomalies and deletes very old unresolved ones.
    """
    logger.info(f"Cleaning up anomalies older than {retention_days} days")

    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    try:
        headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }

        response = requests.post(
            f'{SUPABASE_URL}/rest/v1/rpc/cleanup_old_anomalies',
            headers=headers,
            json={'retention_days': retention_days},
            timeout=30
        )

        if response.status_code == 200:
            result = response.json()
            logger.info(f"Cleaned up anomalies: {result}")
            return {'success': True, 'result': result}
        else:
            logger.error(f"Anomalies cleanup failed: {response.status_code} - {response.text}")
            return {'success': False, 'error': response.text}

    except Exception as e:
        logger.error(f"Error in anomalies cleanup: {e}", exc_info=True)
        raise
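
# Unlike cleanup_old_metrics, this task returns the RPC response as-is, since
# the database function both archives resolved anomalies and deletes old
# unresolved ones. The retention override works the same way (illustrative
# value):
#
#     cleanup_old_anomalies.delay(retention_days=90)
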
@shared_task(bind=True, name='monitoring.get_retention_stats')
def get_retention_stats(self):
    """
    Get current data retention statistics.

    Shows record counts and storage size for monitored tables.
    """
    logger.info("Fetching data retention statistics")

    if not SUPABASE_SERVICE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not configured")
        return {'success': False, 'error': 'Missing service key'}

    try:
        headers = {
            'apikey': SUPABASE_SERVICE_KEY,
            'Authorization': f'Bearer {SUPABASE_SERVICE_KEY}',
            'Content-Type': 'application/json',
        }

        response = requests.get(
            f'{SUPABASE_URL}/rest/v1/data_retention_stats',
            headers=headers,
            timeout=10
        )

        if response.status_code == 200:
            stats = response.json()
            logger.info(f"Retrieved retention stats for {len(stats)} tables")
            return {'success': True, 'stats': stats}
        else:
            logger.error(f"Failed to get retention stats: {response.status_code} - {response.text}")
            return {'success': False, 'error': response.text}

    except Exception as e:
        logger.error(f"Error getting retention stats: {e}", exc_info=True)
        raise
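
# For ad-hoc inspection, a task can also run eagerly in a Django shell,
# bypassing the broker entirely (assumes the Supabase env vars are set):
#
#     result = get_retention_stats.apply().get()
#     for row in result.get('stats', []):
#         print(row)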