-- ============================================================================
-- HIGH PRIORITY: Pipeline Cleanup Jobs & Deadlock Prevention
-- ============================================================================
-- This migration adds cleanup functions for:
--   1. Orphaned Cloudflare images (detected and logged, not deleted)
--   2. Temp refs of approved / rejected / cancelled submission items
--   3. Expired submission locks
-- plus execution logging and automated pg_cron schedules.
--
-- All SECURITY DEFINER functions pin search_path so that objects in other
-- schemas (or pg_temp) cannot shadow the tables they operate on.
-- ============================================================================


-- ============================================================================
-- CLEANUP FUNCTION #1: Orphaned Images
-- ============================================================================
-- Finds Cloudflare images from photo submissions that nothing references any
-- more and logs them for manual cleanup. Images cannot be deleted from
-- Cloudflare via SQL.
-- ============================================================================

CREATE TABLE IF NOT EXISTS orphaned_images_log (
    id                   UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    cloudflare_image_id  TEXT NOT NULL,
    cloudflare_image_url TEXT,
    image_source         TEXT,                    -- 'submission' or 'entity'
    last_referenced_at   TIMESTAMPTZ,
    detected_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- NOT NULL: a NULL here would escape both the partial unique index below
    -- and the FILTER (WHERE NOT cleaned_up) aggregates in pipeline_cleanup_stats.
    cleaned_up           BOOLEAN NOT NULL DEFAULT FALSE,
    cleaned_up_at        TIMESTAMPTZ,
    notes                TEXT
);

COMMENT ON TABLE orphaned_images_log IS 'Tracks Cloudflare images that are orphaned and need cleanup';

CREATE INDEX IF NOT EXISTS idx_orphaned_images_log_cleanup
    ON orphaned_images_log (cleaned_up, detected_at);

-- At most one not-yet-cleaned log row per image. detect_orphaned_images()
-- relies on this through ON CONFLICT DO NOTHING.
CREATE UNIQUE INDEX IF NOT EXISTS idx_orphaned_images_unique
    ON orphaned_images_log (cloudflare_image_id)
    WHERE NOT cleaned_up;

-- Logs newly detected orphaned images and returns how many rows were actually
-- inserted. An image from photo_submission_items counts as orphaned when all
-- of the following hold:
--   * its submission is neither 'approved' nor 'pending' (or has no submission),
--   * it was created more than 7 days ago,
--   * no other approved/pending submission item uses the same image,
--   * no park, ride or company uses it as card or banner image,
--   * it has never been logged before. This includes rows already marked
--     cleaned_up, so images that were deleted from Cloudflare are not
--     re-reported every day.
CREATE OR REPLACE FUNCTION detect_orphaned_images()
RETURNS INTEGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
    v_orphan_count INTEGER := 0;
BEGIN
    INSERT INTO orphaned_images_log (
        cloudflare_image_id,
        cloudflare_image_url,
        image_source,
        last_referenced_at
    )
    -- One row per image. When an image appears in several items, keep the
    -- most recent one.
    SELECT DISTINCT ON (psi.cloudflare_image_id)
        psi.cloudflare_image_id,
        psi.cloudflare_image_url,
        'submission',
        psi.created_at
    FROM photo_submission_items AS psi
    LEFT JOIN photo_submissions AS ps
        ON ps.id = psi.photo_submission_id
    LEFT JOIN content_submissions AS cs
        ON cs.id = ps.submission_id
    WHERE psi.cloudflare_image_id IS NOT NULL
      AND (cs.status NOT IN ('approved', 'pending') OR cs.status IS NULL)
      AND psi.created_at < NOW() - INTERVAL '7 days'
      -- The same image may still be in use by another live submission.
      AND NOT EXISTS (
          SELECT 1
          FROM photo_submission_items AS psi_live
          INNER JOIN photo_submissions AS ps_live
              ON ps_live.id = psi_live.photo_submission_id
          INNER JOIN content_submissions AS cs_live
              ON cs_live.id = ps_live.submission_id
          WHERE psi_live.cloudflare_image_id = psi.cloudflare_image_id
            AND cs_live.status IN ('approved', 'pending')
      )
      -- The image may be referenced directly by an entity.
      AND NOT EXISTS (
          SELECT 1 FROM parks AS p
          WHERE p.card_image_id = psi.cloudflare_image_id
             OR p.banner_image_id = psi.cloudflare_image_id
          UNION ALL
          SELECT 1 FROM rides AS r
          WHERE r.card_image_id = psi.cloudflare_image_id
             OR r.banner_image_id = psi.cloudflare_image_id
          UNION ALL
          SELECT 1 FROM companies AS c
          WHERE c.card_image_id = psi.cloudflare_image_id
             OR c.banner_image_id = psi.cloudflare_image_id
      )
      -- Skip anything already logged, whether pending or cleaned up.
      AND NOT EXISTS (
          SELECT 1
          FROM orphaned_images_log AS oil
          WHERE oil.cloudflare_image_id = psi.cloudflare_image_id
      )
    ORDER BY psi.cloudflare_image_id, psi.created_at DESC
    -- Guards against a concurrent run inserting the same image first.
    ON CONFLICT DO NOTHING;

    -- Counts only rows really inserted (conflict-skipped rows excluded).
    GET DIAGNOSTICS v_orphan_count = ROW_COUNT;
    RETURN v_orphan_count;
