import { serve } from 'https://deno.land/std@0.168.0/http/server.ts'; import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4'; import { corsHeaders } from './cors.ts'; import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts'; import { edgeLogger, startSpan, endSpan, addSpanEvent, setSpanAttributes, getSpanContext, logSpan, extractSpanContextFromHeaders, type Span } from '../_shared/logger.ts'; const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com'; const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!; interface RejectionRequest { submissionId: string; itemIds: string[]; rejectionReason: string; idempotencyKey: string; } // Main handler function const handler = async (req: Request) => { // Handle CORS preflight requests if (req.method === 'OPTIONS') { return new Response(null, { status: 204, headers: corsHeaders }); } // Extract parent span context from headers (if present) const parentSpanContext = extractSpanContextFromHeaders(req.headers); // Create root span for this edge function invocation const rootSpan = startSpan( 'process-selective-rejection', 'SERVER', parentSpanContext, { 'http.method': 'POST', 'function.name': 'process-selective-rejection', } ); const requestId = rootSpan.spanId; try { // STEP 1: Authentication addSpanEvent(rootSpan, 'authentication_start'); const authHeader = req.headers.get('Authorization'); if (!authHeader) { addSpanEvent(rootSpan, 'authentication_failed', { reason: 'missing_header' }); endSpan(rootSpan, 'error'); logSpan(rootSpan); return new Response( JSON.stringify({ error: 'Missing Authorization header' }), { status: 401, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, { global: { headers: { Authorization: authHeader } } }); const { data: { user }, error: authError } = await supabase.auth.getUser(); if (authError || !user) { addSpanEvent(rootSpan, 'authentication_failed', { error: authError?.message }); edgeLogger.warn('Authentication failed', { requestId, error: authError?.message, action: 'process_rejection' }); endSpan(rootSpan, 'error', authError || new Error('Unauthorized')); logSpan(rootSpan); return new Response( JSON.stringify({ error: 'Unauthorized' }), { status: 401, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } setSpanAttributes(rootSpan, { 'user.id': user.id }); addSpanEvent(rootSpan, 'authentication_success'); edgeLogger.info('Rejection request received', { requestId, moderatorId: user.id, action: 'process_rejection' }); // STEP 2: Parse request addSpanEvent(rootSpan, 'validation_start'); const body: RejectionRequest = await req.json(); const { submissionId, itemIds, rejectionReason, idempotencyKey } = body; if (!submissionId || !itemIds || itemIds.length === 0 || !rejectionReason) { addSpanEvent(rootSpan, 'validation_failed', { hasSubmissionId: !!submissionId, hasItemIds: !!itemIds, itemCount: itemIds?.length || 0, hasReason: !!rejectionReason, }); edgeLogger.warn('Invalid request payload', { requestId, hasSubmissionId: !!submissionId, hasItemIds: !!itemIds, itemCount: itemIds?.length || 0, hasReason: !!rejectionReason, action: 'process_rejection' }); endSpan(rootSpan, 'error'); logSpan(rootSpan); return new Response( JSON.stringify({ error: 'Missing required fields: submissionId, itemIds, rejectionReason' }), { status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } setSpanAttributes(rootSpan, { 'submission.id': submissionId, 'submission.item_count': itemIds.length, 'idempotency.key': idempotencyKey, }); addSpanEvent(rootSpan, 'validation_complete'); edgeLogger.info('Request validated', { requestId, submissionId, itemCount: itemIds.length, action: 'process_rejection' }); // STEP 3: Idempotency check addSpanEvent(rootSpan, 'idempotency_check_start'); const { data: existingKey } = await supabase .from('submission_idempotency_keys') .select('*') .eq('idempotency_key', idempotencyKey) .single(); if (existingKey?.status === 'completed') { addSpanEvent(rootSpan, 'idempotency_cache_hit'); setSpanAttributes(rootSpan, { 'cache.hit': true }); edgeLogger.info('Idempotency cache hit', { requestId, idempotencyKey, cached: true, action: 'process_rejection' }); endSpan(rootSpan, 'ok'); logSpan(rootSpan); return new Response( JSON.stringify(existingKey.result_data), { status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json', 'X-Cache-Status': 'HIT' } } ); } // STEP 4: Fetch submission to get submitter_id const { data: submission, error: submissionError } = await supabase .from('content_submissions') .select('user_id, status, assigned_to') .eq('id', submissionId) .single(); if (submissionError || !submission) { addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message }); edgeLogger.error('Submission not found', { requestId, submissionId, error: submissionError?.message, action: 'process_rejection' }); endSpan(rootSpan, 'error', submissionError || new Error('Submission not found')); logSpan(rootSpan); return new Response( JSON.stringify({ error: 'Submission not found' }), { status: 404, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } // STEP 5: Verify moderator can reject this submission if (submission.assigned_to && submission.assigned_to !== user.id) { edgeLogger.warn('Lock conflict', { requestId, submissionId, lockedBy: submission.assigned_to, attemptedBy: user.id, action: 'process_rejection' }); return new Response( JSON.stringify({ error: 'Submission is locked by another moderator' }), { status: 409, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } if (!['pending', 'partially_approved'].includes(submission.status)) { edgeLogger.warn('Invalid submission status', { requestId, submissionId, currentStatus: submission.status, expectedStatuses: ['pending', 'partially_approved'], action: 'process_rejection' }); return new Response( JSON.stringify({ error: 'Submission already processed' }), { status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } // STEP 6: Register idempotency key as processing if (!existingKey) { await supabase.from('submission_idempotency_keys').insert({ idempotency_key: idempotencyKey, submission_id: submissionId, moderator_id: user.id, status: 'processing' }); } // Create child span for RPC transaction const rpcSpan = startSpan( 'process_rejection_transaction', 'DATABASE', getSpanContext(rootSpan), { 'db.operation': 'rpc', 'db.function': 'process_rejection_transaction', 'submission.id': submissionId, 'submission.item_count': itemIds.length, } ); addSpanEvent(rpcSpan, 'rpc_call_start'); edgeLogger.info('Calling rejection transaction RPC', { requestId, submissionId, itemCount: itemIds.length, action: 'process_rejection' }); // ============================================================================ // STEP 7: Call RPC function with deadlock retry logic // ============================================================================ let retryCount = 0; const MAX_DEADLOCK_RETRIES = 3; let result: any = null; let rpcError: any = null; while (retryCount <= MAX_DEADLOCK_RETRIES) { const { data, error } = await supabase.rpc( 'process_rejection_transaction', { p_submission_id: submissionId, p_item_ids: itemIds, p_moderator_id: user.id, p_rejection_reason: rejectionReason, p_request_id: requestId, p_trace_id: rootSpan.traceId, p_parent_span_id: rpcSpan.spanId } ); result = data; rpcError = error; if (!rpcError) { // Success! addSpanEvent(rpcSpan, 'rpc_call_success', { 'result.status': data?.status, 'items.processed': itemIds.length, }); break; } // Check for deadlock (40P01) or serialization failure (40001) if (rpcError.code === '40P01' || rpcError.code === '40001') { retryCount++; if (retryCount > MAX_DEADLOCK_RETRIES) { addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount }); edgeLogger.error('Max deadlock retries exceeded', { requestId, submissionId, attempt: retryCount, action: 'process_rejection' }); break; } const backoffMs = 100 * Math.pow(2, retryCount); addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs }); edgeLogger.warn('Deadlock detected, retrying', { requestId, attempt: retryCount, maxAttempts: MAX_DEADLOCK_RETRIES, backoffMs, action: 'process_rejection' }); await new Promise(r => setTimeout(r, backoffMs)); continue; } // Non-retryable error, break immediately addSpanEvent(rpcSpan, 'rpc_call_failed', { error: rpcError.message, errorCode: rpcError.code }); break; } if (rpcError) { // Transaction failed - EVERYTHING rolled back automatically by PostgreSQL endSpan(rpcSpan, 'error', rpcError); logSpan(rpcSpan); edgeLogger.error('Transaction failed', { requestId, duration: rpcSpan.duration, submissionId, error: rpcError.message, errorCode: rpcError.code, retries: retryCount, action: 'process_rejection' }); // Update idempotency key to failed try { await supabase .from('submission_idempotency_keys') .update({ status: 'failed', error_message: rpcError.message, completed_at: new Date().toISOString() }) .eq('idempotency_key', idempotencyKey); } catch (updateError) { edgeLogger.warn('Failed to update idempotency key', { requestId, idempotencyKey, status: 'failed', error: updateError instanceof Error ? updateError.message : String(updateError), action: 'process_rejection' }); // Non-blocking - continue with error response even if idempotency update fails } endSpan(rootSpan, 'error', rpcError); logSpan(rootSpan); return new Response( JSON.stringify({ error: 'Rejection transaction failed', message: rpcError.message, details: rpcError.details, retries: retryCount }), { status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } // RPC succeeded endSpan(rpcSpan, 'ok'); logSpan(rpcSpan); setSpanAttributes(rootSpan, { 'result.status': result?.status, 'result.final_status': result?.status, 'retries': retryCount, }); edgeLogger.info('Transaction completed successfully', { requestId, duration: rpcSpan.duration, submissionId, itemCount: itemIds.length, retries: retryCount, newStatus: result?.status, action: 'process_rejection' }); // STEP 8: Success - update idempotency key try { await supabase .from('submission_idempotency_keys') .update({ status: 'completed', result_data: result, completed_at: new Date().toISOString() }) .eq('idempotency_key', idempotencyKey); } catch (updateError) { edgeLogger.warn('Failed to update idempotency key', { requestId, idempotencyKey, status: 'completed', error: updateError instanceof Error ? updateError.message : String(updateError), action: 'process_rejection' }); // Non-blocking - transaction succeeded, so continue with success response } endSpan(rootSpan, 'ok'); logSpan(rootSpan); return new Response( JSON.stringify(result), { status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json', 'X-Request-Id': requestId } } ); } catch (error) { endSpan(rootSpan, 'error', error instanceof Error ? error : new Error(String(error))); logSpan(rootSpan); edgeLogger.error('Unexpected error', { requestId, duration: rootSpan.duration, error: error instanceof Error ? error.message : String(error), stack: error instanceof Error ? error.stack : undefined, action: 'process_rejection' }); return new Response( JSON.stringify({ error: 'Internal server error', message: error instanceof Error ? error.message : 'Unknown error' }), { status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } }; // Apply rate limiting: 10 requests per minute per IP (standard tier) serve(withRateLimit(handler, rateLimiters.standard, corsHeaders));