Files
thrilltrack-explorer/supabase/functions/process-selective-approval/index.ts
gpt-engineer-app[bot] afe7a93f69 Migrate complex functions in batches
Batch 1 of Phase 2: migrate 3-4 edge functions to use createEdgeFunction wrapper (process-selective-approval, process-selective-rejection, rate-limit-metrics) to enable automatic error logging, CORS, auth, and reduced boilerplate; preserve existing logic where applicable and prepare for subsequent batches.
2025-11-11 19:17:21 +00:00

248 lines
6.9 KiB
TypeScript

import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts';
import {
addSpanEvent,
setSpanAttributes,
getSpanContext,
startSpan,
endSpan,
} from '../_shared/logger.ts';
import { toError } from '../_shared/errorFormatter.ts';
import {
validateApprovalRequest,
} from '../_shared/submissionValidation.ts';
import { ValidationError } from '../_shared/typeValidation.ts';
// Main handler function
const handler = async (req: Request, context: { supabase: any; user: any; span: any; requestId: string }) => {
const { supabase, user, span: rootSpan, requestId } = context;
setSpanAttributes(rootSpan, {
'user.id': user.id,
'function.name': 'process-selective-approval'
});
// STEP 1: Parse and validate request
addSpanEvent(rootSpan, 'validation_start');
let submissionId: string;
let itemIds: string[];
try {
const body = await req.json();
const validated = validateApprovalRequest(body, requestId);
submissionId = validated.submissionId;
itemIds = validated.itemIds;
} catch (error) {
if (error instanceof ValidationError) {
addSpanEvent(rootSpan, 'validation_failed', {
field: error.field,
expected: error.expected,
received: error.received,
});
throw error; // Will be caught by wrapper
}
throw error;
}
const idempotencyKey = req.headers.get('x-idempotency-key');
if (!idempotencyKey) {
addSpanEvent(rootSpan, 'validation_failed', { reason: 'missing_idempotency_key' });
throw new ValidationError('idempotency_key', 'Missing X-Idempotency-Key header', 'string', 'undefined');
}
setSpanAttributes(rootSpan, {
'submission.id': submissionId,
'submission.item_count': itemIds.length,
'idempotency.key': idempotencyKey,
});
addSpanEvent(rootSpan, 'validation_complete');
// STEP 2: Idempotency check
addSpanEvent(rootSpan, 'idempotency_check_start');
const { data: existingKey } = await supabase
.from('submission_idempotency_keys')
.select('*')
.eq('idempotency_key', idempotencyKey)
.single();
if (existingKey?.status === 'completed') {
addSpanEvent(rootSpan, 'idempotency_cache_hit');
setSpanAttributes(rootSpan, { 'cache.hit': true });
return new Response(
JSON.stringify(existingKey.result_data),
{
status: 200,
headers: { 'X-Cache-Status': 'HIT' }
}
);
}
// STEP 3: Fetch submission to get submitter_id
const { data: submission, error: submissionError } = await supabase
.from('content_submissions')
.select('user_id, status, assigned_to')
.eq('id', submissionId)
.single();
if (submissionError || !submission) {
addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
throw new Error('Submission not found');
}
// STEP 4: Verify moderator can approve this submission
if (submission.assigned_to && submission.assigned_to !== user.id) {
throw new Error('Submission is locked by another moderator');
}
if (!['pending', 'partially_approved'].includes(submission.status)) {
throw new Error('Submission already processed');
}
// STEP 5: Register idempotency key as processing (atomic upsert)
if (!existingKey) {
const { data: insertedKey, error: idempotencyError } = await supabase
.from('submission_idempotency_keys')
.insert({
idempotency_key: idempotencyKey,
submission_id: submissionId,
moderator_id: user.id,
item_ids: itemIds,
status: 'processing'
})
.select()
.single();
if (idempotencyError && idempotencyError.code === '23505') {
throw new Error('Another moderator is processing this submission');
}
if (idempotencyError) {
throw toError(idempotencyError);
}
}
// Create child span for RPC transaction
const rpcSpan = startSpan(
'process_approval_transaction',
'DATABASE',
getSpanContext(rootSpan),
{
'db.operation': 'rpc',
'db.function': 'process_approval_transaction',
'submission.id': submissionId,
'submission.item_count': itemIds.length,
}
);
addSpanEvent(rpcSpan, 'rpc_call_start');
// STEP 6: Call RPC function with deadlock retry logic
let retryCount = 0;
const MAX_DEADLOCK_RETRIES = 3;
let result: any = null;
let rpcError: any = null;
while (retryCount <= MAX_DEADLOCK_RETRIES) {
const { data, error } = await supabase.rpc(
'process_approval_transaction',
{
p_submission_id: submissionId,
p_item_ids: itemIds,
p_moderator_id: user.id,
p_submitter_id: submission.user_id,
p_request_id: requestId,
p_trace_id: rootSpan.traceId,
p_parent_span_id: rpcSpan.spanId
}
);
result = data;
rpcError = error;
if (!rpcError) {
addSpanEvent(rpcSpan, 'rpc_call_success', {
'result.status': data?.status,
'items.processed': itemIds.length,
});
break;
}
// Check for deadlock (40P01) or serialization failure (40001)
if (rpcError.code === '40P01' || rpcError.code === '40001') {
retryCount++;
if (retryCount > MAX_DEADLOCK_RETRIES) {
addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
break;
}
const backoffMs = 100 * Math.pow(2, retryCount);
addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
await new Promise(r => setTimeout(r, backoffMs));
continue;
}
// Non-retryable error
addSpanEvent(rpcSpan, 'rpc_call_failed', {
error: rpcError.message,
errorCode: rpcError.code
});
break;
}
if (rpcError) {
endSpan(rpcSpan, 'error', rpcError);
// Update idempotency key to failed
try {
await supabase
.from('submission_idempotency_keys')
.update({
status: 'failed',
error_message: rpcError.message,
completed_at: new Date().toISOString()
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
// Non-blocking
}
throw toError(rpcError);
}
// RPC succeeded
endSpan(rpcSpan, 'ok');
setSpanAttributes(rootSpan, {
'result.status': result?.status,
'result.final_status': result?.status,
'retries': retryCount,
});
// STEP 7: Success - update idempotency key
try {
await supabase
.from('submission_idempotency_keys')
.update({
status: 'completed',
result_data: result,
completed_at: new Date().toISOString()
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
// Non-blocking - transaction succeeded, so continue with success response
}
return result;
};
// Create edge function with automatic error handling, CORS, auth, and logging
createEdgeFunction(
{
name: 'process-selective-approval',
requireAuth: true,
corsEnabled: true,
enableTracing: true,
rateLimitTier: 'moderate'
},
handler
);