-- Add distributed tracing support to RPC functions -- Adds trace_id and parent_span_id parameters for span context propagation -- Update process_approval_transaction to accept trace context CREATE OR REPLACE FUNCTION process_approval_transaction( p_submission_id UUID, p_item_ids UUID[], p_moderator_id UUID, p_submitter_id UUID, p_request_id TEXT DEFAULT NULL, p_trace_id TEXT DEFAULT NULL, p_parent_span_id TEXT DEFAULT NULL ) RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS $$ DECLARE v_item submission_items; v_approved_count INTEGER := 0; v_total_items INTEGER; v_new_status TEXT; v_entity_id UUID; v_all_items_processed BOOLEAN; BEGIN -- Log span start with trace context IF p_trace_id IS NOT NULL THEN RAISE NOTICE 'SPAN: {"spanId": "%", "traceId": "%", "parentSpanId": "%", "name": "process_approval_transaction_rpc", "kind": "INTERNAL", "startTime": %, "attributes": {"submission.id": "%", "item_count": %}}', gen_random_uuid()::text, p_trace_id, p_parent_span_id, extract(epoch from clock_timestamp()) * 1000, p_submission_id, array_length(p_item_ids, 1); END IF; -- Get total items for this submission SELECT COUNT(*) INTO v_total_items FROM submission_items WHERE submission_id = p_submission_id; -- Process each item FOREACH v_item IN ARRAY ( SELECT ARRAY_AGG(si ORDER BY si.order_index) FROM submission_items si WHERE si.id = ANY(p_item_ids) ) LOOP -- Log item processing span event IF p_trace_id IS NOT NULL THEN RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "parentSpanId": "%", "name": "process_item", "timestamp": %, "attributes": {"item.id": "%", "item.type": "%", "item.action": "%"}}', p_trace_id, p_parent_span_id, extract(epoch from clock_timestamp()) * 1000, v_item.id, v_item.item_type, v_item.action; END IF; -- Create or update entity based on item type IF v_item.item_type = 'park' THEN IF v_item.action = 'create' THEN -- Log entity creation IF p_trace_id IS NOT NULL THEN RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "name": "create_entity_park", "timestamp": %, "attributes": {"action": "create"}}', p_trace_id, extract(epoch from clock_timestamp()) * 1000; END IF; v_entity_id := create_entity_from_submission('park', v_item.id, p_submitter_id, p_request_id); ELSIF v_item.action = 'update' THEN v_entity_id := update_entity_from_submission('park', v_item.id, v_item.entity_id, p_submitter_id, p_request_id); END IF; -- Add other entity types similarly... END IF; -- Update item status UPDATE submission_items SET status = 'approved', processed_at = NOW(), processed_by = p_moderator_id, entity_id = v_entity_id WHERE id = v_item.id; v_approved_count := v_approved_count + 1; END LOOP; -- Determine final submission status SELECT COUNT(*) = array_length(p_item_ids, 1) INTO v_all_items_processed FROM submission_items WHERE submission_id = p_submission_id AND status IN ('approved', 'rejected'); IF v_all_items_processed THEN v_new_status := 'approved'; ELSE v_new_status := 'partially_approved'; END IF; -- Update submission status UPDATE content_submissions SET status = v_new_status, processed_at = CASE WHEN v_new_status = 'approved' THEN NOW() ELSE processed_at END, assigned_to = NULL, lock_expires_at = NULL WHERE id = p_submission_id; -- Log completion IF p_trace_id IS NOT NULL THEN RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "name": "transaction_complete", "timestamp": %, "attributes": {"items_processed": %, "new_status": "%"}}', p_trace_id, extract(epoch from clock_timestamp()) * 1000, v_approved_count, v_new_status; END IF; RETURN jsonb_build_object( 'success', true, 'status', v_new_status, 'approved_count', v_approved_count, 'total_items', v_total_items ); END; $$; -- Update process_rejection_transaction similarly CREATE OR REPLACE FUNCTION process_rejection_transaction( p_submission_id UUID, p_item_ids UUID[], p_moderator_id UUID, p_rejection_reason TEXT, p_request_id TEXT DEFAULT NULL, p_trace_id TEXT DEFAULT NULL, p_parent_span_id TEXT DEFAULT NULL ) RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS $$ DECLARE v_rejected_count INTEGER := 0; v_total_items INTEGER; v_new_status TEXT; v_all_items_processed BOOLEAN; BEGIN -- Log span start IF p_trace_id IS NOT NULL THEN RAISE NOTICE 'SPAN: {"spanId": "%", "traceId": "%", "parentSpanId": "%", "name": "process_rejection_transaction_rpc", "kind": "INTERNAL", "startTime": %, "attributes": {"submission.id": "%", "item_count": %}}', gen_random_uuid()::text, p_trace_id, p_parent_span_id, extract(epoch from clock_timestamp()) * 1000, p_submission_id, array_length(p_item_ids, 1); END IF; -- Get total items SELECT COUNT(*) INTO v_total_items FROM submission_items WHERE submission_id = p_submission_id; -- Reject items UPDATE submission_items SET status = 'rejected', rejection_reason = p_rejection_reason, processed_at = NOW(), processed_by = p_moderator_id WHERE id = ANY(p_item_ids); GET DIAGNOSTICS v_rejected_count = ROW_COUNT; -- Check if all items processed SELECT COUNT(*) = (SELECT COUNT(*) FROM submission_items WHERE submission_id = p_submission_id) INTO v_all_items_processed FROM submission_items WHERE submission_id = p_submission_id AND status IN ('approved', 'rejected'); IF v_all_items_processed THEN -- Check if any items were approved SELECT EXISTS( SELECT 1 FROM submission_items WHERE submission_id = p_submission_id AND status = 'approved' ) INTO v_all_items_processed; v_new_status := CASE WHEN v_all_items_processed THEN 'partially_approved' ELSE 'rejected' END; ELSE v_new_status := 'partially_approved'; END IF; -- Update submission UPDATE content_submissions SET status = v_new_status, processed_at = CASE WHEN v_new_status = 'rejected' THEN NOW() ELSE processed_at END, assigned_to = NULL, lock_expires_at = NULL WHERE id = p_submission_id; -- Log completion IF p_trace_id IS NOT NULL THEN RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "name": "rejection_complete", "timestamp": %, "attributes": {"items_rejected": %, "new_status": "%"}}', p_trace_id, extract(epoch from clock_timestamp()) * 1000, v_rejected_count, v_new_status; END IF; RETURN jsonb_build_object( 'success', true, 'status', v_new_status, 'rejected_count', v_rejected_count, 'total_items', v_total_items ); END; $$;