import { serve } from 'https://deno.land/std@0.190.0/http/server.ts';
import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts';
import { corsHeadersWithTracing } from '../_shared/cors.ts';
import {
  addSpanEvent,
  setSpanAttributes,
  getSpanContext,
  startSpan,
  endSpan,
} from '../_shared/logger.ts';
import { toError } from '../_shared/errorFormatter.ts';
import {
  validateApprovalRequest,
} from '../_shared/submissionValidation.ts';
import { ValidationError } from '../_shared/typeValidation.ts';

/** Upper bound for the idempotency-key lookup before the request is failed. */
const IDEMPOTENCY_CHECK_TIMEOUT_MS = 5000;

/** Maximum number of retries after a deadlock / serialization failure. */
const MAX_DEADLOCK_RETRIES = 3;

/**
 * Races `work` against a timer and rejects with `new Error(message)` if the
 * timer fires first. The timer is always cleared once the race settles, so no
 * pending timeout outlives the call.
 */
async function withTimeout<T>(work: PromiseLike<T>, ms: number, message: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer);
  }
}

/**
 * Best-effort update of the idempotency-key row for `idempotencyKey`.
 *
 * Never throws: the approval outcome has already been decided by the time this
 * is called. A failed update is recorded on `span` as an
 * `idempotency_update_failed` event instead of being silently dropped.
 */
// deno-lint-ignore no-explicit-any
async function recordIdempotencyOutcome(supabase: any, span: any, idempotencyKey: string, fields: Record<string, unknown>): Promise<void> {
  try {
    // The query result carries failures in `error`; it has to be inspected
    // explicitly because a failed update does not necessarily throw.
    const { error } = await supabase
      .from('submission_idempotency_keys')
      .update({ ...fields, completed_at: new Date().toISOString() })
      .eq('idempotency_key', idempotencyKey);
    if (error) {
      addSpanEvent(span, 'idempotency_update_failed', { error: error.message });
    }
  } catch (updateError: unknown) {
    // Non-blocking
    addSpanEvent(span, 'idempotency_update_failed', {
      error: updateError instanceof Error ? updateError.message : String(updateError),
    });
  }
}

/**
 * Main handler for the `process-selective-approval` edge function.
 *
 * Flow:
 *  - URLs containing `/health` get a small DB connectivity probe response.
 *  - Otherwise the JSON body is validated, the `X-Idempotency-Key` header is
 *    required, and a previously completed request with the same key is
 *    answered from its stored `result_data`.
 *  - The submission is loaded and checked (not assigned to another moderator,
 *    status `pending` or `partially_approved`), the idempotency key is
 *    registered as `processing`, and the `process_approval_transaction` RPC is
 *    called with retries on deadlock / serialization failure.
 *
 * @param req     Incoming HTTP request.
 * @param context Supabase client, authenticated user, root tracing span and
 *                request id, as supplied by the edge-function wrapper.
 * @returns A `Response` for the health check and idempotency cache hit;
 *          otherwise the raw RPC result (presumably serialized by the
 *          wrapper — confirm against `createEdgeFunction`).
 * @throws ValidationError when the body or the idempotency header is invalid.
 * @throws Error for lookup timeouts, missing/locked/already-processed
 *         submissions, conflicting idempotency keys and RPC failures.
 */
