Files
thrilltrack-explorer/supabase/functions/process-selective-approval/index.ts
gpt-engineer-app[bot] d18632c2b2 Improve moderation edge flow and timeout handling
- Add early logging and health check to process-selective-approval edge function
- Implement idempotency check with timeout to avoid edge timeouts
- Expose health endpoint for connectivity diagnostics
- Increase client moderation action timeout from 30s to 60s
- Update moderation actions hook to accommodate longer timeouts
2025-11-12 13:15:54 +00:00

302 lines
8.3 KiB
TypeScript

import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts';
import {
addSpanEvent,
setSpanAttributes,
getSpanContext,
startSpan,
endSpan,
} from '../_shared/logger.ts';
import { toError } from '../_shared/errorFormatter.ts';
import {
validateApprovalRequest,
} from '../_shared/submissionValidation.ts';
import { ValidationError } from '../_shared/typeValidation.ts';
// Main handler function

/** Upper bound, in milliseconds, on how long the idempotency-key lookup may take. */
const IDEMPOTENCY_CHECK_TIMEOUT_MS = 5000;

/** How many times the approval RPC is re-issued after a deadlock or serialization failure. */
const MAX_DEADLOCK_RETRIES = 3;

/**
 * Settles with `operation`, or rejects with `new Error(message)` once `ms` milliseconds pass.
 * The timer is cleared as soon as either side settles so it never outlives the request.
 */
const raceWithTimeout = async <T>(operation: PromiseLike<T>, ms: number, message: string): Promise<T> => {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });
  try {
    return await Promise.race([operation, deadline]);
  } finally {
    clearTimeout(timer);
  }
};

/**
 * Best-effort write of the final state of an idempotency key.
 * Never throws: a failed write is recorded on `span` as `idempotency_update_failed` instead.
 */
const recordIdempotencyOutcome = async (
  supabase: any,
  span: any,
  idempotencyKey: string,
  status: 'completed' | 'failed',
  fields: Record<string, unknown>
): Promise<void> => {
  try {
    // The query resolves with `{ error }` on failure, so the error must be read from the result.
    const { error } = await supabase
      .from('submission_idempotency_keys')
      .update({ status, ...fields, completed_at: new Date().toISOString() })
      .eq('idempotency_key', idempotencyKey);
    if (error) {
      addSpanEvent(span, 'idempotency_update_failed', { status, error: error.message });
    }
  } catch (updateError: unknown) {
    // Non-blocking - the caller's outcome must not depend on this bookkeeping write
    addSpanEvent(span, 'idempotency_update_failed', {
      status,
      error: updateError instanceof Error ? updateError.message : String(updateError)
    });
  }
};

/**
 * Handles a selective-approval request for a content submission.
 *
 * Flow: health check shortcut -> request validation -> idempotency lookup (bounded by a timeout)
 * -> submission fetch and moderator checks -> idempotency key registration -> approval RPC with
 * deadlock/serialization retries -> idempotency key finalization.
 *
 * @param req - Incoming request. A path ending in `/health` triggers the health check; otherwise
 *   the JSON body is validated and the `X-Idempotency-Key` header is required.
 * @param context - Supabase client, authenticated user, root tracing span and request id.
 * @returns A `Response` for the health check and for an idempotency cache hit; otherwise the data
 *   returned by the `process_approval_transaction` RPC.
 * @throws ValidationError when the body fails validation or the idempotency header is missing.
 * @throws Error when the idempotency lookup fails or times out, the submission is missing, locked
 *   by another moderator or already processed, the key cannot be registered, or the RPC fails.
 */
