import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
import { corsHeaders } from './cors.ts';
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
import { edgeLogger, startRequest, endRequest, type RequestTracking } from '../_shared/logger.ts';

const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!;

/** Number of times the rejection RPC is retried after a retryable Postgres error. */
const MAX_DEADLOCK_RETRIES = 3;

/** Postgres error codes treated as retryable: deadlock (40P01) and serialization failure (40001). */
const RETRYABLE_PG_CODES = ['40P01', '40001'];

/** Table in which idempotency keys and their cached results are stored. */
const IDEMPOTENCY_TABLE = 'submission_idempotency_keys';

/** JSON payload expected in the body of a rejection request. */
interface RejectionRequest {
  submissionId: string;
  itemIds: string[];
  rejectionReason: string;
  idempotencyKey: string;
}

/** Fields of an RPC error object that this handler reads. */
interface RpcFailure {
  message: string;
  code?: string;
  details?: string;
}

/**
 * Builds a JSON response carrying the shared CORS headers.
 *
 * @param payload - Value serialised with `JSON.stringify` as the response body.
 * @param status - HTTP status code.
 * @param extraHeaders - Additional headers, applied after the CORS and content-type headers.
 */
function jsonResponse(
  payload: unknown,
  status: number,
  extraHeaders: Record<string, string> = {},
): Response {
  return new Response(JSON.stringify(payload), {
    status,
    headers: { ...corsHeaders, 'Content-Type': 'application/json', ...extraHeaders },
  });
}

/**
 * Writes the final outcome of a request onto its idempotency key row.
 *
 * Best-effort: a failure is logged as a warning and never propagated, so the
 * caller can still send its response. The query builder reports database
 * failures through the returned `error` field, so that field is checked in
 * addition to catching thrown exceptions.
 *
 * @param supabase - Client used to issue the update.
 * @param idempotencyKey - Key identifying the row to update.
 * @param requestId - Request identifier included in log entries.
 * @param outcome - Column values to store; `completed_at` is added here.
 */
async function recordIdempotencyOutcome(
  supabase: ReturnType<typeof createClient>,
  idempotencyKey: string,
  requestId: string,
  outcome: { status: 'completed' | 'failed'; result_data?: unknown; error_message?: string },
): Promise<void> {
  const warn = (message: string): void => {
    edgeLogger.warn('Failed to update idempotency key', {
      requestId,
      idempotencyKey,
      status: outcome.status,
      error: message,
      action: 'process_rejection'
    });
  };

  try {
    const { error } = await supabase
      .from(IDEMPOTENCY_TABLE)
      .update({ ...outcome, completed_at: new Date().toISOString() })
      .eq('idempotency_key', idempotencyKey);
    if (error) {
      warn(error.message);
    }
  } catch (updateError) {
    warn(updateError instanceof Error ? updateError.message : String(updateError));
  }
}

/**
 * Edge function handler that rejects items of a content submission.
 *
 * Flow: CORS preflight -> authenticate caller -> parse and validate body ->
 * idempotency lookup -> load submission and check lock/status -> register the
 * idempotency key -> call the `process_rejection_transaction` RPC (retrying on
 * deadlock / serialization failure) -> record the outcome and respond.
 *
 * Responses: 204 (preflight), 200 (success or idempotency cache hit),
 * 400 (invalid body or submission already processed), 401 (unauthenticated),
 * 404 (submission not found), 409 (locked by another moderator),
 * 500 (RPC failure or unexpected error).
 *
 * @param req - Incoming HTTP request.
 * @returns The HTTP response to send to the caller.
 */
