-- Add approved_at column to submission_items table ALTER TABLE submission_items ADD COLUMN approved_at timestamp with time zone; -- Add index for analytics queries (filtered index for performance) CREATE INDEX idx_submission_items_approved_at ON submission_items(approved_at) WHERE approved_at IS NOT NULL; -- Add comment for documentation COMMENT ON COLUMN submission_items.approved_at IS 'Timestamp when this specific item was approved by a moderator. NULL for pending/rejected items.'; -- Drop existing function to update parameter signature DROP FUNCTION IF EXISTS process_approval_transaction(UUID, UUID[], UUID, UUID, TEXT, TEXT, TEXT); -- Recreate process_approval_transaction function with approved_at support CREATE OR REPLACE FUNCTION process_approval_transaction( p_submission_id UUID, p_item_ids UUID[], p_moderator_id UUID, p_submitter_id UUID, p_request_id TEXT DEFAULT NULL, p_approval_mode TEXT DEFAULT 'full', p_idempotency_key TEXT DEFAULT NULL ) RETURNS JSONB AS $$ DECLARE v_item RECORD; v_entity_id UUID; v_entity_type TEXT; v_action_type TEXT; v_item_data JSONB; v_approved_items JSONB := '[]'::JSONB; v_failed_items JSONB := '[]'::JSONB; v_submission_type TEXT; v_result JSONB; v_error_message TEXT; v_error_detail TEXT; v_start_time TIMESTAMP := clock_timestamp(); v_duration_ms INTEGER; v_rollback_triggered BOOLEAN := FALSE; v_lock_acquired BOOLEAN := FALSE; BEGIN -- Validate moderator has permission IF NOT is_moderator(p_moderator_id) THEN RAISE EXCEPTION 'User % does not have moderator privileges', p_moderator_id USING ERRCODE = 'insufficient_privilege'; END IF; -- Get submission type SELECT submission_type INTO v_submission_type FROM content_submissions WHERE id = p_submission_id; IF v_submission_type IS NULL THEN RAISE EXCEPTION 'Submission % not found', p_submission_id USING ERRCODE = 'no_data_found'; END IF; -- Acquire advisory lock IF NOT pg_try_advisory_xact_lock(hashtext(p_submission_id::TEXT)) THEN RAISE EXCEPTION 'Could not acquire lock for submission %', p_submission_id USING ERRCODE = '55P03'; END IF; v_lock_acquired := TRUE; -- Process each item FOR v_item IN SELECT si.* FROM submission_items si WHERE si.submission_id = p_submission_id AND si.id = ANY(p_item_ids) AND si.status = 'pending' ORDER BY si.order_index LOOP BEGIN v_entity_type := v_item.item_type; v_action_type := v_item.action_type; v_item_data := v_item.item_data; -- Create/update entity based on type and action IF v_action_type = 'create' THEN IF v_entity_type = 'park' THEN INSERT INTO parks (name, slug, description, location_id, operator_id, property_owner_id) SELECT v_item_data->>'name', v_item_data->>'slug', v_item_data->>'description', (v_item_data->>'location_id')::UUID, (v_item_data->>'operator_id')::UUID, (v_item_data->>'property_owner_id')::UUID RETURNING id INTO v_entity_id; ELSIF v_entity_type = 'ride' THEN INSERT INTO rides (name, slug, park_id, manufacturer_id, designer_id) SELECT v_item_data->>'name', v_item_data->>'slug', (v_item_data->>'park_id')::UUID, (v_item_data->>'manufacturer_id')::UUID, (v_item_data->>'designer_id')::UUID RETURNING id INTO v_entity_id; ELSIF v_entity_type IN ('manufacturer', 'operator', 'designer', 'property_owner') THEN INSERT INTO companies (name, slug, company_type, description) SELECT v_item_data->>'name', v_item_data->>'slug', v_entity_type, v_item_data->>'description' RETURNING id INTO v_entity_id; ELSE RAISE EXCEPTION 'Unsupported entity type: %', v_entity_type; END IF; ELSIF v_action_type = 'edit' THEN v_entity_id := (v_item_data->>'entity_id')::UUID; IF v_entity_type = 'park' THEN UPDATE parks SET name = COALESCE(v_item_data->>'name', name), description = COALESCE(v_item_data->>'description', description), location_id = COALESCE((v_item_data->>'location_id')::UUID, location_id), updated_at = now() WHERE id = v_entity_id; ELSIF v_entity_type = 'ride' THEN UPDATE rides SET name = COALESCE(v_item_data->>'name', name), description = COALESCE(v_item_data->>'description', description), updated_at = now() WHERE id = v_entity_id; ELSIF v_entity_type IN ('manufacturer', 'operator', 'designer', 'property_owner') THEN UPDATE companies SET name = COALESCE(v_item_data->>'name', name), description = COALESCE(v_item_data->>'description', description), updated_at = now() WHERE id = v_entity_id; END IF; END IF; -- Update submission item with approved status and timestamp UPDATE submission_items SET approved_entity_id = v_entity_id, status = 'approved', approved_at = now(), updated_at = now() WHERE id = v_item.id; -- Add to success list v_approved_items := v_approved_items || jsonb_build_object( 'item_id', v_item.id, 'entity_id', v_entity_id, 'entity_type', v_entity_type ); EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS v_error_message = MESSAGE_TEXT, v_error_detail = PG_EXCEPTION_DETAIL; -- Add to failed list v_failed_items := v_failed_items || jsonb_build_object( 'item_id', v_item.id, 'error', v_error_message, 'detail', v_error_detail ); -- Mark item as failed UPDATE submission_items SET status = 'flagged', rejection_reason = v_error_message, updated_at = now() WHERE id = v_item.id; END; END LOOP; -- Update submission status based on approval mode IF p_approval_mode = 'selective' THEN UPDATE content_submissions SET status = 'partially_approved', reviewed_at = now(), reviewer_id = p_moderator_id, updated_at = now() WHERE id = p_submission_id; ELSE UPDATE content_submissions SET status = 'approved', reviewed_at = now(), reviewer_id = p_moderator_id, resolved_at = now(), updated_at = now() WHERE id = p_submission_id; END IF; -- Calculate duration v_duration_ms := EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) * 1000; -- Log metrics INSERT INTO approval_transaction_metrics ( submission_id, moderator_id, submitter_id, items_count, success, duration_ms, request_id, rollback_triggered ) VALUES ( p_submission_id, p_moderator_id, p_submitter_id, jsonb_array_length(v_approved_items), jsonb_array_length(v_failed_items) = 0, v_duration_ms, p_request_id, v_rollback_triggered ); -- Build result v_result := jsonb_build_object( 'success', TRUE, 'approved_items', v_approved_items, 'failed_items', v_failed_items, 'duration_ms', v_duration_ms ); RETURN v_result; EXCEPTION WHEN OTHERS THEN v_rollback_triggered := TRUE; GET STACKED DIAGNOSTICS v_error_message = MESSAGE_TEXT, v_error_detail = PG_EXCEPTION_DETAIL; -- Log failed transaction v_duration_ms := EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) * 1000; INSERT INTO approval_transaction_metrics ( submission_id, moderator_id, submitter_id, items_count, success, duration_ms, error_message, error_details, request_id, rollback_triggered ) VALUES ( p_submission_id, p_moderator_id, p_submitter_id, array_length(p_item_ids, 1), FALSE, v_duration_ms, v_error_message, v_error_detail, p_request_id, v_rollback_triggered ); RAISE; END; $$ LANGUAGE plpgsql SECURITY DEFINER;