const handler = async (req: Request, context: { supabase: any; user: any; span: any; requestId: string }) => {
  const { supabase, user, span: rootSpan, requestId } = context;

  // Early logging - confirms request reached handler
  addSpanEvent(rootSpan, 'handler_entry', {
    requestId,
    userId: user.id,
    timestamp: Date.now()
  });
  setSpanAttributes(rootSpan, {
    'user.id': user.id,
    'function.name': 'process-selective-approval'
  });

  // Health check endpoint. Only the path is inspected, so a query string that happens to
  // contain "/health" cannot divert a real approval request.
  const requestPath = new URL(req.url).pathname.replace(/\/+$/, '');
  if (requestPath.endsWith('/health')) {
    addSpanEvent(rootSpan, 'health_check_start');
    const { error } = await supabase
      .from('content_submissions')
      .select('count')
      .limit(1);
    addSpanEvent(rootSpan, 'health_check_complete', {
      dbConnected: !error,
      error: error?.message
    });
    return new Response(JSON.stringify({
      status: 'ok',
      dbConnected: !error,
      timestamp: Date.now(),
      error: error?.message
    }), {
      headers: { 'Content-Type': 'application/json' },
      status: error ? 500 : 200
    });
  }

  // STEP 1: Parse and validate request
  addSpanEvent(rootSpan, 'validation_start');
  let submissionId: string;
  let itemIds: string[];
  try {
    const body = await req.json();
    const validated = validateApprovalRequest(body, requestId);
    submissionId = validated.submissionId;
    itemIds = validated.itemIds;
  } catch (error: unknown) {
    if (error instanceof ValidationError) {
      addSpanEvent(rootSpan, 'validation_failed', {
        field: error.field,
        expected: error.expected,
        received: error.received,
      });
    }
    throw error; // Will be caught by wrapper
  }

  const idempotencyKey = req.headers.get('x-idempotency-key');
  if (!idempotencyKey) {
    addSpanEvent(rootSpan, 'validation_failed', { reason: 'missing_idempotency_key' });
    throw new ValidationError('idempotency_key', 'Missing X-Idempotency-Key header', 'string', 'undefined');
  }
  setSpanAttributes(rootSpan, {
    'submission.id': submissionId,
    'submission.item_count': itemIds.length,
    'idempotency.key': idempotencyKey,
  });
  addSpanEvent(rootSpan, 'validation_complete');

  // STEP 2: Idempotency check with timeout
  addSpanEvent(rootSpan, 'idempotency_check_start');
  // maybeSingle() reports "no matching row" as `data: null` without an error, which lets a
  // genuine query failure be told apart from a first-time key.
  const idempotencyLookup = supabase
    .from('submission_idempotency_keys')
    .select('*')
    .eq('idempotency_key', idempotencyKey)
    .maybeSingle();

  let lookup: { data: any; error: any };
  try {
    lookup = await raceWithTimeout<{ data: any; error: any }>(
      idempotencyLookup,
      IDEMPOTENCY_CHECK_TIMEOUT_MS,
      'Idempotency check timed out after 5s'
    );
  } catch (lookupFailure: unknown) {
    const reason = lookupFailure instanceof Error ? lookupFailure.message : String(lookupFailure);
    addSpanEvent(rootSpan, 'idempotency_check_timeout', { error: reason });
    throw new Error(`Database query timeout: ${reason}`);
  }
  if (lookup.error) {
    // Proceeding here would treat a broken lookup as "never seen this key".
    addSpanEvent(rootSpan, 'idempotency_check_failed', {
      error: lookup.error.message,
      errorCode: lookup.error.code
    });
    throw toError(lookup.error);
  }
  const existingKey = lookup.data;
  addSpanEvent(rootSpan, 'idempotency_check_complete', {
    foundKey: !!existingKey,
    status: existingKey?.status
  });

  if (existingKey?.status === 'completed') {
    addSpanEvent(rootSpan, 'idempotency_cache_hit');
    setSpanAttributes(rootSpan, { 'cache.hit': true });
    return new Response(
      JSON.stringify(existingKey.result_data),
      {
        status: 200,
        headers: { 'Content-Type': 'application/json', 'X-Cache-Status': 'HIT' }
      }
    );
  }
  // NOTE(review): a key found in any other status (e.g. 'processing') falls through and the
  // approval is attempted again - confirm that is intended for in-flight requests.

  // STEP 3: Fetch submission to get submitter_id
  const { data: submission, error: submissionError } = await supabase
    .from('content_submissions')
    .select('user_id, status, assigned_to')
    .eq('id', submissionId)
    .single();
  if (submissionError || !submission) {
    addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
    throw new Error('Submission not found');
  }

  // STEP 4: Verify moderator can approve this submission
  if (submission.assigned_to && submission.assigned_to !== user.id) {
    throw new Error('Submission is locked by another moderator');
  }
  if (!['pending', 'partially_approved'].includes(submission.status)) {
    throw new Error('Submission already processed');
  }

  // STEP 5: Register idempotency key as processing
  if (!existingKey) {
    const { error: idempotencyError } = await supabase
      .from('submission_idempotency_keys')
      .insert({
        idempotency_key: idempotencyKey,
        submission_id: submissionId,
        moderator_id: user.id,
        item_ids: itemIds,
        status: 'processing'
      })
      .select()
      .single();
    // 23505 = unique_violation: the key was inserted by a concurrent request
    if (idempotencyError && idempotencyError.code === '23505') {
      throw new Error('Another moderator is processing this submission');
    }
    if (idempotencyError) {
      throw toError(idempotencyError);
    }
  }

  // Create child span for RPC transaction
  const rpcSpan = startSpan(
    'process_approval_transaction',
    'DATABASE',
    getSpanContext(rootSpan),
    {
      'db.operation': 'rpc',
      'db.function': 'process_approval_transaction',
      'submission.id': submissionId,
      'submission.item_count': itemIds.length,
    }
  );
  addSpanEvent(rpcSpan, 'rpc_call_start');

  // STEP 6: Call RPC function with deadlock retry logic
  let retryCount = 0;
  let result: any = null;
  let rpcError: any = null;
  while (retryCount <= MAX_DEADLOCK_RETRIES) {
    const { data, error } = await supabase.rpc(
      'process_approval_transaction',
      {
        p_submission_id: submissionId,
        p_item_ids: itemIds,
        p_moderator_id: user.id,
        p_submitter_id: submission.user_id,
        p_request_id: requestId,
        p_trace_id: rootSpan.traceId,
        p_parent_span_id: rpcSpan.spanId
      }
    );
    result = data;
    rpcError = error;
    if (!rpcError) {
      addSpanEvent(rpcSpan, 'rpc_call_success', {
        'result.status': data?.status,
        'items.processed': itemIds.length,
      });
      break;
    }
    // Check for deadlock (40P01) or serialization failure (40001)
    if (rpcError.code === '40P01' || rpcError.code === '40001') {
      retryCount++;
      if (retryCount > MAX_DEADLOCK_RETRIES) {
        addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
        break;
      }
      // Exponential backoff: 200ms, 400ms, 800ms
      const backoffMs = 100 * 2 ** retryCount;
      addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
      await new Promise(r => setTimeout(r, backoffMs));
      continue;
    }
    // Non-retryable error
    addSpanEvent(rpcSpan, 'rpc_call_failed', {
      error: rpcError.message,
      errorCode: rpcError.code
    });
    break;
  }

  if (rpcError) {
    endSpan(rpcSpan, 'error', rpcError);
    // Update idempotency key to failed
    await recordIdempotencyOutcome(supabase, rootSpan, idempotencyKey, 'failed', {
      error_message: rpcError.message
    });
    throw toError(rpcError);
  }

  // RPC succeeded
  endSpan(rpcSpan, 'ok');
  setSpanAttributes(rootSpan, {
    'result.status': result?.status,
    'result.final_status': result?.status,
    'retries': retryCount,
  });

  // STEP 7: Success - update idempotency key.
  // Non-blocking - transaction succeeded, so continue with success response
  await recordIdempotencyOutcome(supabase, rootSpan, idempotencyKey, 'completed', {
    result_data: result
  });
  return result;
};
// Options for the shared edge-function wrapper (automatic error handling, CORS, auth, and logging)
const approvalFunctionOptions = {
  name: 'process-selective-approval',
  requireAuth: true,
  corsEnabled: true,
  enableTracing: true,
  rateLimitTier: 'moderate',
} as const;

// Register the handler with the wrapper
createEdgeFunction(approvalFunctionOptions, handler);