// deno-lint-ignore no-explicit-any
const handler = async (req: Request, context: { supabase: any; user: any; span: any; requestId: string }) => {
  const { supabase, user, span: rootSpan, requestId } = context;

  // Early logging - confirms request reached handler
  addSpanEvent(rootSpan, 'handler_entry', {
    requestId,
    userId: user.id,
    timestamp: Date.now()
  });

  setSpanAttributes(rootSpan, {
    'user.id': user.id,
    'function.name': 'process-selective-approval'
  });

  // Health check endpoint
  if (req.url.includes('/health')) {
    addSpanEvent(rootSpan, 'health_check_start');
    // Only the error matters here; the returned rows are not used.
    const { error } = await supabase
      .from('content_submissions')
      .select('count')
      .limit(1);

    addSpanEvent(rootSpan, 'health_check_complete', {
      dbConnected: !error,
      error: error?.message
    });

    return new Response(JSON.stringify({
      status: 'ok',
      dbConnected: !error,
      timestamp: Date.now(),
      error: error?.message
    }), {
      headers: { 'Content-Type': 'application/json' },
      status: error ? 500 : 200
    });
  }

  // STEP 1: Parse and validate request
  addSpanEvent(rootSpan, 'validation_start');

  let submissionId: string;
  let itemIds: string[];

  try {
    const body = await req.json();
    const validated = validateApprovalRequest(body, requestId);
    submissionId = validated.submissionId;
    itemIds = validated.itemIds;
  } catch (error) {
    if (error instanceof ValidationError) {
      addSpanEvent(rootSpan, 'validation_failed', {
        field: error.field,
        expected: error.expected,
        received: error.received,
      });
    }
    // Every failure is rethrown; the wrapper is expected to turn it into a response.
    throw error;
  }

  const idempotencyKey = req.headers.get('x-idempotency-key');

  if (!idempotencyKey) {
    addSpanEvent(rootSpan, 'validation_failed', { reason: 'missing_idempotency_key' });
    throw new ValidationError('idempotency_key', 'Missing X-Idempotency-Key header', 'string', 'undefined');
  }

  setSpanAttributes(rootSpan, {
    'submission.id': submissionId,
    'submission.item_count': itemIds.length,
    'idempotency.key': idempotencyKey,
  });
  addSpanEvent(rootSpan, 'validation_complete');

  // STEP 2: Idempotency check with timeout
  addSpanEvent(rootSpan, 'idempotency_check_start');

  const idempotencyCheck = supabase
    .from('submission_idempotency_keys')
    .select('*')
    .eq('idempotency_key', idempotencyKey)
    .single();

  // deno-lint-ignore no-explicit-any
  let existingKey: any;
  try {
    // deno-lint-ignore no-explicit-any
    const lookup: any = await withTimeout(
      idempotencyCheck,
      IDEMPOTENCY_CHECK_TIMEOUT_MS,
      'Idempotency check timed out after 5s',
    );
    // A lookup error (e.g. no matching row) leaves `data` empty, which is
    // treated the same as "key not seen before".
    existingKey = lookup.data;
  } catch (timeoutError: unknown) {
    const reason = timeoutError instanceof Error ? timeoutError.message : String(timeoutError);
    addSpanEvent(rootSpan, 'idempotency_check_timeout', { error: reason });
    throw new Error(`Database query timeout: ${reason}`);
  }

  addSpanEvent(rootSpan, 'idempotency_check_complete', {
    foundKey: !!existingKey,
    status: existingKey?.status
  });

  if (existingKey?.status === 'completed') {
    addSpanEvent(rootSpan, 'idempotency_cache_hit');
    setSpanAttributes(rootSpan, { 'cache.hit': true });
    return new Response(
      JSON.stringify(existingKey.result_data),
      {
        status: 200,
        // The body is JSON, so declare it (matches the health-check response).
        headers: { 'Content-Type': 'application/json', 'X-Cache-Status': 'HIT' }
      }
    );
  }

  // STEP 3: Fetch submission to get submitter_id
  const { data: submission, error: submissionError } = await supabase
    .from('content_submissions')
    .select('user_id, status, assigned_to')
    .eq('id', submissionId)
    .single();

  if (submissionError || !submission) {
    addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
    throw new Error('Submission not found');
  }

  // STEP 4: Verify moderator can approve this submission
  if (submission.assigned_to && submission.assigned_to !== user.id) {
    throw new Error('Submission is locked by another moderator');
  }

  if (!['pending', 'partially_approved'].includes(submission.status)) {
    throw new Error('Submission already processed');
  }

  // STEP 5: Register idempotency key as processing.
  // This is a plain insert; a unique-violation (23505) is read as a concurrent
  // request having registered first (presumably via a unique constraint on the
  // key column — confirm against the table definition).
  if (!existingKey) {
    const { error: idempotencyError } = await supabase
      .from('submission_idempotency_keys')
      .insert({
        idempotency_key: idempotencyKey,
        submission_id: submissionId,
        moderator_id: user.id,
        item_ids: itemIds,
        status: 'processing'
      })
      .select()
      .single();

    if (idempotencyError && idempotencyError.code === '23505') {
      throw new Error('Another moderator is processing this submission');
    }

    if (idempotencyError) {
      throw toError(idempotencyError);
    }
  }

  // Create child span for RPC transaction
  const rpcSpan = startSpan(
    'process_approval_transaction',
    'DATABASE',
    getSpanContext(rootSpan),
    {
      'db.operation': 'rpc',
      'db.function': 'process_approval_transaction',
      'submission.id': submissionId,
      'submission.item_count': itemIds.length,
    }
  );

  addSpanEvent(rpcSpan, 'rpc_call_start');

  // STEP 6: Call RPC function with deadlock retry logic
  let retryCount = 0;
  // deno-lint-ignore no-explicit-any
  let result: any = null;
  // deno-lint-ignore no-explicit-any
  let rpcError: any = null;

  while (retryCount <= MAX_DEADLOCK_RETRIES) {
    const { data, error } = await supabase.rpc(
      'process_approval_transaction',
      {
        p_submission_id: submissionId,
        p_item_ids: itemIds,
        p_moderator_id: user.id,
        p_submitter_id: submission.user_id,
        p_request_id: requestId,
        p_trace_id: rootSpan.traceId,
        p_parent_span_id: rpcSpan.spanId
      }
    );

    result = data;
    rpcError = error;

    if (!rpcError) {
      addSpanEvent(rpcSpan, 'rpc_call_success', {
        'result.status': data?.status,
        'items.processed': itemIds.length,
      });
      break;
    }

    // Check for deadlock (40P01) or serialization failure (40001)
    if (rpcError.code === '40P01' || rpcError.code === '40001') {
      retryCount++;
      if (retryCount > MAX_DEADLOCK_RETRIES) {
        addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
        break;
      }

      // Exponential backoff: 200ms, 400ms, 800ms for attempts 1..3.
      const backoffMs = 100 * Math.pow(2, retryCount);
      addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
      await new Promise(r => setTimeout(r, backoffMs));
      continue;
    }

    // Non-retryable error
    addSpanEvent(rpcSpan, 'rpc_call_failed', {
      error: rpcError.message,
      errorCode: rpcError.code
    });
    break;
  }

  if (rpcError) {
    endSpan(rpcSpan, 'error', rpcError);

    // Update idempotency key to failed (best effort)
    await recordIdempotencyOutcome(supabase, rootSpan, idempotencyKey, {
      status: 'failed',
      error_message: rpcError.message,
    });

    throw toError(rpcError);
  }

  // RPC succeeded
  endSpan(rpcSpan, 'ok');

  setSpanAttributes(rootSpan, {
    'result.status': result?.status,
    'result.final_status': result?.status,
    'retries': retryCount,
  });

  // STEP 7: Success - update idempotency key.
  // Best effort: the transaction succeeded, so continue with the success
  // response even if this bookkeeping write fails.
  await recordIdempotencyOutcome(supabase, rootSpan, idempotencyKey, {
    status: 'completed',
    result_data: result,
  });

  return result;
};

// Create edge function with automatic error handling, CORS, auth, and logging
serve(createEdgeFunction(
  {
    name: 'process-selective-approval',
    requireAuth: true,
    corsHeaders: corsHeadersWithTracing,
  },
  handler
));