Implement pipeline monitoring alerts

Approve and implement the Supabase migration for the pipeline monitoring alert system. This includes expanding alert types, adding new monitoring functions, and updating existing ones with escalating thresholds.
gpt-engineer-app[bot]
2025-11-07 05:05:32 +00:00
parent a74b8d6e74
commit d903e96e13
7 changed files with 544 additions and 20 deletions

@@ -0,0 +1,124 @@
/**
 * Pipeline Health Alerts Component
 *
 * Displays critical pipeline alerts on the admin error monitoring dashboard.
 * Shows top 10 active alerts with severity-based styling and resolution actions.
 */
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
import { useSystemAlerts } from '@/hooks/useSystemHealth';
import { Badge } from '@/components/ui/badge';
import { Button } from '@/components/ui/button';
import { AlertTriangle, CheckCircle, XCircle, AlertCircle } from 'lucide-react';
import { format } from 'date-fns';
import { supabase } from '@/lib/supabaseClient';
import { toast } from 'sonner';

const SEVERITY_CONFIG = {
  critical: { color: 'destructive', icon: XCircle },
  high: { color: 'destructive', icon: AlertCircle },
  medium: { color: 'default', icon: AlertTriangle },
  low: { color: 'secondary', icon: CheckCircle },
} as const;

const ALERT_TYPE_LABELS: Record<string, string> = {
  failed_submissions: 'Failed Submissions',
  high_ban_rate: 'High Ban Attempt Rate',
  temp_ref_error: 'Temp Reference Error',
  orphaned_images: 'Orphaned Images',
  slow_approval: 'Slow Approvals',
  submission_queue_backlog: 'Queue Backlog',
  ban_attempt: 'Ban Attempt',
  upload_timeout: 'Upload Timeout',
  high_error_rate: 'High Error Rate',
  validation_error: 'Validation Error',
  stale_submissions: 'Stale Submissions',
  circular_dependency: 'Circular Dependency',
};

export function PipelineHealthAlerts() {
  const { data: criticalAlerts } = useSystemAlerts('critical');
  const { data: highAlerts } = useSystemAlerts('high');
  const { data: mediumAlerts } = useSystemAlerts('medium');

  const allAlerts = [
    ...(criticalAlerts || []),
    ...(highAlerts || []),
    ...(mediumAlerts || [])
  ].slice(0, 10);

  const resolveAlert = async (alertId: string) => {
    const { error } = await supabase
      .from('system_alerts')
      .update({ resolved_at: new Date().toISOString() })
      .eq('id', alertId);

    if (error) {
      toast.error('Failed to resolve alert');
    } else {
      toast.success('Alert resolved');
    }
  };

  if (!allAlerts.length) {
    return (
      <Card>
        <CardHeader>
          <CardTitle className="flex items-center gap-2">
            <CheckCircle className="w-5 h-5 text-green-500" />
            Pipeline Health: All Systems Operational
          </CardTitle>
        </CardHeader>
        <CardContent>
          <p className="text-sm text-muted-foreground">No active alerts. The sacred pipeline is flowing smoothly.</p>
        </CardContent>
      </Card>
    );
  }

  return (
    <Card>
      <CardHeader>
        <CardTitle>🚨 Active Pipeline Alerts</CardTitle>
        <CardDescription>
          Critical issues requiring attention ({allAlerts.length} active)
        </CardDescription>
      </CardHeader>
      <CardContent className="space-y-3">
        {allAlerts.map((alert) => {
          const config = SEVERITY_CONFIG[alert.severity];
          const Icon = config.icon;
          const label = ALERT_TYPE_LABELS[alert.alert_type] || alert.alert_type;

          return (
            <div
              key={alert.id}
              className="flex items-start justify-between p-3 border rounded-lg hover:bg-accent transition-colors"
            >
              <div className="flex items-start gap-3 flex-1">
                <Icon className="w-5 h-5 mt-0.5 flex-shrink-0" />
                <div className="flex-1 min-w-0">
                  <div className="flex items-center gap-2 mb-1">
                    <Badge variant={config.color as any}>{alert.severity.toUpperCase()}</Badge>
                    <span className="text-sm font-medium">{label}</span>
                  </div>
                  <p className="text-sm text-muted-foreground">{alert.message}</p>
                  <p className="text-xs text-muted-foreground mt-1">
                    {format(new Date(alert.created_at), 'PPp')}
                  </p>
                </div>
              </div>
              <Button
                variant="outline"
                size="sm"
                onClick={() => resolveAlert(alert.id)}
              >
                Resolve
              </Button>
            </div>
          );
        })}
      </CardContent>
    </Card>
  );
}
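
Note: the component depends on useSystemAlerts from @/hooks/useSystemHealth, which is not touched by this commit. A minimal sketch of a compatible hook, assuming @tanstack/react-query and a system_alerts table with severity, resolved_at, and created_at columns (an illustration, not the hook shipped in the repo):

// Hypothetical sketch of the useSystemAlerts hook this component expects.
import { useQuery } from '@tanstack/react-query';
import { supabase } from '@/lib/supabaseClient';

export function useSystemAlerts(severity: 'critical' | 'high' | 'medium' | 'low') {
  return useQuery({
    queryKey: ['system-alerts', severity],
    queryFn: async () => {
      // Unresolved alerts of the requested severity, newest first
      const { data, error } = await supabase
        .from('system_alerts')
        .select('*')
        .eq('severity', severity)
        .is('resolved_at', null)
        .order('created_at', { ascending: false })
        .limit(10);
      if (error) throw error;
      return data;
    },
    refetchInterval: 60_000, // poll once a minute so resolved alerts drop off
  });
}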

@@ -6311,9 +6311,19 @@ export type Database = {
         }
         Returns: undefined
       }
-      mark_orphaned_images: { Args: never; Returns: undefined }
+      mark_orphaned_images: {
+        Args: never
+        Returns: {
+          details: Json
+          status: string
+          task: string
+        }[]
+      }
       migrate_ride_technical_data: { Args: never; Returns: undefined }
       migrate_user_list_items: { Args: never; Returns: undefined }
+      monitor_ban_attempts: { Args: never; Returns: undefined }
+      monitor_failed_submissions: { Args: never; Returns: undefined }
+      monitor_slow_approvals: { Args: never; Returns: undefined }
       process_approval_transaction: {
         Args: {
           p_idempotency_key?: string
@@ -6349,6 +6359,14 @@ export type Database = {
         }
         Returns: string
       }
+      run_pipeline_monitoring: {
+        Args: never
+        Returns: {
+          check_name: string
+          details: Json
+          status: string
+        }[]
+      }
       run_system_maintenance: {
         Args: never
         Returns: {
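
With these generated types in place, run_pipeline_monitoring and the new monitor_* functions can be invoked as typed RPCs from admin tooling; a rough sketch, assuming a supabase client created with createClient<Database>():

// Sketch: invoking the new monitoring RPC and logging per-check results.
const { data, error } = await supabase.rpc('run_pipeline_monitoring');
if (error) {
  console.error('Monitoring run failed:', error.message);
} else {
  for (const row of data ?? []) {
    // Each row is { check_name: string; status: string; details: Json }
    console.log(`${row.check_name}: ${row.status}`, row.details);
  }
}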

@@ -438,6 +438,18 @@ async function submitCompositeCreation(
     }
 
     if (errors.length > 0) {
+      // Report to system alerts (non-blocking)
+      import('./pipelineAlerts').then(async ({ reportTempRefError }) => {
+        try {
+          const { data: { user } } = await supabase.auth.getUser();
+          if (user) {
+            await reportTempRefError(uploadedPrimary.type, errors, user.id);
+          }
+        } catch (e) {
+          console.warn('Failed to report temp ref error:', e);
+        }
+      });
+
       throw new Error(`Temp reference validation failed: ${errors.join(', ')}`);
     }
   };

src/lib/pipelineAlerts.ts Normal file

@@ -0,0 +1,82 @@
/**
 * Pipeline Alert Reporting
 *
 * Client-side utilities for reporting critical pipeline issues to system alerts.
 * Non-blocking operations that enhance monitoring without disrupting user flows.
 */
import { supabase } from '@/lib/supabaseClient';
import { handleNonCriticalError } from '@/lib/errorHandler';

/**
 * Report temp ref validation errors to system alerts
 * Called when validateTempRefs() fails in entitySubmissionHelpers
 */
export async function reportTempRefError(
  entityType: 'park' | 'ride',
  errors: string[],
  userId: string
): Promise<void> {
  try {
    await supabase.rpc('create_system_alert', {
      p_alert_type: 'temp_ref_error',
      p_severity: 'high',
      p_message: `Temp reference validation failed for ${entityType}: ${errors.join(', ')}`,
      p_metadata: {
        entity_type: entityType,
        errors,
        user_id: userId,
        timestamp: new Date().toISOString()
      }
    });
  } catch (error) {
    handleNonCriticalError(error, {
      action: 'Report temp ref error to alerts'
    });
  }
}

/**
 * Report submission queue backlog
 * Called when IndexedDB queue exceeds threshold
 */
export async function reportQueueBacklog(
  pendingCount: number,
  userId?: string
): Promise<void> {
  // Only report if backlog > 10
  if (pendingCount <= 10) return;

  try {
    await supabase.rpc('create_system_alert', {
      p_alert_type: 'submission_queue_backlog',
      p_severity: pendingCount > 50 ? 'high' : 'medium',
      p_message: `Submission queue backlog: ${pendingCount} pending submissions`,
      p_metadata: {
        pending_count: pendingCount,
        user_id: userId,
        timestamp: new Date().toISOString()
      }
    });
  } catch (error) {
    handleNonCriticalError(error, {
      action: 'Report queue backlog to alerts'
    });
  }
}

/**
 * Check queue status and report if needed
 * Called on app startup and periodically
 */
export async function checkAndReportQueueStatus(userId?: string): Promise<void> {
  try {
    const { getPendingCount } = await import('./submissionQueue');
    const pendingCount = await getPendingCount();
    await reportQueueBacklog(pendingCount, userId);
  } catch (error) {
    handleNonCriticalError(error, {
      action: 'Check queue status'
    });
  }
}
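
The docblock says checkAndReportQueueStatus runs on app startup and periodically, but that wiring is not part of this diff. One possible hookup, sketched with an assumed five-minute interval and a hypothetical useQueueBacklogMonitor hook name:

// Hypothetical wiring: report queue backlog on mount and every 5 minutes.
import { useEffect } from 'react';
import { supabase } from '@/lib/supabaseClient';
import { checkAndReportQueueStatus } from '@/lib/pipelineAlerts';

export function useQueueBacklogMonitor() {
  useEffect(() => {
    let cancelled = false;

    const run = async () => {
      const { data: { user } } = await supabase.auth.getUser();
      if (!cancelled) await checkAndReportQueueStatus(user?.id);
    };

    run();
    const id = setInterval(run, 5 * 60 * 1000);
    return () => {
      cancelled = true;
      clearInterval(id);
    };
  }, []);
}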

@@ -12,6 +12,7 @@ import { RefreshButton } from '@/components/ui/refresh-button';
 import { ErrorDetailsModal } from '@/components/admin/ErrorDetailsModal';
 import { ApprovalFailureModal } from '@/components/admin/ApprovalFailureModal';
 import { ErrorAnalytics } from '@/components/admin/ErrorAnalytics';
+import { PipelineHealthAlerts } from '@/components/admin/PipelineHealthAlerts';
 import { format } from 'date-fns';
 
 // Helper to calculate date threshold for filtering
@@ -180,6 +181,9 @@ export default function ErrorMonitoring() {
         />
       </div>
 
+      {/* Pipeline Health Alerts */}
+      <PipelineHealthAlerts />
+
       {/* Analytics Section */}
       <ErrorAnalytics errorSummary={errorSummary} approvalMetrics={approvalMetrics} />

@@ -23,32 +23,28 @@ serve(async (req: Request) => {
     );
 
     // Run system maintenance (orphaned image cleanup)
-    const { data, error } = await supabase.rpc('run_system_maintenance');
+    const { data: maintenanceData, error: maintenanceError } = await supabase.rpc('run_system_maintenance');
 
-    if (error) {
-      edgeLogger.error('Maintenance failed', { requestId, error: error.message });
-      return new Response(
-        JSON.stringify({
-          success: false,
-          error: error.message,
-          requestId
-        }),
-        {
-          status: 500,
-          headers: { ...corsHeaders, 'Content-Type': 'application/json' }
-        }
-      );
+    if (maintenanceError) {
+      edgeLogger.error('Maintenance failed', { requestId, error: maintenanceError.message });
+    } else {
+      edgeLogger.info('Maintenance completed', { requestId, result: maintenanceData });
     }
 
-    edgeLogger.info('Maintenance completed successfully', {
-      requestId,
-      result: data
-    });
+    // Run pipeline monitoring checks
+    const { data: monitoringData, error: monitoringError } = await supabase.rpc('run_pipeline_monitoring');
+
+    if (monitoringError) {
+      edgeLogger.error('Pipeline monitoring failed', { requestId, error: monitoringError.message });
+    } else {
+      edgeLogger.info('Pipeline monitoring completed', { requestId, result: monitoringData });
+    }
 
     return new Response(
       JSON.stringify({
         success: true,
-        result: data,
+        maintenance: maintenanceData,
+        monitoring: monitoringData,
         requestId
       }),
       {
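
For reference, the success response now carries both result sets instead of a single result field; a sketch of the payload shape a caller would see (monitoring mirrors run_pipeline_monitoring's return type; the maintenance shape is not shown in full in this diff, so it is left loose):

// Sketch of the new response payload returned by the scheduled function.
interface ScheduledRunResponse {
  success: boolean;
  maintenance: unknown;
  monitoring: { check_name: string; status: string; details: unknown }[] | null;
  requestId: string;
}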

@@ -0,0 +1,288 @@
-- Pipeline Monitoring Alert System Migration
-- Adds comprehensive monitoring for critical pipeline metrics

-- 1. Expand alert types to include pipeline-specific alerts
ALTER TABLE system_alerts
  DROP CONSTRAINT IF EXISTS system_alerts_alert_type_check;

ALTER TABLE system_alerts
  ADD CONSTRAINT system_alerts_alert_type_check CHECK (alert_type IN (
    'orphaned_images',
    'stale_submissions',
    'circular_dependency',
    'validation_error',
    'ban_attempt',
    'upload_timeout',
    'high_error_rate',
    'failed_submissions',
    'temp_ref_error',
    'submission_queue_backlog',
    'slow_approval',
    'high_ban_rate'
  ));
-- 2. Monitor Failed Submissions
CREATE OR REPLACE FUNCTION monitor_failed_submissions()
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
DECLARE
  v_total_last_hour INTEGER;
  v_failed_last_hour INTEGER;
  v_failure_rate NUMERIC;
  v_consecutive_failures INTEGER;
BEGIN
  SELECT
    COUNT(*),
    COUNT(*) FILTER (WHERE success = false)
  INTO v_total_last_hour, v_failed_last_hour
  FROM approval_transaction_metrics
  WHERE created_at > now() - interval '1 hour';

  IF v_total_last_hour > 0 THEN
    v_failure_rate := (v_failed_last_hour::NUMERIC / v_total_last_hour::NUMERIC) * 100;

    IF v_failure_rate > 10 AND v_failed_last_hour >= 3 THEN
      PERFORM create_system_alert(
        'failed_submissions',
        CASE
          WHEN v_failure_rate > 50 THEN 'critical'
          WHEN v_failure_rate > 25 THEN 'high'
          ELSE 'medium'
        END,
        -- format() only supports %s/%I/%L, so round the rate before interpolating
        format('High approval failure rate: %s%% (%s/%s in last hour)',
          round(v_failure_rate, 1), v_failed_last_hour, v_total_last_hour),
        jsonb_build_object(
          'failure_rate', v_failure_rate,
          'failed_count', v_failed_last_hour,
          'total_count', v_total_last_hour,
          'checked_at', now()
        )
      );
    END IF;
  END IF;

  -- Five most recent approvals all failed => systemic failure
  SELECT COUNT(*) INTO v_consecutive_failures
  FROM (
    SELECT success
    FROM approval_transaction_metrics
    ORDER BY created_at DESC
    LIMIT 5
  ) recent
  WHERE success = false;

  IF v_consecutive_failures >= 5 THEN
    PERFORM create_system_alert(
      'failed_submissions',
      'critical',
      format('System failure: %s consecutive approval failures', v_consecutive_failures),
      jsonb_build_object(
        'consecutive_failures', v_consecutive_failures,
        'checked_at', now()
      )
    );
  END IF;
END;
$$;
-- 3. Monitor Ban Attempt Patterns
CREATE OR REPLACE FUNCTION monitor_ban_attempts()
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
DECLARE
  v_attempts_last_hour INTEGER;
  v_unique_users INTEGER;
BEGIN
  SELECT
    COUNT(*),
    COUNT(DISTINCT (metadata->>'user_id')::UUID)
  INTO v_attempts_last_hour, v_unique_users
  FROM system_alerts
  WHERE alert_type = 'ban_attempt'
    AND created_at > now() - interval '1 hour';

  IF v_attempts_last_hour >= 5 THEN
    PERFORM create_system_alert(
      'high_ban_rate',
      CASE
        WHEN v_attempts_last_hour > 20 THEN 'critical'
        WHEN v_attempts_last_hour > 10 THEN 'high'
        ELSE 'medium'
      END,
      format('High ban attempt rate: %s attempts from %s users in last hour',
        v_attempts_last_hour, v_unique_users),
      jsonb_build_object(
        'attempt_count', v_attempts_last_hour,
        'unique_users', v_unique_users,
        'checked_at', now()
      )
    );
  END IF;
END;
$$;
-- 4. Monitor Slow Approvals
CREATE OR REPLACE FUNCTION monitor_slow_approvals()
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
DECLARE
  v_slow_count INTEGER;
  v_avg_duration NUMERIC;
  v_max_duration NUMERIC;
BEGIN
  SELECT
    COUNT(*),
    AVG(duration_ms),
    MAX(duration_ms)
  INTO v_slow_count, v_avg_duration, v_max_duration
  FROM approval_transaction_metrics
  WHERE created_at > now() - interval '1 hour'
    AND duration_ms > 30000;

  IF v_slow_count >= 3 THEN
    PERFORM create_system_alert(
      'slow_approval',
      CASE
        WHEN v_max_duration > 60000 THEN 'high'
        ELSE 'medium'
      END,
      format('Slow approval transactions detected: %s approvals >30s (avg: %sms, max: %sms)',
        v_slow_count, ROUND(v_avg_duration), ROUND(v_max_duration)),
      jsonb_build_object(
        'slow_count', v_slow_count,
        'avg_duration_ms', ROUND(v_avg_duration),
        'max_duration_ms', ROUND(v_max_duration),
        'checked_at', now()
      )
    );
  END IF;
END;
$$;
-- 5. Drop and recreate mark_orphaned_images with escalating alerts
DROP FUNCTION IF EXISTS mark_orphaned_images();

CREATE OR REPLACE FUNCTION mark_orphaned_images()
RETURNS TABLE(task TEXT, status TEXT, details JSONB)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
DECLARE
  v_orphaned_count INTEGER;
BEGIN
  -- Mark images older than 24 hours that no park or ride references
  UPDATE orphaned_images
  SET marked_for_deletion_at = now()
  WHERE marked_for_deletion_at IS NULL
    AND uploaded_at < now() - interval '24 hours'
    AND NOT EXISTS (
      SELECT 1 FROM parks WHERE image_id = orphaned_images.image_id
      UNION ALL
      SELECT 1 FROM rides WHERE image_id = orphaned_images.image_id
    );

  GET DIAGNOSTICS v_orphaned_count = ROW_COUNT;

  -- Escalate on the total backlog of marked images, not just this run
  SELECT COUNT(*) INTO v_orphaned_count
  FROM orphaned_images
  WHERE marked_for_deletion_at IS NOT NULL;

  RETURN QUERY SELECT
    'mark_orphaned_images'::TEXT,
    'success'::TEXT,
    jsonb_build_object('count', v_orphaned_count);

  IF v_orphaned_count >= 500 THEN
    PERFORM create_system_alert(
      'orphaned_images',
      'critical',
      format('CRITICAL: %s orphaned images require cleanup', v_orphaned_count),
      jsonb_build_object('count', v_orphaned_count)
    );
  ELSIF v_orphaned_count >= 100 THEN
    PERFORM create_system_alert(
      'orphaned_images',
      'high',
      format('High number of orphaned images: %s', v_orphaned_count),
      jsonb_build_object('count', v_orphaned_count)
    );
  ELSIF v_orphaned_count >= 50 THEN
    PERFORM create_system_alert(
      'orphaned_images',
      'medium',
      format('Moderate orphaned images detected: %s', v_orphaned_count),
      jsonb_build_object('count', v_orphaned_count)
    );
  END IF;
EXCEPTION WHEN OTHERS THEN
  RETURN QUERY SELECT
    'mark_orphaned_images'::TEXT,
    'error'::TEXT,
    jsonb_build_object('error', SQLERRM);
END;
$$;
-- 6. Master Monitoring Function
CREATE OR REPLACE FUNCTION run_pipeline_monitoring()
RETURNS TABLE(check_name TEXT, status TEXT, details JSONB)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
BEGIN
  BEGIN
    PERFORM monitor_failed_submissions();
    RETURN QUERY SELECT
      'monitor_failed_submissions'::TEXT,
      'success'::TEXT,
      '{}'::JSONB;
  EXCEPTION WHEN OTHERS THEN
    RETURN QUERY SELECT
      'monitor_failed_submissions'::TEXT,
      'error'::TEXT,
      jsonb_build_object('error', SQLERRM);
  END;

  BEGIN
    PERFORM monitor_ban_attempts();
    RETURN QUERY SELECT
      'monitor_ban_attempts'::TEXT,
      'success'::TEXT,
      '{}'::JSONB;
  EXCEPTION WHEN OTHERS THEN
    RETURN QUERY SELECT
      'monitor_ban_attempts'::TEXT,
      'error'::TEXT,
      jsonb_build_object('error', SQLERRM);
  END;

  BEGIN
    PERFORM monitor_slow_approvals();
    RETURN QUERY SELECT
      'monitor_slow_approvals'::TEXT,
      'success'::TEXT,
      '{}'::JSONB;
  EXCEPTION WHEN OTHERS THEN
    RETURN QUERY SELECT
      'monitor_slow_approvals'::TEXT,
      'error'::TEXT,
      jsonb_build_object('error', SQLERRM);
  END;

  RETURN;
END;
$$;
GRANT EXECUTE ON FUNCTION run_pipeline_monitoring() TO authenticated;
GRANT EXECUTE ON FUNCTION monitor_failed_submissions() TO authenticated;
GRANT EXECUTE ON FUNCTION monitor_ban_attempts() TO authenticated;
GRANT EXECUTE ON FUNCTION monitor_slow_approvals() TO authenticated;
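
The check constraint at the top of this migration is the source of truth for valid alert_type values; client code calling create_system_alert can mirror it with a union type so typos fail at compile time (a sketch, not included in this commit):

// Union of alert types allowed by the system_alerts check constraint.
type PipelineAlertType =
  | 'orphaned_images'
  | 'stale_submissions'
  | 'circular_dependency'
  | 'validation_error'
  | 'ban_attempt'
  | 'upload_timeout'
  | 'high_error_rate'
  | 'failed_submissions'
  | 'temp_ref_error'
  | 'submission_queue_backlog'
  | 'slow_approval'
  | 'high_ban_rate';

type AlertSeverity = 'critical' | 'high' | 'medium' | 'low';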