END;
$$;

COMMENT ON FUNCTION detect_orphaned_images() IS
    'Logs Cloudflare images (older than 7 days) not referenced by approved/pending submissions or by park/ride/company card/banner images; returns the number of newly logged images';


-- ============================================================================
-- CLEANUP FUNCTION #2: Approved Temp Refs
-- ============================================================================
-- Deletes temporary references for approved items older than 7 days and for
-- items of rejected/cancelled submissions older than 30 days.
-- ============================================================================

-- Returns the total number of submission_item_temp_refs rows deleted.
CREATE OR REPLACE FUNCTION cleanup_approved_temp_refs()
RETURNS INTEGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
    v_approved_deleted INTEGER;
    v_closed_deleted   INTEGER;
BEGIN
    -- Temp refs of items that were approved more than 7 days ago.
    DELETE FROM submission_item_temp_refs AS tr
    WHERE tr.submission_item_id IN (
        SELECT si.id
        FROM submission_items AS si
        WHERE si.status = 'approved'
          AND si.updated_at < NOW() - INTERVAL '7 days'
    );
    GET DIAGNOSTICS v_approved_deleted = ROW_COUNT;

    -- Temp refs of items whose parent submission was rejected or cancelled
    -- and last updated more than 30 days ago.
    DELETE FROM submission_item_temp_refs AS tr
    WHERE tr.submission_item_id IN (
        SELECT si.id
        FROM submission_items AS si
        INNER JOIN content_submissions AS cs
            ON cs.id = si.submission_id
        WHERE cs.status IN ('rejected', 'cancelled')
          AND cs.updated_at < NOW() - INTERVAL '30 days'
    );
    GET DIAGNOSTICS v_closed_deleted = ROW_COUNT;

    RETURN v_approved_deleted + v_closed_deleted;
END;
$$;

COMMENT ON FUNCTION cleanup_approved_temp_refs() IS
    'Removes temporary reference records for approved items (7d+) and rejected submissions (30d+)';


-- ============================================================================
-- CLEANUP FUNCTION #3: Expired Locks
-- ============================================================================
-- Clears submission locks whose locked_until has passed.
-- ============================================================================

-- Releases expired locks on pending content_submissions.
-- Returns the number of submissions unlocked.
CREATE OR REPLACE FUNCTION cleanup_expired_locks()
RETURNS INTEGER
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
    v_cleared_count INTEGER;
BEGIN
    UPDATE content_submissions
    SET assigned_to  = NULL,
        locked_until = NULL,
        assigned_at  = NULL
    WHERE locked_until IS NOT NULL
      AND locked_until < NOW()
      AND status = 'pending';
    GET DIAGNOSTICS v_cleared_count = ROW_COUNT;

    RETURN v_cleared_count;
END;
$$;

COMMENT ON FUNCTION cleanup_expired_locks() IS 'Removes expired locks from pending submissions';


-- ============================================================================
-- CLEANUP STATISTICS VIEW
-- ============================================================================
-- Provides visibility into cleanup job backlog.
-- ============================================================================