const handler = async (req: Request): Promise<Response> => {
  // Handle CORS preflight requests
  if (req.method === 'OPTIONS') {
    return new Response(null, { status: 204, headers: corsHeaders });
  }

  // Start request tracking
  const tracking: RequestTracking = startRequest();
  const requestId = tracking.requestId;

  try {
    // STEP 1: Authentication
    const authHeader = req.headers.get('Authorization');
    if (!authHeader) {
      return jsonResponse({ error: 'Missing Authorization header' }, 401);
    }

    // The caller's Authorization header is forwarded on every query made by this client.
    const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, {
      global: { headers: { Authorization: authHeader } }
    });

    const { data: { user }, error: authError } = await supabase.auth.getUser();
    if (authError || !user) {
      edgeLogger.warn('Authentication failed', {
        requestId,
        error: authError?.message,
        action: 'process_rejection'
      });
      return jsonResponse({ error: 'Unauthorized' }, 401);
    }

    edgeLogger.info('Rejection request received', {
      requestId,
      moderatorId: user.id,
      action: 'process_rejection'
    });

    // STEP 2: Parse request
    // A body that is not valid JSON is a client error, so answer 400 rather
    // than letting the parse exception surface as a 500.
    let body: Partial<RejectionRequest> | null;
    try {
      body = await req.json();
    } catch (parseError) {
      edgeLogger.warn('Malformed JSON body', {
        requestId,
        error: parseError instanceof Error ? parseError.message : String(parseError),
        action: 'process_rejection'
      });
      return jsonResponse({ error: 'Request body must be valid JSON' }, 400);
    }
    if (body === null || typeof body !== 'object') {
      return jsonResponse({ error: 'Request body must be a JSON object' }, 400);
    }

    const { submissionId, itemIds, rejectionReason } = body;
    // A missing or empty key disables idempotency bookkeeping for this request
    // instead of querying/writing rows keyed by an unusable value.
    const idempotencyKey = body.idempotencyKey || undefined;

    if (!submissionId || !Array.isArray(itemIds) || itemIds.length === 0 || !rejectionReason) {
      edgeLogger.warn('Invalid request payload', {
        requestId,
        hasSubmissionId: !!submissionId,
        hasItemIds: !!itemIds,
        itemCount: itemIds?.length || 0,
        hasReason: !!rejectionReason,
        action: 'process_rejection'
      });
      return jsonResponse(
        { error: 'Missing required fields: submissionId, itemIds, rejectionReason' },
        400,
      );
    }

    edgeLogger.info('Request validated', {
      requestId,
      submissionId,
      itemCount: itemIds.length,
      action: 'process_rejection'
    });

    // STEP 3: Idempotency check
    let existingKey: { status?: string; result_data?: unknown } | null = null;
    if (idempotencyKey) {
      // maybeSingle(): zero matching rows is the normal first-request case, not an error.
      const { data: keyRow, error: lookupError } = await supabase
        .from(IDEMPOTENCY_TABLE)
        .select('*')
        .eq('idempotency_key', idempotencyKey)
        .maybeSingle();

      if (lookupError) {
        edgeLogger.warn('Idempotency lookup failed', {
          requestId,
          idempotencyKey,
          error: lookupError.message,
          action: 'process_rejection'
        });
      }
      existingKey = keyRow;

      if (existingKey?.status === 'completed') {
        edgeLogger.info('Idempotency cache hit', {
          requestId,
          idempotencyKey,
          cached: true,
          action: 'process_rejection'
        });
        return jsonResponse(existingKey.result_data, 200, { 'X-Cache-Status': 'HIT' });
      }
    }

    // STEP 4: Fetch submission to get submitter_id
    const { data: submission, error: submissionError } = await supabase
      .from('content_submissions')
      .select('user_id, status, assigned_to')
      .eq('id', submissionId)
      .single();

    if (submissionError || !submission) {
      edgeLogger.error('Submission not found', {
        requestId,
        submissionId,
        error: submissionError?.message,
        action: 'process_rejection'
      });
      return jsonResponse({ error: 'Submission not found' }, 404);
    }

    // STEP 5: Verify moderator can reject this submission
    if (submission.assigned_to && submission.assigned_to !== user.id) {
      edgeLogger.warn('Lock conflict', {
        requestId,
        submissionId,
        lockedBy: submission.assigned_to,
        attemptedBy: user.id,
        action: 'process_rejection'
      });
      return jsonResponse({ error: 'Submission is locked by another moderator' }, 409);
    }

    if (!['pending', 'partially_approved'].includes(submission.status)) {
      edgeLogger.warn('Invalid submission status', {
        requestId,
        submissionId,
        currentStatus: submission.status,
        expectedStatuses: ['pending', 'partially_approved'],
        action: 'process_rejection'
      });
      return jsonResponse({ error: 'Submission already processed' }, 400);
    }

    // STEP 6: Register idempotency key as processing
    if (idempotencyKey && !existingKey) {
      const { error: insertError } = await supabase.from(IDEMPOTENCY_TABLE).insert({
        idempotency_key: idempotencyKey,
        submission_id: submissionId,
        moderator_id: user.id,
        status: 'processing'
      });
      // Non-blocking: a failed registration must not prevent the rejection itself.
      if (insertError) {
        edgeLogger.warn('Failed to register idempotency key', {
          requestId,
          idempotencyKey,
          error: insertError.message,
          action: 'process_rejection'
        });
      }
    }

    edgeLogger.info('Calling rejection transaction RPC', {
      requestId,
      submissionId,
      itemCount: itemIds.length,
      action: 'process_rejection'
    });

    // ============================================================================
    // STEP 7: Call RPC function with deadlock retry logic
    // ============================================================================
    let retryCount = 0;
    let result: unknown = null;
    let rpcError: RpcFailure | null = null;

    while (retryCount <= MAX_DEADLOCK_RETRIES) {
      const { data, error } = await supabase.rpc('process_rejection_transaction', {
        p_submission_id: submissionId,
        p_item_ids: itemIds,
        p_moderator_id: user.id,
        p_rejection_reason: rejectionReason,
        p_request_id: requestId
      });

      result = data;
      rpcError = error;

      if (!rpcError) {
        // Success!
        break;
      }

      // Check for deadlock (40P01) or serialization failure (40001)
      if (rpcError.code !== undefined && RETRYABLE_PG_CODES.includes(rpcError.code)) {
        retryCount++;
        if (retryCount > MAX_DEADLOCK_RETRIES) {
          edgeLogger.error('Max deadlock retries exceeded', {
            requestId,
            submissionId,
            attempt: retryCount,
            action: 'process_rejection'
          });
          break;
        }

        // Exponential backoff: 200ms, 400ms, 800ms for retries 1, 2, 3.
        const backoffMs = 100 * Math.pow(2, retryCount);
        edgeLogger.warn('Deadlock detected, retrying', {
          requestId,
          attempt: retryCount,
          maxAttempts: MAX_DEADLOCK_RETRIES,
          backoffMs,
          action: 'process_rejection'
        });
        await new Promise<void>((resolve) => setTimeout(resolve, backoffMs));
        continue;
      }

      // Non-retryable error, break immediately
      break;
    }

    if (rpcError) {
      // Transaction failed - EVERYTHING rolled back automatically by PostgreSQL
      const duration = endRequest(tracking);
      edgeLogger.error('Transaction failed', {
        requestId,
        duration,
        submissionId,
        error: rpcError.message,
        errorCode: rpcError.code,
        retries: retryCount,
        action: 'process_rejection'
      });

      // Update idempotency key to failed (non-blocking)
      if (idempotencyKey) {
        await recordIdempotencyOutcome(supabase, idempotencyKey, requestId, {
          status: 'failed',
          error_message: rpcError.message
        });
      }

      return jsonResponse(
        {
          error: 'Rejection transaction failed',
          message: rpcError.message,
          details: rpcError.details,
          retries: retryCount
        },
        500,
      );
    }

    const duration = endRequest(tracking);
    edgeLogger.info('Transaction completed successfully', {
      requestId,
      duration,
      submissionId,
      itemCount: itemIds.length,
      retries: retryCount,
      newStatus: (result as { status?: unknown } | null | undefined)?.status,
      action: 'process_rejection'
    });

    // STEP 8: Success - update idempotency key (non-blocking, transaction already succeeded)
    if (idempotencyKey) {
      await recordIdempotencyOutcome(supabase, idempotencyKey, requestId, {
        status: 'completed',
        result_data: result
      });
    }

    return jsonResponse(result, 200, { 'X-Request-Id': requestId });
  } catch (error) {
    const duration = endRequest(tracking);
    edgeLogger.error('Unexpected error', {
      requestId,
      duration,
      error: error instanceof Error ? error.message : String(error),
      stack: error instanceof Error ? error.stack : undefined,
      action: 'process_rejection'
    });
    return jsonResponse(
      {
        error: 'Internal server error',
        message: error instanceof Error ? error.message : 'Unknown error'
      },
      500,
    );
  }
};

// Apply rate limiting: 10 requests per minute per IP (standard tier)
serve(withRateLimit(handler, rateLimiters.standard, corsHeaders));