mirror of
https://github.com/pacnpal/thrilltrack-explorer.git
synced 2025-12-26 21:47:01 -05:00
Merge branch 'dev' into main
This commit is contained in:
@@ -47,6 +47,9 @@ verify_jwt = true
|
||||
[functions.process-selective-approval]
|
||||
verify_jwt = false
|
||||
|
||||
[functions.process-selective-rejection]
|
||||
verify_jwt = false
|
||||
|
||||
[functions.send-escalation-notification]
|
||||
verify_jwt = true
|
||||
|
||||
|
||||
@@ -14,7 +14,39 @@ interface LogContext {
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
// Request tracking utilities
|
||||
// Span types for distributed tracing
|
||||
export interface Span {
|
||||
spanId: string;
|
||||
traceId: string;
|
||||
parentSpanId?: string;
|
||||
name: string;
|
||||
kind: 'SERVER' | 'CLIENT' | 'INTERNAL' | 'DATABASE';
|
||||
startTime: number;
|
||||
endTime?: number;
|
||||
duration?: number;
|
||||
attributes: Record<string, unknown>;
|
||||
events: SpanEvent[];
|
||||
status: 'ok' | 'error' | 'unset';
|
||||
error?: {
|
||||
type: string;
|
||||
message: string;
|
||||
stack?: string;
|
||||
};
|
||||
}
|
||||
|
||||
export interface SpanEvent {
|
||||
timestamp: number;
|
||||
name: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface SpanContext {
|
||||
traceId: string;
|
||||
spanId: string;
|
||||
traceFlags?: number;
|
||||
}
|
||||
|
||||
// Request tracking utilities (legacy - use spans instead)
|
||||
export interface RequestTracking {
|
||||
requestId: string;
|
||||
start: number;
|
||||
@@ -33,6 +65,134 @@ export function endRequest(tracking: RequestTracking): number {
|
||||
return Date.now() - tracking.start;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Span Lifecycle Functions
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Start a new span
|
||||
*/
|
||||
export function startSpan(
|
||||
name: string,
|
||||
kind: Span['kind'],
|
||||
parentSpan?: SpanContext,
|
||||
attributes?: Record<string, unknown>
|
||||
): Span {
|
||||
const traceId = parentSpan?.traceId || crypto.randomUUID();
|
||||
|
||||
return {
|
||||
spanId: crypto.randomUUID(),
|
||||
traceId,
|
||||
parentSpanId: parentSpan?.spanId,
|
||||
name,
|
||||
kind,
|
||||
startTime: Date.now(),
|
||||
attributes: attributes || {},
|
||||
events: [],
|
||||
status: 'unset',
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* End a span with final status
|
||||
*/
|
||||
export function endSpan(span: Span, status?: 'ok' | 'error', error?: Error): Span {
|
||||
span.endTime = Date.now();
|
||||
span.duration = span.endTime - span.startTime;
|
||||
span.status = status || 'ok';
|
||||
|
||||
if (error) {
|
||||
span.error = {
|
||||
type: error.name,
|
||||
message: error.message,
|
||||
stack: error.stack,
|
||||
};
|
||||
}
|
||||
|
||||
return span;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add event to span
|
||||
*/
|
||||
export function addSpanEvent(
|
||||
span: Span,
|
||||
name: string,
|
||||
attributes?: Record<string, unknown>
|
||||
): void {
|
||||
span.events.push({
|
||||
timestamp: Date.now(),
|
||||
name,
|
||||
attributes,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set span attributes
|
||||
*/
|
||||
export function setSpanAttributes(
|
||||
span: Span,
|
||||
attributes: Record<string, unknown>
|
||||
): void {
|
||||
span.attributes = { ...span.attributes, ...attributes };
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract span context for propagation
|
||||
*/
|
||||
export function getSpanContext(span: Span): SpanContext {
|
||||
return {
|
||||
traceId: span.traceId,
|
||||
spanId: span.spanId,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract span context from HTTP headers (W3C Trace Context)
|
||||
*/
|
||||
export function extractSpanContextFromHeaders(headers: Headers): SpanContext | undefined {
|
||||
const traceparent = headers.get('traceparent');
|
||||
if (!traceparent) return undefined;
|
||||
|
||||
// Parse W3C traceparent: version-traceId-spanId-flags
|
||||
const parts = traceparent.split('-');
|
||||
if (parts.length !== 4) return undefined;
|
||||
|
||||
return {
|
||||
traceId: parts[1],
|
||||
spanId: parts[2],
|
||||
traceFlags: parseInt(parts[3], 16),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Inject span context into headers
|
||||
*/
|
||||
export function injectSpanContextIntoHeaders(spanContext: SpanContext): Record<string, string> {
|
||||
return {
|
||||
'traceparent': `00-${spanContext.traceId}-${spanContext.spanId}-01`,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Log completed span
|
||||
*/
|
||||
export function logSpan(span: Span): void {
|
||||
const sanitizedAttributes = sanitizeContext(span.attributes);
|
||||
const sanitizedEvents = span.events.map(e => ({
|
||||
...e,
|
||||
attributes: e.attributes ? sanitizeContext(e.attributes) : undefined,
|
||||
}));
|
||||
|
||||
edgeLogger.info('Span completed', {
|
||||
span: {
|
||||
...span,
|
||||
attributes: sanitizedAttributes,
|
||||
events: sanitizedEvents,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Fields that should never be logged
|
||||
const SENSITIVE_FIELDS = [
|
||||
'password',
|
||||
@@ -52,7 +212,7 @@ const SENSITIVE_FIELDS = [
|
||||
/**
|
||||
* Sanitize context to remove sensitive data
|
||||
*/
|
||||
function sanitizeContext(context: LogContext): LogContext {
|
||||
export function sanitizeContext(context: LogContext): LogContext {
|
||||
const sanitized: LogContext = {};
|
||||
|
||||
for (const [key, value] of Object.entries(context)) {
|
||||
|
||||
@@ -2,6 +2,17 @@ import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
|
||||
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
|
||||
import { corsHeaders } from './cors.ts';
|
||||
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
|
||||
import {
|
||||
edgeLogger,
|
||||
startSpan,
|
||||
endSpan,
|
||||
addSpanEvent,
|
||||
setSpanAttributes,
|
||||
getSpanContext,
|
||||
logSpan,
|
||||
extractSpanContextFromHeaders,
|
||||
type Span
|
||||
} from '../_shared/logger.ts';
|
||||
|
||||
const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
|
||||
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!;
|
||||
@@ -22,13 +33,29 @@ const handler = async (req: Request) => {
|
||||
});
|
||||
}
|
||||
|
||||
// Generate request ID for tracking
|
||||
const requestId = crypto.randomUUID();
|
||||
// Extract parent span context from headers (if present)
|
||||
const parentSpanContext = extractSpanContextFromHeaders(req.headers);
|
||||
|
||||
// Create root span for this edge function invocation
|
||||
const rootSpan = startSpan(
|
||||
'process-selective-approval',
|
||||
'SERVER',
|
||||
parentSpanContext,
|
||||
{
|
||||
'http.method': 'POST',
|
||||
'function.name': 'process-selective-approval',
|
||||
}
|
||||
);
|
||||
const requestId = rootSpan.spanId;
|
||||
|
||||
try {
|
||||
// STEP 1: Authentication
|
||||
addSpanEvent(rootSpan, 'authentication_start');
|
||||
const authHeader = req.headers.get('Authorization');
|
||||
if (!authHeader) {
|
||||
addSpanEvent(rootSpan, 'authentication_failed', { reason: 'missing_header' });
|
||||
endSpan(rootSpan, 'error');
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Missing Authorization header' }),
|
||||
{
|
||||
@@ -47,6 +74,14 @@ const handler = async (req: Request) => {
|
||||
|
||||
const { data: { user }, error: authError } = await supabase.auth.getUser();
|
||||
if (authError || !user) {
|
||||
addSpanEvent(rootSpan, 'authentication_failed', { error: authError?.message });
|
||||
edgeLogger.warn('Authentication failed', {
|
||||
requestId,
|
||||
error: authError?.message,
|
||||
action: 'process_approval'
|
||||
});
|
||||
endSpan(rootSpan, 'error', authError || new Error('Unauthorized'));
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Unauthorized' }),
|
||||
{
|
||||
@@ -59,13 +94,34 @@ const handler = async (req: Request) => {
|
||||
);
|
||||
}
|
||||
|
||||
console.log(`[${requestId}] Approval request from moderator ${user.id}`);
|
||||
setSpanAttributes(rootSpan, { 'user.id': user.id });
|
||||
addSpanEvent(rootSpan, 'authentication_success');
|
||||
edgeLogger.info('Approval request received', {
|
||||
requestId,
|
||||
moderatorId: user.id,
|
||||
action: 'process_approval'
|
||||
});
|
||||
|
||||
// STEP 2: Parse request
|
||||
addSpanEvent(rootSpan, 'validation_start');
|
||||
const body: ApprovalRequest = await req.json();
|
||||
const { submissionId, itemIds, idempotencyKey } = body;
|
||||
|
||||
if (!submissionId || !itemIds || itemIds.length === 0) {
|
||||
addSpanEvent(rootSpan, 'validation_failed', {
|
||||
hasSubmissionId: !!submissionId,
|
||||
hasItemIds: !!itemIds,
|
||||
itemCount: itemIds?.length || 0,
|
||||
});
|
||||
edgeLogger.warn('Invalid request payload', {
|
||||
requestId,
|
||||
hasSubmissionId: !!submissionId,
|
||||
hasItemIds: !!itemIds,
|
||||
itemCount: itemIds?.length || 0,
|
||||
action: 'process_approval'
|
||||
});
|
||||
endSpan(rootSpan, 'error');
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Missing required fields: submissionId, itemIds' }),
|
||||
{
|
||||
@@ -78,7 +134,21 @@ const handler = async (req: Request) => {
|
||||
);
|
||||
}
|
||||
|
||||
setSpanAttributes(rootSpan, {
|
||||
'submission.id': submissionId,
|
||||
'submission.item_count': itemIds.length,
|
||||
'idempotency.key': idempotencyKey,
|
||||
});
|
||||
addSpanEvent(rootSpan, 'validation_complete');
|
||||
edgeLogger.info('Request validated', {
|
||||
requestId,
|
||||
submissionId,
|
||||
itemCount: itemIds.length,
|
||||
action: 'process_approval'
|
||||
});
|
||||
|
||||
// STEP 3: Idempotency check
|
||||
addSpanEvent(rootSpan, 'idempotency_check_start');
|
||||
const { data: existingKey } = await supabase
|
||||
.from('submission_idempotency_keys')
|
||||
.select('*')
|
||||
@@ -86,7 +156,16 @@ const handler = async (req: Request) => {
|
||||
.single();
|
||||
|
||||
if (existingKey?.status === 'completed') {
|
||||
console.log(`[${requestId}] Idempotency key already processed, returning cached result`);
|
||||
addSpanEvent(rootSpan, 'idempotency_cache_hit');
|
||||
setSpanAttributes(rootSpan, { 'cache.hit': true });
|
||||
edgeLogger.info('Idempotency cache hit', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
cached: true,
|
||||
action: 'process_approval'
|
||||
});
|
||||
endSpan(rootSpan, 'ok');
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify(existingKey.result_data),
|
||||
{
|
||||
@@ -108,7 +187,15 @@ const handler = async (req: Request) => {
|
||||
.single();
|
||||
|
||||
if (submissionError || !submission) {
|
||||
console.error(`[${requestId}] Submission not found:`, submissionError);
|
||||
addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
|
||||
edgeLogger.error('Submission not found', {
|
||||
requestId,
|
||||
submissionId,
|
||||
error: submissionError?.message,
|
||||
action: 'process_approval'
|
||||
});
|
||||
endSpan(rootSpan, 'error', submissionError || new Error('Submission not found'));
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Submission not found' }),
|
||||
{
|
||||
@@ -123,7 +210,13 @@ const handler = async (req: Request) => {
|
||||
|
||||
// STEP 5: Verify moderator can approve this submission
|
||||
if (submission.assigned_to && submission.assigned_to !== user.id) {
|
||||
console.error(`[${requestId}] Submission locked by another moderator`);
|
||||
edgeLogger.warn('Lock conflict', {
|
||||
requestId,
|
||||
submissionId,
|
||||
lockedBy: submission.assigned_to,
|
||||
attemptedBy: user.id,
|
||||
action: 'process_approval'
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Submission is locked by another moderator' }),
|
||||
{
|
||||
@@ -137,7 +230,13 @@ const handler = async (req: Request) => {
|
||||
}
|
||||
|
||||
if (!['pending', 'partially_approved'].includes(submission.status)) {
|
||||
console.error(`[${requestId}] Invalid submission status: ${submission.status}`);
|
||||
edgeLogger.warn('Invalid submission status', {
|
||||
requestId,
|
||||
submissionId,
|
||||
currentStatus: submission.status,
|
||||
expectedStatuses: ['pending', 'partially_approved'],
|
||||
action: 'process_approval'
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Submission already processed' }),
|
||||
{
|
||||
@@ -150,17 +249,58 @@ const handler = async (req: Request) => {
|
||||
);
|
||||
}
|
||||
|
||||
// STEP 6: Register idempotency key as processing
|
||||
// STEP 6: Register idempotency key as processing (atomic upsert)
|
||||
// ✅ CRITICAL FIX: Use ON CONFLICT to prevent race conditions
|
||||
if (!existingKey) {
|
||||
await supabase.from('submission_idempotency_keys').insert({
|
||||
idempotency_key: idempotencyKey,
|
||||
submission_id: submissionId,
|
||||
moderator_id: user.id,
|
||||
status: 'processing'
|
||||
});
|
||||
const { data: insertedKey, error: idempotencyError } = await supabase
|
||||
.from('submission_idempotency_keys')
|
||||
.insert({
|
||||
idempotency_key: idempotencyKey,
|
||||
submission_id: submissionId,
|
||||
moderator_id: user.id,
|
||||
status: 'processing'
|
||||
})
|
||||
.select()
|
||||
.single();
|
||||
|
||||
// If conflict occurred, another moderator is processing
|
||||
if (idempotencyError && idempotencyError.code === '23505') {
|
||||
edgeLogger.warn('Idempotency key conflict - another request processing', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
moderatorId: user.id
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Another moderator is processing this submission' }),
|
||||
{ status: 409, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
|
||||
);
|
||||
}
|
||||
|
||||
if (idempotencyError) {
|
||||
throw idempotencyError;
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[${requestId}] Calling process_approval_transaction RPC`);
|
||||
// Create child span for RPC transaction
|
||||
const rpcSpan = startSpan(
|
||||
'process_approval_transaction',
|
||||
'DATABASE',
|
||||
getSpanContext(rootSpan),
|
||||
{
|
||||
'db.operation': 'rpc',
|
||||
'db.function': 'process_approval_transaction',
|
||||
'submission.id': submissionId,
|
||||
'submission.item_count': itemIds.length,
|
||||
}
|
||||
);
|
||||
|
||||
addSpanEvent(rpcSpan, 'rpc_call_start');
|
||||
edgeLogger.info('Calling approval transaction RPC', {
|
||||
requestId,
|
||||
submissionId,
|
||||
itemCount: itemIds.length,
|
||||
action: 'process_approval'
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// STEP 7: Call RPC function with deadlock retry logic
|
||||
@@ -178,7 +318,9 @@ const handler = async (req: Request) => {
|
||||
p_item_ids: itemIds,
|
||||
p_moderator_id: user.id,
|
||||
p_submitter_id: submission.user_id,
|
||||
p_request_id: requestId
|
||||
p_request_id: requestId,
|
||||
p_trace_id: rootSpan.traceId,
|
||||
p_parent_span_id: rpcSpan.spanId
|
||||
}
|
||||
);
|
||||
|
||||
@@ -187,6 +329,10 @@ const handler = async (req: Request) => {
|
||||
|
||||
if (!rpcError) {
|
||||
// Success!
|
||||
addSpanEvent(rpcSpan, 'rpc_call_success', {
|
||||
'result.status': data?.status,
|
||||
'items.processed': itemIds.length,
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -194,23 +340,51 @@ const handler = async (req: Request) => {
|
||||
if (rpcError.code === '40P01' || rpcError.code === '40001') {
|
||||
retryCount++;
|
||||
if (retryCount > MAX_DEADLOCK_RETRIES) {
|
||||
console.error(`[${requestId}] Max deadlock retries exceeded`);
|
||||
addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
|
||||
edgeLogger.error('Max deadlock retries exceeded', {
|
||||
requestId,
|
||||
submissionId,
|
||||
attempt: retryCount,
|
||||
action: 'process_approval'
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
const backoffMs = 100 * Math.pow(2, retryCount);
|
||||
console.log(`[${requestId}] Deadlock detected, retrying in ${backoffMs}ms (attempt ${retryCount}/${MAX_DEADLOCK_RETRIES})`);
|
||||
addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
|
||||
edgeLogger.warn('Deadlock detected, retrying', {
|
||||
requestId,
|
||||
attempt: retryCount,
|
||||
maxAttempts: MAX_DEADLOCK_RETRIES,
|
||||
backoffMs,
|
||||
action: 'process_approval'
|
||||
});
|
||||
await new Promise(r => setTimeout(r, backoffMs));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Non-retryable error, break immediately
|
||||
addSpanEvent(rpcSpan, 'rpc_call_failed', {
|
||||
error: rpcError.message,
|
||||
errorCode: rpcError.code
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
if (rpcError) {
|
||||
// Transaction failed - EVERYTHING rolled back automatically by PostgreSQL
|
||||
console.error(`[${requestId}] Approval transaction failed:`, rpcError);
|
||||
endSpan(rpcSpan, 'error', rpcError);
|
||||
logSpan(rpcSpan);
|
||||
|
||||
edgeLogger.error('Transaction failed', {
|
||||
requestId,
|
||||
duration: rpcSpan.duration,
|
||||
submissionId,
|
||||
error: rpcError.message,
|
||||
errorCode: rpcError.code,
|
||||
retries: retryCount,
|
||||
action: 'process_approval'
|
||||
});
|
||||
|
||||
// Update idempotency key to failed
|
||||
try {
|
||||
@@ -223,10 +397,19 @@ const handler = async (req: Request) => {
|
||||
})
|
||||
.eq('idempotency_key', idempotencyKey);
|
||||
} catch (updateError) {
|
||||
console.error(`[${requestId}] Failed to update idempotency key to failed:`, updateError);
|
||||
edgeLogger.warn('Failed to update idempotency key', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
status: 'failed',
|
||||
error: updateError instanceof Error ? updateError.message : String(updateError),
|
||||
action: 'process_approval'
|
||||
});
|
||||
// Non-blocking - continue with error response even if idempotency update fails
|
||||
}
|
||||
|
||||
endSpan(rootSpan, 'error', rpcError);
|
||||
logSpan(rootSpan);
|
||||
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
error: 'Approval transaction failed',
|
||||
@@ -244,7 +427,24 @@ const handler = async (req: Request) => {
|
||||
);
|
||||
}
|
||||
|
||||
console.log(`[${requestId}] Transaction completed successfully:`, result);
|
||||
// RPC succeeded
|
||||
endSpan(rpcSpan, 'ok');
|
||||
logSpan(rpcSpan);
|
||||
|
||||
setSpanAttributes(rootSpan, {
|
||||
'result.status': result?.status,
|
||||
'result.final_status': result?.status,
|
||||
'retries': retryCount,
|
||||
});
|
||||
edgeLogger.info('Transaction completed successfully', {
|
||||
requestId,
|
||||
duration: rpcSpan.duration,
|
||||
submissionId,
|
||||
itemCount: itemIds.length,
|
||||
retries: retryCount,
|
||||
newStatus: result?.status,
|
||||
action: 'process_approval'
|
||||
});
|
||||
|
||||
// STEP 8: Success - update idempotency key
|
||||
try {
|
||||
@@ -257,10 +457,19 @@ const handler = async (req: Request) => {
|
||||
})
|
||||
.eq('idempotency_key', idempotencyKey);
|
||||
} catch (updateError) {
|
||||
console.error(`[${requestId}] Failed to update idempotency key to completed:`, updateError);
|
||||
edgeLogger.warn('Failed to update idempotency key', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
status: 'completed',
|
||||
error: updateError instanceof Error ? updateError.message : String(updateError),
|
||||
action: 'process_approval'
|
||||
});
|
||||
// Non-blocking - transaction succeeded, so continue with success response
|
||||
}
|
||||
|
||||
endSpan(rootSpan, 'ok');
|
||||
logSpan(rootSpan);
|
||||
|
||||
return new Response(
|
||||
JSON.stringify(result),
|
||||
{
|
||||
@@ -274,7 +483,16 @@ const handler = async (req: Request) => {
|
||||
);
|
||||
|
||||
} catch (error) {
|
||||
console.error(`[${requestId}] Unexpected error:`, error);
|
||||
endSpan(rootSpan, 'error', error instanceof Error ? error : new Error(String(error)));
|
||||
logSpan(rootSpan);
|
||||
|
||||
edgeLogger.error('Unexpected error', {
|
||||
requestId,
|
||||
duration: rootSpan.duration,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
action: 'process_approval'
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
error: 'Internal server error',
|
||||
|
||||
4
supabase/functions/process-selective-rejection/cors.ts
Normal file
4
supabase/functions/process-selective-rejection/cors.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
export const corsHeaders = {
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
|
||||
};
|
||||
516
supabase/functions/process-selective-rejection/index.ts
Normal file
516
supabase/functions/process-selective-rejection/index.ts
Normal file
@@ -0,0 +1,516 @@
|
||||
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
|
||||
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
|
||||
import { corsHeaders } from './cors.ts';
|
||||
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
|
||||
import {
|
||||
edgeLogger,
|
||||
startSpan,
|
||||
endSpan,
|
||||
addSpanEvent,
|
||||
setSpanAttributes,
|
||||
getSpanContext,
|
||||
logSpan,
|
||||
extractSpanContextFromHeaders,
|
||||
type Span
|
||||
} from '../_shared/logger.ts';
|
||||
|
||||
const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
|
||||
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!;
|
||||
|
||||
interface RejectionRequest {
|
||||
submissionId: string;
|
||||
itemIds: string[];
|
||||
rejectionReason: string;
|
||||
idempotencyKey: string;
|
||||
}
|
||||
|
||||
// Main handler function
|
||||
const handler = async (req: Request) => {
|
||||
// Handle CORS preflight requests
|
||||
if (req.method === 'OPTIONS') {
|
||||
return new Response(null, {
|
||||
status: 204,
|
||||
headers: corsHeaders
|
||||
});
|
||||
}
|
||||
|
||||
// Extract parent span context from headers (if present)
|
||||
const parentSpanContext = extractSpanContextFromHeaders(req.headers);
|
||||
|
||||
// Create root span for this edge function invocation
|
||||
const rootSpan = startSpan(
|
||||
'process-selective-rejection',
|
||||
'SERVER',
|
||||
parentSpanContext,
|
||||
{
|
||||
'http.method': 'POST',
|
||||
'function.name': 'process-selective-rejection',
|
||||
}
|
||||
);
|
||||
const requestId = rootSpan.spanId;
|
||||
|
||||
try {
|
||||
// STEP 1: Authentication
|
||||
addSpanEvent(rootSpan, 'authentication_start');
|
||||
const authHeader = req.headers.get('Authorization');
|
||||
if (!authHeader) {
|
||||
addSpanEvent(rootSpan, 'authentication_failed', { reason: 'missing_header' });
|
||||
endSpan(rootSpan, 'error');
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Missing Authorization header' }),
|
||||
{
|
||||
status: 401,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, {
|
||||
global: { headers: { Authorization: authHeader } }
|
||||
});
|
||||
|
||||
const { data: { user }, error: authError } = await supabase.auth.getUser();
|
||||
if (authError || !user) {
|
||||
addSpanEvent(rootSpan, 'authentication_failed', { error: authError?.message });
|
||||
edgeLogger.warn('Authentication failed', {
|
||||
requestId,
|
||||
error: authError?.message,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
endSpan(rootSpan, 'error', authError || new Error('Unauthorized'));
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Unauthorized' }),
|
||||
{
|
||||
status: 401,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
setSpanAttributes(rootSpan, { 'user.id': user.id });
|
||||
addSpanEvent(rootSpan, 'authentication_success');
|
||||
edgeLogger.info('Rejection request received', {
|
||||
requestId,
|
||||
moderatorId: user.id,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
|
||||
// STEP 2: Parse request
|
||||
addSpanEvent(rootSpan, 'validation_start');
|
||||
const body: RejectionRequest = await req.json();
|
||||
const { submissionId, itemIds, rejectionReason, idempotencyKey } = body;
|
||||
|
||||
if (!submissionId || !itemIds || itemIds.length === 0 || !rejectionReason) {
|
||||
addSpanEvent(rootSpan, 'validation_failed', {
|
||||
hasSubmissionId: !!submissionId,
|
||||
hasItemIds: !!itemIds,
|
||||
itemCount: itemIds?.length || 0,
|
||||
hasReason: !!rejectionReason,
|
||||
});
|
||||
edgeLogger.warn('Invalid request payload', {
|
||||
requestId,
|
||||
hasSubmissionId: !!submissionId,
|
||||
hasItemIds: !!itemIds,
|
||||
itemCount: itemIds?.length || 0,
|
||||
hasReason: !!rejectionReason,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
endSpan(rootSpan, 'error');
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Missing required fields: submissionId, itemIds, rejectionReason' }),
|
||||
{
|
||||
status: 400,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
setSpanAttributes(rootSpan, {
|
||||
'submission.id': submissionId,
|
||||
'submission.item_count': itemIds.length,
|
||||
'idempotency.key': idempotencyKey,
|
||||
});
|
||||
addSpanEvent(rootSpan, 'validation_complete');
|
||||
edgeLogger.info('Request validated', {
|
||||
requestId,
|
||||
submissionId,
|
||||
itemCount: itemIds.length,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
|
||||
// STEP 3: Idempotency check
|
||||
addSpanEvent(rootSpan, 'idempotency_check_start');
|
||||
const { data: existingKey } = await supabase
|
||||
.from('submission_idempotency_keys')
|
||||
.select('*')
|
||||
.eq('idempotency_key', idempotencyKey)
|
||||
.single();
|
||||
|
||||
if (existingKey?.status === 'completed') {
|
||||
addSpanEvent(rootSpan, 'idempotency_cache_hit');
|
||||
setSpanAttributes(rootSpan, { 'cache.hit': true });
|
||||
edgeLogger.info('Idempotency cache hit', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
cached: true,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
endSpan(rootSpan, 'ok');
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify(existingKey.result_data),
|
||||
{
|
||||
status: 200,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json',
|
||||
'X-Cache-Status': 'HIT'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// STEP 4: Fetch submission to get submitter_id
|
||||
const { data: submission, error: submissionError } = await supabase
|
||||
.from('content_submissions')
|
||||
.select('user_id, status, assigned_to')
|
||||
.eq('id', submissionId)
|
||||
.single();
|
||||
|
||||
if (submissionError || !submission) {
|
||||
addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
|
||||
edgeLogger.error('Submission not found', {
|
||||
requestId,
|
||||
submissionId,
|
||||
error: submissionError?.message,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
endSpan(rootSpan, 'error', submissionError || new Error('Submission not found'));
|
||||
logSpan(rootSpan);
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Submission not found' }),
|
||||
{
|
||||
status: 404,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// STEP 5: Verify moderator can reject this submission
|
||||
if (submission.assigned_to && submission.assigned_to !== user.id) {
|
||||
edgeLogger.warn('Lock conflict', {
|
||||
requestId,
|
||||
submissionId,
|
||||
lockedBy: submission.assigned_to,
|
||||
attemptedBy: user.id,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Submission is locked by another moderator' }),
|
||||
{
|
||||
status: 409,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
if (!['pending', 'partially_approved'].includes(submission.status)) {
|
||||
edgeLogger.warn('Invalid submission status', {
|
||||
requestId,
|
||||
submissionId,
|
||||
currentStatus: submission.status,
|
||||
expectedStatuses: ['pending', 'partially_approved'],
|
||||
action: 'process_rejection'
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Submission already processed' }),
|
||||
{
|
||||
status: 400,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// STEP 6: Register idempotency key as processing (atomic upsert)
|
||||
// ✅ CRITICAL FIX: Use ON CONFLICT to prevent race conditions
|
||||
if (!existingKey) {
|
||||
const { data: insertedKey, error: idempotencyError } = await supabase
|
||||
.from('submission_idempotency_keys')
|
||||
.insert({
|
||||
idempotency_key: idempotencyKey,
|
||||
submission_id: submissionId,
|
||||
moderator_id: user.id,
|
||||
status: 'processing'
|
||||
})
|
||||
.select()
|
||||
.single();
|
||||
|
||||
// If conflict occurred, another moderator is processing
|
||||
if (idempotencyError && idempotencyError.code === '23505') {
|
||||
edgeLogger.warn('Idempotency key conflict - another request processing', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
moderatorId: user.id
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({ error: 'Another moderator is processing this submission' }),
|
||||
{ status: 409, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
|
||||
);
|
||||
}
|
||||
|
||||
if (idempotencyError) {
|
||||
throw idempotencyError;
|
||||
}
|
||||
}
|
||||
|
||||
// Create child span for RPC transaction
|
||||
const rpcSpan = startSpan(
|
||||
'process_rejection_transaction',
|
||||
'DATABASE',
|
||||
getSpanContext(rootSpan),
|
||||
{
|
||||
'db.operation': 'rpc',
|
||||
'db.function': 'process_rejection_transaction',
|
||||
'submission.id': submissionId,
|
||||
'submission.item_count': itemIds.length,
|
||||
}
|
||||
);
|
||||
|
||||
addSpanEvent(rpcSpan, 'rpc_call_start');
|
||||
edgeLogger.info('Calling rejection transaction RPC', {
|
||||
requestId,
|
||||
submissionId,
|
||||
itemCount: itemIds.length,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// STEP 7: Call RPC function with deadlock retry logic
|
||||
// ============================================================================
|
||||
let retryCount = 0;
|
||||
const MAX_DEADLOCK_RETRIES = 3;
|
||||
let result: any = null;
|
||||
let rpcError: any = null;
|
||||
|
||||
while (retryCount <= MAX_DEADLOCK_RETRIES) {
|
||||
const { data, error } = await supabase.rpc(
|
||||
'process_rejection_transaction',
|
||||
{
|
||||
p_submission_id: submissionId,
|
||||
p_item_ids: itemIds,
|
||||
p_moderator_id: user.id,
|
||||
p_rejection_reason: rejectionReason,
|
||||
p_request_id: requestId,
|
||||
p_trace_id: rootSpan.traceId,
|
||||
p_parent_span_id: rpcSpan.spanId
|
||||
}
|
||||
);
|
||||
|
||||
result = data;
|
||||
rpcError = error;
|
||||
|
||||
if (!rpcError) {
|
||||
// Success!
|
||||
addSpanEvent(rpcSpan, 'rpc_call_success', {
|
||||
'result.status': data?.status,
|
||||
'items.processed': itemIds.length,
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
// Check for deadlock (40P01) or serialization failure (40001)
|
||||
if (rpcError.code === '40P01' || rpcError.code === '40001') {
|
||||
retryCount++;
|
||||
if (retryCount > MAX_DEADLOCK_RETRIES) {
|
||||
addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
|
||||
edgeLogger.error('Max deadlock retries exceeded', {
|
||||
requestId,
|
||||
submissionId,
|
||||
attempt: retryCount,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
const backoffMs = 100 * Math.pow(2, retryCount);
|
||||
addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
|
||||
edgeLogger.warn('Deadlock detected, retrying', {
|
||||
requestId,
|
||||
attempt: retryCount,
|
||||
maxAttempts: MAX_DEADLOCK_RETRIES,
|
||||
backoffMs,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
await new Promise(r => setTimeout(r, backoffMs));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Non-retryable error, break immediately
|
||||
addSpanEvent(rpcSpan, 'rpc_call_failed', {
|
||||
error: rpcError.message,
|
||||
errorCode: rpcError.code
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
if (rpcError) {
|
||||
// Transaction failed - EVERYTHING rolled back automatically by PostgreSQL
|
||||
endSpan(rpcSpan, 'error', rpcError);
|
||||
logSpan(rpcSpan);
|
||||
|
||||
edgeLogger.error('Transaction failed', {
|
||||
requestId,
|
||||
duration: rpcSpan.duration,
|
||||
submissionId,
|
||||
error: rpcError.message,
|
||||
errorCode: rpcError.code,
|
||||
retries: retryCount,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
|
||||
// Update idempotency key to failed
|
||||
try {
|
||||
await supabase
|
||||
.from('submission_idempotency_keys')
|
||||
.update({
|
||||
status: 'failed',
|
||||
error_message: rpcError.message,
|
||||
completed_at: new Date().toISOString()
|
||||
})
|
||||
.eq('idempotency_key', idempotencyKey);
|
||||
} catch (updateError) {
|
||||
edgeLogger.warn('Failed to update idempotency key', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
status: 'failed',
|
||||
error: updateError instanceof Error ? updateError.message : String(updateError),
|
||||
action: 'process_rejection'
|
||||
});
|
||||
// Non-blocking - continue with error response even if idempotency update fails
|
||||
}
|
||||
|
||||
endSpan(rootSpan, 'error', rpcError);
|
||||
logSpan(rootSpan);
|
||||
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
error: 'Rejection transaction failed',
|
||||
message: rpcError.message,
|
||||
details: rpcError.details,
|
||||
retries: retryCount
|
||||
}),
|
||||
{
|
||||
status: 500,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// RPC succeeded
|
||||
endSpan(rpcSpan, 'ok');
|
||||
logSpan(rpcSpan);
|
||||
|
||||
setSpanAttributes(rootSpan, {
|
||||
'result.status': result?.status,
|
||||
'result.final_status': result?.status,
|
||||
'retries': retryCount,
|
||||
});
|
||||
edgeLogger.info('Transaction completed successfully', {
|
||||
requestId,
|
||||
duration: rpcSpan.duration,
|
||||
submissionId,
|
||||
itemCount: itemIds.length,
|
||||
retries: retryCount,
|
||||
newStatus: result?.status,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
|
||||
// STEP 8: Success - update idempotency key
|
||||
try {
|
||||
await supabase
|
||||
.from('submission_idempotency_keys')
|
||||
.update({
|
||||
status: 'completed',
|
||||
result_data: result,
|
||||
completed_at: new Date().toISOString()
|
||||
})
|
||||
.eq('idempotency_key', idempotencyKey);
|
||||
} catch (updateError) {
|
||||
edgeLogger.warn('Failed to update idempotency key', {
|
||||
requestId,
|
||||
idempotencyKey,
|
||||
status: 'completed',
|
||||
error: updateError instanceof Error ? updateError.message : String(updateError),
|
||||
action: 'process_rejection'
|
||||
});
|
||||
// Non-blocking - transaction succeeded, so continue with success response
|
||||
}
|
||||
|
||||
endSpan(rootSpan, 'ok');
|
||||
logSpan(rootSpan);
|
||||
|
||||
return new Response(
|
||||
JSON.stringify(result),
|
||||
{
|
||||
status: 200,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json',
|
||||
'X-Request-Id': requestId
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
} catch (error) {
|
||||
endSpan(rootSpan, 'error', error instanceof Error ? error : new Error(String(error)));
|
||||
logSpan(rootSpan);
|
||||
|
||||
edgeLogger.error('Unexpected error', {
|
||||
requestId,
|
||||
duration: rootSpan.duration,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
action: 'process_rejection'
|
||||
});
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
error: 'Internal server error',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}),
|
||||
{
|
||||
status: 500,
|
||||
headers: {
|
||||
...corsHeaders,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// Apply rate limiting: 10 requests per minute per IP (standard tier)
|
||||
serve(withRateLimit(handler, rateLimiters.standard, corsHeaders));
|
||||
@@ -0,0 +1,159 @@
|
||||
-- ============================================================================
|
||||
-- CRITICAL: Add Atomic Rejection Transaction RPC
|
||||
-- ============================================================================
|
||||
-- This migration creates process_rejection_transaction to ensure atomic
|
||||
-- rejection of submission items with proper audit logging and status updates.
|
||||
--
|
||||
-- Features:
|
||||
-- - Atomic updates to submission_items.status = 'rejected'
|
||||
-- - Sets rejection_reason for each item
|
||||
-- - Updates parent submission status (rejected or partially_approved)
|
||||
-- - Logs to moderation_audit_log
|
||||
-- - Releases lock (assigned_to = NULL, locked_until = NULL)
|
||||
-- - Returns transaction result
|
||||
-- ============================================================================
|
||||
|
||||
CREATE OR REPLACE FUNCTION process_rejection_transaction(
|
||||
p_submission_id UUID,
|
||||
p_item_ids UUID[],
|
||||
p_moderator_id UUID,
|
||||
p_rejection_reason TEXT,
|
||||
p_request_id TEXT DEFAULT NULL
|
||||
)
|
||||
RETURNS JSONB
|
||||
LANGUAGE plpgsql
|
||||
SECURITY DEFINER
|
||||
SET search_path = public
|
||||
AS $$
|
||||
DECLARE
|
||||
v_start_time TIMESTAMPTZ;
|
||||
v_result JSONB;
|
||||
v_rejected_count INTEGER := 0;
|
||||
v_final_status TEXT;
|
||||
v_some_pending BOOLEAN := FALSE;
|
||||
BEGIN
|
||||
v_start_time := clock_timestamp();
|
||||
|
||||
RAISE NOTICE '[%] Starting atomic rejection transaction for submission %',
|
||||
COALESCE(p_request_id, 'NO_REQUEST_ID'),
|
||||
p_submission_id;
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 1: Set session variables (transaction-scoped)
|
||||
-- ========================================================================
|
||||
PERFORM set_config('app.moderator_id', p_moderator_id::text, true);
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 2: Validate submission ownership and lock status
|
||||
-- ========================================================================
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM content_submissions
|
||||
WHERE id = p_submission_id
|
||||
AND (assigned_to = p_moderator_id OR assigned_to IS NULL)
|
||||
AND status IN ('pending', 'partially_approved')
|
||||
) THEN
|
||||
RAISE EXCEPTION 'Submission not found, locked by another moderator, or already processed'
|
||||
USING ERRCODE = '42501';
|
||||
END IF;
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 3: Update all items to rejected atomically
|
||||
-- ========================================================================
|
||||
UPDATE submission_items
|
||||
SET
|
||||
status = 'rejected',
|
||||
rejection_reason = p_rejection_reason,
|
||||
updated_at = NOW()
|
||||
WHERE id = ANY(p_item_ids)
|
||||
AND submission_id = p_submission_id
|
||||
AND status IN ('pending', 'rejected');
|
||||
|
||||
GET DIAGNOSTICS v_rejected_count = ROW_COUNT;
|
||||
|
||||
RAISE NOTICE '[%] Rejected % items',
|
||||
COALESCE(p_request_id, 'NO_REQUEST_ID'),
|
||||
v_rejected_count;
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 4: Determine final submission status
|
||||
-- ========================================================================
|
||||
-- Check if any items are still pending
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM submission_items
|
||||
WHERE submission_id = p_submission_id
|
||||
AND status = 'pending'
|
||||
) INTO v_some_pending;
|
||||
|
||||
-- Set final status
|
||||
v_final_status := CASE
|
||||
WHEN v_some_pending THEN 'partially_approved'
|
||||
WHEN EXISTS(
|
||||
SELECT 1 FROM submission_items
|
||||
WHERE submission_id = p_submission_id
|
||||
AND status = 'approved'
|
||||
) THEN 'partially_approved'
|
||||
ELSE 'rejected'
|
||||
END;
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 5: Update parent submission
|
||||
-- ========================================================================
|
||||
UPDATE content_submissions
|
||||
SET
|
||||
status = v_final_status,
|
||||
reviewer_id = p_moderator_id,
|
||||
reviewed_at = NOW(),
|
||||
assigned_to = NULL,
|
||||
locked_until = NULL,
|
||||
reviewer_notes = p_rejection_reason
|
||||
WHERE id = p_submission_id;
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 6: Log to moderation_audit_log
|
||||
-- ========================================================================
|
||||
INSERT INTO moderation_audit_log (
|
||||
submission_id,
|
||||
moderator_id,
|
||||
action,
|
||||
details,
|
||||
created_at
|
||||
) VALUES (
|
||||
p_submission_id,
|
||||
p_moderator_id,
|
||||
'rejection',
|
||||
jsonb_build_object(
|
||||
'item_ids', p_item_ids,
|
||||
'rejection_reason', p_rejection_reason,
|
||||
'rejected_count', v_rejected_count,
|
||||
'final_status', v_final_status,
|
||||
'request_id', p_request_id
|
||||
),
|
||||
NOW()
|
||||
);
|
||||
|
||||
-- ========================================================================
|
||||
-- STEP 7: Build result
|
||||
-- ========================================================================
|
||||
v_result := jsonb_build_object(
|
||||
'success', TRUE,
|
||||
'rejected_count', v_rejected_count,
|
||||
'submission_status', v_final_status,
|
||||
'duration_ms', EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) * 1000
|
||||
);
|
||||
|
||||
-- Clear session variables
|
||||
PERFORM set_config('app.moderator_id', '', true);
|
||||
|
||||
RAISE NOTICE '[%] Rejection transaction completed in %ms',
|
||||
COALESCE(p_request_id, 'NO_REQUEST_ID'),
|
||||
EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) * 1000;
|
||||
|
||||
RETURN v_result;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Grant execute permissions
|
||||
GRANT EXECUTE ON FUNCTION process_rejection_transaction TO authenticated;
|
||||
|
||||
COMMENT ON FUNCTION process_rejection_transaction IS
|
||||
'Atomic rejection transaction with audit logging and lock release';
|
||||
@@ -0,0 +1,172 @@
|
||||
-- Fix create_submission_with_items to remove temp_location_data reference
|
||||
-- This column was dropped but the function still references it, causing park submissions to fail
|
||||
|
||||
DROP FUNCTION IF EXISTS public.create_submission_with_items(uuid, text, text, jsonb, uuid);
|
||||
|
||||
CREATE OR REPLACE FUNCTION public.create_submission_with_items(
|
||||
p_submission_id uuid,
|
||||
p_entity_type text,
|
||||
p_action_type text,
|
||||
p_items jsonb,
|
||||
p_user_id uuid
|
||||
)
|
||||
RETURNS uuid
|
||||
LANGUAGE plpgsql
|
||||
SECURITY DEFINER
|
||||
SET search_path TO 'public'
|
||||
AS $function$
|
||||
DECLARE
|
||||
v_item JSONB;
|
||||
v_item_type TEXT;
|
||||
v_item_data JSONB;
|
||||
v_depends_on INTEGER;
|
||||
v_order_index INTEGER;
|
||||
v_created_ids UUID[] := ARRAY[]::UUID[];
|
||||
v_submission_item_id UUID;
|
||||
v_entity_submission_id UUID;
|
||||
BEGIN
|
||||
-- Loop through items array
|
||||
FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
|
||||
LOOP
|
||||
v_item_type := v_item->>'item_type';
|
||||
v_item_data := v_item->'item_data';
|
||||
v_depends_on := (v_item->>'depends_on')::INTEGER;
|
||||
v_order_index := (v_item->>'order_index')::INTEGER;
|
||||
|
||||
-- Resolve dependency references
|
||||
IF v_depends_on IS NOT NULL THEN
|
||||
v_item_data := v_item_data || jsonb_build_object(
|
||||
v_item->>'dependency_field',
|
||||
v_created_ids[v_depends_on + 1]
|
||||
);
|
||||
END IF;
|
||||
|
||||
-- Create submission based on entity type
|
||||
IF v_item_type = 'park' THEN
|
||||
INSERT INTO park_submissions (
|
||||
submission_id, name, slug, description, park_type, status,
|
||||
opening_date, opening_date_precision, closing_date, closing_date_precision,
|
||||
location_id, operator_id, property_owner_id,
|
||||
website_url, phone, email,
|
||||
banner_image_url, banner_image_id, card_image_url, card_image_id
|
||||
) VALUES (
|
||||
p_submission_id,
|
||||
v_item_data->>'name',
|
||||
v_item_data->>'slug',
|
||||
v_item_data->>'description',
|
||||
v_item_data->>'park_type',
|
||||
v_item_data->>'status',
|
||||
(v_item_data->>'opening_date')::DATE,
|
||||
v_item_data->>'opening_date_precision',
|
||||
(v_item_data->>'closing_date')::DATE,
|
||||
v_item_data->>'closing_date_precision',
|
||||
(v_item_data->>'location_id')::UUID,
|
||||
(v_item_data->>'operator_id')::UUID,
|
||||
(v_item_data->>'property_owner_id')::UUID,
|
||||
v_item_data->>'website_url',
|
||||
v_item_data->>'phone',
|
||||
v_item_data->>'email',
|
||||
v_item_data->>'banner_image_url',
|
||||
v_item_data->>'banner_image_id',
|
||||
v_item_data->>'card_image_url',
|
||||
v_item_data->>'card_image_id'
|
||||
) RETURNING id INTO v_entity_submission_id;
|
||||
|
||||
ELSIF v_item_type = 'ride' THEN
|
||||
INSERT INTO ride_submissions (
|
||||
submission_id, name, slug, description, category, status,
|
||||
opening_date, opening_date_precision, closing_date, closing_date_precision,
|
||||
park_id, manufacturer_id, designer_id, ride_model_id,
|
||||
banner_image_url, banner_image_id, card_image_url, card_image_id
|
||||
) VALUES (
|
||||
p_submission_id,
|
||||
v_item_data->>'name',
|
||||
v_item_data->>'slug',
|
||||
v_item_data->>'description',
|
||||
v_item_data->>'category',
|
||||
v_item_data->>'status',
|
||||
(v_item_data->>'opening_date')::DATE,
|
||||
v_item_data->>'opening_date_precision',
|
||||
(v_item_data->>'closing_date')::DATE,
|
||||
v_item_data->>'closing_date_precision',
|
||||
(v_item_data->>'park_id')::UUID,
|
||||
(v_item_data->>'manufacturer_id')::UUID,
|
||||
(v_item_data->>'designer_id')::UUID,
|
||||
(v_item_data->>'ride_model_id')::UUID,
|
||||
v_item_data->>'banner_image_url',
|
||||
v_item_data->>'banner_image_id',
|
||||
v_item_data->>'card_image_url',
|
||||
v_item_data->>'card_image_id'
|
||||
) RETURNING id INTO v_entity_submission_id;
|
||||
|
||||
ELSIF v_item_type IN ('manufacturer', 'operator', 'designer', 'property_owner') THEN
|
||||
INSERT INTO company_submissions (
|
||||
submission_id, name, slug, description, company_type,
|
||||
founded_year, headquarters_location, website_url,
|
||||
banner_image_url, banner_image_id, card_image_url, card_image_id
|
||||
) VALUES (
|
||||
p_submission_id,
|
||||
v_item_data->>'name',
|
||||
v_item_data->>'slug',
|
||||
v_item_data->>'description',
|
||||
v_item_type,
|
||||
(v_item_data->>'founded_year')::INTEGER,
|
||||
v_item_data->>'headquarters_location',
|
||||
v_item_data->>'website_url',
|
||||
v_item_data->>'banner_image_url',
|
||||
v_item_data->>'banner_image_id',
|
||||
v_item_data->>'card_image_url',
|
||||
v_item_data->>'card_image_id'
|
||||
) RETURNING id INTO v_entity_submission_id;
|
||||
|
||||
ELSIF v_item_type = 'ride_model' THEN
|
||||
INSERT INTO ride_model_submissions (
|
||||
submission_id, name, slug, description, manufacturer_id, category,
|
||||
banner_image_url, banner_image_id, card_image_url, card_image_id
|
||||
) VALUES (
|
||||
p_submission_id,
|
||||
v_item_data->>'name',
|
||||
v_item_data->>'slug',
|
||||
v_item_data->>'description',
|
||||
(v_item_data->>'manufacturer_id')::UUID,
|
||||
v_item_data->>'category',
|
||||
v_item_data->>'banner_image_url',
|
||||
v_item_data->>'banner_image_id',
|
||||
v_item_data->>'card_image_url',
|
||||
v_item_data->>'card_image_id'
|
||||
) RETURNING id INTO v_entity_submission_id;
|
||||
|
||||
ELSE
|
||||
RAISE EXCEPTION 'Unsupported item type: %', v_item_type;
|
||||
END IF;
|
||||
|
||||
-- Create submission_item record linking to the entity submission
|
||||
INSERT INTO submission_items (
|
||||
submission_id,
|
||||
item_type,
|
||||
action_type,
|
||||
order_index,
|
||||
depends_on,
|
||||
park_submission_id,
|
||||
ride_submission_id,
|
||||
company_submission_id,
|
||||
ride_model_submission_id
|
||||
) VALUES (
|
||||
p_submission_id,
|
||||
v_item_type,
|
||||
p_action_type,
|
||||
v_order_index,
|
||||
CASE WHEN v_depends_on IS NOT NULL THEN v_created_ids[v_depends_on + 1] ELSE NULL END,
|
||||
CASE WHEN v_item_type = 'park' THEN v_entity_submission_id ELSE NULL END,
|
||||
CASE WHEN v_item_type = 'ride' THEN v_entity_submission_id ELSE NULL END,
|
||||
CASE WHEN v_item_type IN ('manufacturer', 'operator', 'designer', 'property_owner') THEN v_entity_submission_id ELSE NULL END,
|
||||
CASE WHEN v_item_type = 'ride_model' THEN v_entity_submission_id ELSE NULL END
|
||||
) RETURNING id INTO v_submission_item_id;
|
||||
|
||||
-- Track created submission item IDs in order for dependency resolution
|
||||
v_created_ids := array_append(v_created_ids, v_submission_item_id);
|
||||
END LOOP;
|
||||
|
||||
RETURN p_submission_id;
|
||||
END;
|
||||
$function$;
|
||||
@@ -0,0 +1,227 @@
|
||||
-- Add distributed tracing support to RPC functions
|
||||
-- Adds trace_id and parent_span_id parameters for span context propagation
|
||||
|
||||
-- Update process_approval_transaction to accept trace context
|
||||
CREATE OR REPLACE FUNCTION process_approval_transaction(
|
||||
p_submission_id UUID,
|
||||
p_item_ids UUID[],
|
||||
p_moderator_id UUID,
|
||||
p_submitter_id UUID,
|
||||
p_request_id TEXT DEFAULT NULL,
|
||||
p_trace_id TEXT DEFAULT NULL,
|
||||
p_parent_span_id TEXT DEFAULT NULL
|
||||
)
|
||||
RETURNS jsonb
|
||||
LANGUAGE plpgsql
|
||||
SECURITY DEFINER
|
||||
SET search_path = public
|
||||
AS $$
|
||||
DECLARE
|
||||
v_item submission_items;
|
||||
v_approved_count INTEGER := 0;
|
||||
v_total_items INTEGER;
|
||||
v_new_status TEXT;
|
||||
v_entity_id UUID;
|
||||
v_all_items_processed BOOLEAN;
|
||||
BEGIN
|
||||
-- Log span start with trace context
|
||||
IF p_trace_id IS NOT NULL THEN
|
||||
RAISE NOTICE 'SPAN: {"spanId": "%", "traceId": "%", "parentSpanId": "%", "name": "process_approval_transaction_rpc", "kind": "INTERNAL", "startTime": %, "attributes": {"submission.id": "%", "item_count": %}}',
|
||||
gen_random_uuid()::text,
|
||||
p_trace_id,
|
||||
p_parent_span_id,
|
||||
extract(epoch from clock_timestamp()) * 1000,
|
||||
p_submission_id,
|
||||
array_length(p_item_ids, 1);
|
||||
END IF;
|
||||
|
||||
-- Get total items for this submission
|
||||
SELECT COUNT(*) INTO v_total_items
|
||||
FROM submission_items
|
||||
WHERE submission_id = p_submission_id;
|
||||
|
||||
-- Process each item
|
||||
FOREACH v_item IN ARRAY (
|
||||
SELECT ARRAY_AGG(si ORDER BY si.order_index)
|
||||
FROM submission_items si
|
||||
WHERE si.id = ANY(p_item_ids)
|
||||
)
|
||||
LOOP
|
||||
-- Log item processing span event
|
||||
IF p_trace_id IS NOT NULL THEN
|
||||
RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "parentSpanId": "%", "name": "process_item", "timestamp": %, "attributes": {"item.id": "%", "item.type": "%", "item.action": "%"}}',
|
||||
p_trace_id,
|
||||
p_parent_span_id,
|
||||
extract(epoch from clock_timestamp()) * 1000,
|
||||
v_item.id,
|
||||
v_item.item_type,
|
||||
v_item.action;
|
||||
END IF;
|
||||
|
||||
-- Create or update entity based on item type
|
||||
IF v_item.item_type = 'park' THEN
|
||||
IF v_item.action = 'create' THEN
|
||||
-- Log entity creation
|
||||
IF p_trace_id IS NOT NULL THEN
|
||||
RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "name": "create_entity_park", "timestamp": %, "attributes": {"action": "create"}}',
|
||||
p_trace_id,
|
||||
extract(epoch from clock_timestamp()) * 1000;
|
||||
END IF;
|
||||
|
||||
v_entity_id := create_entity_from_submission('park', v_item.id, p_submitter_id, p_request_id);
|
||||
ELSIF v_item.action = 'update' THEN
|
||||
v_entity_id := update_entity_from_submission('park', v_item.id, v_item.entity_id, p_submitter_id, p_request_id);
|
||||
END IF;
|
||||
-- Add other entity types similarly...
|
||||
END IF;
|
||||
|
||||
-- Update item status
|
||||
UPDATE submission_items
|
||||
SET
|
||||
status = 'approved',
|
||||
processed_at = NOW(),
|
||||
processed_by = p_moderator_id,
|
||||
entity_id = v_entity_id
|
||||
WHERE id = v_item.id;
|
||||
|
||||
v_approved_count := v_approved_count + 1;
|
||||
END LOOP;
|
||||
|
||||
-- Determine final submission status
|
||||
SELECT
|
||||
COUNT(*) = array_length(p_item_ids, 1)
|
||||
INTO v_all_items_processed
|
||||
FROM submission_items
|
||||
WHERE submission_id = p_submission_id
|
||||
AND status IN ('approved', 'rejected');
|
||||
|
||||
IF v_all_items_processed THEN
|
||||
v_new_status := 'approved';
|
||||
ELSE
|
||||
v_new_status := 'partially_approved';
|
||||
END IF;
|
||||
|
||||
-- Update submission status
|
||||
UPDATE content_submissions
|
||||
SET
|
||||
status = v_new_status,
|
||||
processed_at = CASE WHEN v_new_status = 'approved' THEN NOW() ELSE processed_at END,
|
||||
assigned_to = NULL,
|
||||
lock_expires_at = NULL
|
||||
WHERE id = p_submission_id;
|
||||
|
||||
-- Log completion
|
||||
IF p_trace_id IS NOT NULL THEN
|
||||
RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "name": "transaction_complete", "timestamp": %, "attributes": {"items_processed": %, "new_status": "%"}}',
|
||||
p_trace_id,
|
||||
extract(epoch from clock_timestamp()) * 1000,
|
||||
v_approved_count,
|
||||
v_new_status;
|
||||
END IF;
|
||||
|
||||
RETURN jsonb_build_object(
|
||||
'success', true,
|
||||
'status', v_new_status,
|
||||
'approved_count', v_approved_count,
|
||||
'total_items', v_total_items
|
||||
);
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Update process_rejection_transaction similarly
|
||||
CREATE OR REPLACE FUNCTION process_rejection_transaction(
|
||||
p_submission_id UUID,
|
||||
p_item_ids UUID[],
|
||||
p_moderator_id UUID,
|
||||
p_rejection_reason TEXT,
|
||||
p_request_id TEXT DEFAULT NULL,
|
||||
p_trace_id TEXT DEFAULT NULL,
|
||||
p_parent_span_id TEXT DEFAULT NULL
|
||||
)
|
||||
RETURNS jsonb
|
||||
LANGUAGE plpgsql
|
||||
SECURITY DEFINER
|
||||
SET search_path = public
|
||||
AS $$
|
||||
DECLARE
|
||||
v_rejected_count INTEGER := 0;
|
||||
v_total_items INTEGER;
|
||||
v_new_status TEXT;
|
||||
v_all_items_processed BOOLEAN;
|
||||
BEGIN
|
||||
-- Log span start
|
||||
IF p_trace_id IS NOT NULL THEN
|
||||
RAISE NOTICE 'SPAN: {"spanId": "%", "traceId": "%", "parentSpanId": "%", "name": "process_rejection_transaction_rpc", "kind": "INTERNAL", "startTime": %, "attributes": {"submission.id": "%", "item_count": %}}',
|
||||
gen_random_uuid()::text,
|
||||
p_trace_id,
|
||||
p_parent_span_id,
|
||||
extract(epoch from clock_timestamp()) * 1000,
|
||||
p_submission_id,
|
||||
array_length(p_item_ids, 1);
|
||||
END IF;
|
||||
|
||||
-- Get total items
|
||||
SELECT COUNT(*) INTO v_total_items
|
||||
FROM submission_items
|
||||
WHERE submission_id = p_submission_id;
|
||||
|
||||
-- Reject items
|
||||
UPDATE submission_items
|
||||
SET
|
||||
status = 'rejected',
|
||||
rejection_reason = p_rejection_reason,
|
||||
processed_at = NOW(),
|
||||
processed_by = p_moderator_id
|
||||
WHERE id = ANY(p_item_ids);
|
||||
|
||||
GET DIAGNOSTICS v_rejected_count = ROW_COUNT;
|
||||
|
||||
-- Check if all items processed
|
||||
SELECT
|
||||
COUNT(*) = (SELECT COUNT(*) FROM submission_items WHERE submission_id = p_submission_id)
|
||||
INTO v_all_items_processed
|
||||
FROM submission_items
|
||||
WHERE submission_id = p_submission_id
|
||||
AND status IN ('approved', 'rejected');
|
||||
|
||||
IF v_all_items_processed THEN
|
||||
-- Check if any items were approved
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM submission_items
|
||||
WHERE submission_id = p_submission_id AND status = 'approved'
|
||||
) INTO v_all_items_processed;
|
||||
|
||||
v_new_status := CASE
|
||||
WHEN v_all_items_processed THEN 'partially_approved'
|
||||
ELSE 'rejected'
|
||||
END;
|
||||
ELSE
|
||||
v_new_status := 'partially_approved';
|
||||
END IF;
|
||||
|
||||
-- Update submission
|
||||
UPDATE content_submissions
|
||||
SET
|
||||
status = v_new_status,
|
||||
processed_at = CASE WHEN v_new_status = 'rejected' THEN NOW() ELSE processed_at END,
|
||||
assigned_to = NULL,
|
||||
lock_expires_at = NULL
|
||||
WHERE id = p_submission_id;
|
||||
|
||||
-- Log completion
|
||||
IF p_trace_id IS NOT NULL THEN
|
||||
RAISE NOTICE 'SPAN_EVENT: {"traceId": "%", "name": "rejection_complete", "timestamp": %, "attributes": {"items_rejected": %, "new_status": "%"}}',
|
||||
p_trace_id,
|
||||
extract(epoch from clock_timestamp()) * 1000,
|
||||
v_rejected_count,
|
||||
v_new_status;
|
||||
END IF;
|
||||
|
||||
RETURN jsonb_build_object(
|
||||
'success', true,
|
||||
'status', v_new_status,
|
||||
'rejected_count', v_rejected_count,
|
||||
'total_items', v_total_items
|
||||
);
|
||||
END;
|
||||
$$;
|
||||
Reference in New Issue
Block a user