CREATE OR REPLACE VIEW pipeline_cleanup_stats AS
SELECT
    'orphaned_images'                               AS cleanup_type,
    COUNT(*) FILTER (WHERE NOT cleaned_up)          AS pending_count,
    COUNT(*) FILTER (WHERE cleaned_up)              AS cleaned_count,
    MAX(detected_at) FILTER (WHERE NOT cleaned_up)  AS last_detected,
    MAX(cleaned_up_at)                              AS last_cleaned
FROM orphaned_images_log
UNION ALL
-- Counts refs of all approved items, including those not yet past the
-- 7-day window used by cleanup_approved_temp_refs().
SELECT
    'temp_refs'     AS cleanup_type,
    COUNT(*)        AS pending_count,
    0               AS cleaned_count,
    MAX(created_at) AS last_detected,
    NULL            AS last_cleaned
FROM submission_item_temp_refs
WHERE submission_item_id IN (
    SELECT id
    FROM submission_items
    WHERE status = 'approved'
)
UNION ALL
SELECT
    'expired_locks'   AS cleanup_type,
    COUNT(*)          AS pending_count,
    0                 AS cleaned_count,
    MAX(locked_until) AS last_detected,
    NULL              AS last_cleaned
FROM content_submissions
WHERE locked_until IS NOT NULL
  AND locked_until < NOW()
  AND status = 'pending';

COMMENT ON VIEW pipeline_cleanup_stats IS 'Summary statistics for pipeline cleanup jobs';

-- Makes the view readable by every authenticated user, not only moderators.
-- NOTE(review): the view runs with its owner's privileges, so the RLS policies
-- on orphaned_images_log do not restrict these aggregates. Confirm this
-- exposure is intended.
GRANT SELECT ON pipeline_cleanup_stats TO authenticated;


-- ============================================================================
-- RLS POLICY: Allow moderators to view orphaned images
-- ============================================================================

ALTER TABLE orphaned_images_log ENABLE ROW LEVEL SECURITY;

-- CREATE POLICY has no IF NOT EXISTS; drop first so the migration can be re-run.
DROP POLICY IF EXISTS moderators_view_orphaned_images ON orphaned_images_log;
CREATE POLICY moderators_view_orphaned_images
    ON orphaned_images_log
    FOR SELECT
    TO authenticated
    USING (is_moderator(auth.uid()));

DROP POLICY IF EXISTS superusers_manage_orphaned_images ON orphaned_images_log;
CREATE POLICY superusers_manage_orphaned_images
    ON orphaned_images_log
    FOR ALL
    TO authenticated
    USING (is_superuser(auth.uid()) AND has_aal2())
    WITH CHECK (is_superuser(auth.uid()) AND has_aal2());


-- ============================================================================
-- MONITORING: Cleanup job execution log
-- ============================================================================

CREATE TABLE IF NOT EXISTS cleanup_job_log (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_name        TEXT NOT NULL,
    executed_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    items_processed INTEGER NOT NULL DEFAULT 0,
    duration_ms     INTEGER,
    success         BOOLEAN NOT NULL DEFAULT TRUE,
    error_message   TEXT
);

CREATE INDEX IF NOT EXISTS idx_cleanup_job_log_executed
    ON cleanup_job_log (job_name, executed_at DESC);

COMMENT ON TABLE cleanup_job_log IS 'Execution log for automated cleanup jobs';

-- ----------------------------------------------------------------------------
-- Logging wrappers
-- ----------------------------------------------------------------------------
-- Each wrapper runs one cleanup function and records the outcome in
-- cleanup_job_log.
--
-- clock_timestamp() is used instead of NOW(): NOW() is fixed at transaction
-- start, so a duration computed from it is always 0.
--
-- On failure, the cleanup's own changes are rolled back with the exception
-- block's subtransaction, a failure row is recorded, and a WARNING is raised.
-- The error is deliberately NOT re-raised. Re-raising would abort the calling
-- transaction and roll the failure row back along with it.
-- ----------------------------------------------------------------------------

CREATE OR REPLACE FUNCTION detect_orphaned_images_with_logging()
RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
    v_started_at TIMESTAMPTZ := clock_timestamp();
    v_count      INTEGER;
BEGIN
    v_count := detect_orphaned_images();

    INSERT INTO cleanup_job_log (job_name, items_processed, duration_ms)
    VALUES (
        'detect_orphaned_images',
        v_count,
        ROUND(EXTRACT(EPOCH FROM (clock_timestamp() - v_started_at)) * 1000)::INTEGER
    );
EXCEPTION
    WHEN OTHERS THEN
        INSERT INTO cleanup_job_log (job_name, items_processed, duration_ms, success, error_message)
        VALUES (
            'detect_orphaned_images',
            0,
            ROUND(EXTRACT(EPOCH FROM (clock_timestamp() - v_started_at)) * 1000)::INTEGER,
            FALSE,
            SQLERRM
        );
        RAISE WARNING 'cleanup job detect_orphaned_images failed: %', SQLERRM;
END;
$$;

CREATE OR REPLACE FUNCTION cleanup_approved_temp_refs_with_logging()
RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
    v_started_at TIMESTAMPTZ := clock_timestamp();
    v_count      INTEGER;
BEGIN
    v_count := cleanup_approved_temp_refs();

    INSERT INTO cleanup_job_log (job_name, items_processed, duration_ms)
    VALUES (
        'cleanup_approved_temp_refs',
        v_count,
        ROUND(EXTRACT(EPOCH FROM (clock_timestamp() - v_started_at)) * 1000)::INTEGER
    );
EXCEPTION
    WHEN OTHERS THEN
        INSERT INTO cleanup_job_log (job_name, items_processed, duration_ms, success, error_message)
        VALUES (
            'cleanup_approved_temp_refs',
            0,
            ROUND(EXTRACT(EPOCH FROM (clock_timestamp() - v_started_at)) * 1000)::INTEGER,
            FALSE,
            SQLERRM
        );
        RAISE WARNING 'cleanup job cleanup_approved_temp_refs failed: %', SQLERRM;
END;
$$;

CREATE OR REPLACE FUNCTION cleanup_expired_locks_with_logging()
RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
    v_started_at TIMESTAMPTZ := clock_timestamp();
    v_count      INTEGER;
BEGIN
    v_count := cleanup_expired_locks();

    INSERT INTO cleanup_job_log (job_name, items_processed, duration_ms)
    VALUES (
        'cleanup_expired_locks',
        v_count,
        ROUND(EXTRACT(EPOCH FROM (clock_timestamp() - v_started_at)) * 1000)::INTEGER
    );
EXCEPTION
    WHEN OTHERS THEN
        INSERT INTO cleanup_job_log (job_name, items_processed, duration_ms, success, error_message)
        VALUES (
            'cleanup_expired_locks',
            0,
            ROUND(EXTRACT(EPOCH FROM (clock_timestamp() - v_started_at)) * 1000)::INTEGER,
            FALSE,
            SQLERRM
        );
        RAISE WARNING 'cleanup job cleanup_expired_locks failed: %', SQLERRM;
END;
$$;


-- ============================================================================
-- PG_CRON SCHEDULES
-- ============================================================================
-- Schedules the logging wrappers directly. cron.schedule() with a job name
-- that already exists updates that job in place, so these calls are safe to
-- re-run and need no prior cron.unschedule().
-- ============================================================================

-- Enable pg_cron extension (idempotent)
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Temp refs cleanup (daily at 2 AM)
SELECT cron.schedule(
    'cleanup-temp-refs',
    '0 2 * * *',
    $$SELECT cleanup_approved_temp_refs_with_logging();$$
);

-- Orphaned image detection (daily at 3 AM)
SELECT cron.schedule(
    'detect-orphaned-images',
    '0 3 * * *',
    $$SELECT detect_orphaned_images_with_logging();$$
);

-- Expired lock cleanup (every 5 minutes)
SELECT cron.schedule(
    'cleanup-expired-locks',
    '*/5 * * * *',
    $$SELECT cleanup_expired_locks_with_logging();$$
);


-- ============================================================================
-- RLS POLICY: Allow moderators to view cleanup job logs
-- ============================================================================

ALTER TABLE cleanup_job_log ENABLE ROW LEVEL SECURITY;

DROP POLICY IF EXISTS moderators_view_cleanup_logs ON cleanup_job_log;
CREATE POLICY moderators_view_cleanup_logs
    ON cleanup_job_log
    FOR SELECT
    TO authenticated
    USING (is_moderator(auth.uid()));