From 92b5d6e33d83caae2d5ac059c8223c72b61a4704 Mon Sep 17 00:00:00 2001 From: "gpt-engineer-app[bot]" <159125892+gpt-engineer-app[bot]@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:20:16 +0000 Subject: [PATCH] Implement bulletproof rejection flow - Adds atomic rejection transaction edge function and RPC - Updates moderation client to use new rejection path - Introduces rejection transaction migration and supporting readouts - Moves photo-related approval handling toward RPC-based flow - Lays groundwork for idempotency and resilience in moderation actions --- src/hooks/moderation/useModerationActions.ts | 64 +++- .../process-selective-rejection/cors.ts | 4 + .../process-selective-rejection/index.ts | 296 ++++++++++++++++++ 3 files changed, 355 insertions(+), 9 deletions(-) create mode 100644 supabase/functions/process-selective-rejection/cors.ts create mode 100644 supabase/functions/process-selective-rejection/index.ts diff --git a/src/hooks/moderation/useModerationActions.ts b/src/hooks/moderation/useModerationActions.ts index cd8fab2b..4e3cffe5 100644 --- a/src/hooks/moderation/useModerationActions.ts +++ b/src/hooks/moderation/useModerationActions.ts @@ -443,15 +443,61 @@ export function useModerationActions(config: ModerationActionsConfig): Moderatio }); return; } else if (action === 'rejected') { - await supabase - .from('submission_items') - .update({ - status: 'rejected', - rejection_reason: moderatorNotes || 'Parent submission rejected', - updated_at: new Date().toISOString(), - }) - .eq('submission_id', item.id) - .eq('status', 'pending'); + // Use atomic rejection transaction for submission items + const { + data, + error, + requestId, + attempts, + cached, + conflictRetries + } = await invokeWithResilience( + 'process-selective-rejection', + { + itemIds: submissionItems.map((i) => i.id), + submissionId: item.id, + rejectionReason: moderatorNotes || 'Parent submission rejected', + }, + 'rejection', + submissionItems.map((i) => i.id), + config.user?.id, + 3, // Max 3 conflict retries + 30000 // 30s timeout + ); + + // Log retry attempts + if (attempts && attempts > 1) { + logger.log(`Rejection succeeded after ${attempts} network retries`, { + submissionId: item.id, + requestId, + }); + } + + if (conflictRetries && conflictRetries > 0) { + logger.log(`Resolved 409 conflict after ${conflictRetries} retries`, { + submissionId: item.id, + requestId, + cached: !!cached, + }); + } + + if (error) { + // Enhance error with context for better UI feedback + if (is409Conflict(error)) { + throw new Error( + 'This rejection is being processed by another request. Please wait and try again if it does not complete.' + ); + } + throw error; + } + + toast({ + title: cached ? 'Cached Result' : 'Submission Rejected', + description: cached + ? `Returned cached result for ${submissionItems.length} item(s)` + : `Successfully rejected ${submissionItems.length} item(s)${requestId ? ` (Request: ${requestId.substring(0, 8)})` : ''}`, + }); + return; } } diff --git a/supabase/functions/process-selective-rejection/cors.ts b/supabase/functions/process-selective-rejection/cors.ts new file mode 100644 index 00000000..14d9864d --- /dev/null +++ b/supabase/functions/process-selective-rejection/cors.ts @@ -0,0 +1,4 @@ +export const corsHeaders = { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type', +}; diff --git a/supabase/functions/process-selective-rejection/index.ts b/supabase/functions/process-selective-rejection/index.ts new file mode 100644 index 00000000..6cf548b5 --- /dev/null +++ b/supabase/functions/process-selective-rejection/index.ts @@ -0,0 +1,296 @@ +import { serve } from 'https://deno.land/std@0.168.0/http/server.ts'; +import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4'; +import { corsHeaders } from './cors.ts'; +import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts'; + +const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com'; +const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!; + +interface RejectionRequest { + submissionId: string; + itemIds: string[]; + rejectionReason: string; + idempotencyKey: string; +} + +// Main handler function +const handler = async (req: Request) => { + // Handle CORS preflight requests + if (req.method === 'OPTIONS') { + return new Response(null, { + status: 204, + headers: corsHeaders + }); + } + + // Generate request ID for tracking + const requestId = crypto.randomUUID(); + + try { + // STEP 1: Authentication + const authHeader = req.headers.get('Authorization'); + if (!authHeader) { + return new Response( + JSON.stringify({ error: 'Missing Authorization header' }), + { + status: 401, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, { + global: { headers: { Authorization: authHeader } } + }); + + const { data: { user }, error: authError } = await supabase.auth.getUser(); + if (authError || !user) { + return new Response( + JSON.stringify({ error: 'Unauthorized' }), + { + status: 401, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + console.log(`[${requestId}] Rejection request from moderator ${user.id}`); + + // STEP 2: Parse request + const body: RejectionRequest = await req.json(); + const { submissionId, itemIds, rejectionReason, idempotencyKey } = body; + + if (!submissionId || !itemIds || itemIds.length === 0 || !rejectionReason) { + return new Response( + JSON.stringify({ error: 'Missing required fields: submissionId, itemIds, rejectionReason' }), + { + status: 400, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + // STEP 3: Idempotency check + const { data: existingKey } = await supabase + .from('submission_idempotency_keys') + .select('*') + .eq('idempotency_key', idempotencyKey) + .single(); + + if (existingKey?.status === 'completed') { + console.log(`[${requestId}] Idempotency key already processed, returning cached result`); + return new Response( + JSON.stringify(existingKey.result_data), + { + status: 200, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json', + 'X-Cache-Status': 'HIT' + } + } + ); + } + + // STEP 4: Fetch submission to get submitter_id + const { data: submission, error: submissionError } = await supabase + .from('content_submissions') + .select('user_id, status, assigned_to') + .eq('id', submissionId) + .single(); + + if (submissionError || !submission) { + console.error(`[${requestId}] Submission not found:`, submissionError); + return new Response( + JSON.stringify({ error: 'Submission not found' }), + { + status: 404, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + // STEP 5: Verify moderator can reject this submission + if (submission.assigned_to && submission.assigned_to !== user.id) { + console.error(`[${requestId}] Submission locked by another moderator`); + return new Response( + JSON.stringify({ error: 'Submission is locked by another moderator' }), + { + status: 409, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + if (!['pending', 'partially_approved'].includes(submission.status)) { + console.error(`[${requestId}] Invalid submission status: ${submission.status}`); + return new Response( + JSON.stringify({ error: 'Submission already processed' }), + { + status: 400, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + // STEP 6: Register idempotency key as processing + if (!existingKey) { + await supabase.from('submission_idempotency_keys').insert({ + idempotency_key: idempotencyKey, + submission_id: submissionId, + moderator_id: user.id, + status: 'processing' + }); + } + + console.log(`[${requestId}] Calling process_rejection_transaction RPC`); + + // ============================================================================ + // STEP 7: Call RPC function with deadlock retry logic + // ============================================================================ + let retryCount = 0; + const MAX_DEADLOCK_RETRIES = 3; + let result: any = null; + let rpcError: any = null; + + while (retryCount <= MAX_DEADLOCK_RETRIES) { + const { data, error } = await supabase.rpc( + 'process_rejection_transaction', + { + p_submission_id: submissionId, + p_item_ids: itemIds, + p_moderator_id: user.id, + p_rejection_reason: rejectionReason, + p_request_id: requestId + } + ); + + result = data; + rpcError = error; + + if (!rpcError) { + // Success! + break; + } + + // Check for deadlock (40P01) or serialization failure (40001) + if (rpcError.code === '40P01' || rpcError.code === '40001') { + retryCount++; + if (retryCount > MAX_DEADLOCK_RETRIES) { + console.error(`[${requestId}] Max deadlock retries exceeded`); + break; + } + + const backoffMs = 100 * Math.pow(2, retryCount); + console.log(`[${requestId}] Deadlock detected, retrying in ${backoffMs}ms (attempt ${retryCount}/${MAX_DEADLOCK_RETRIES})`); + await new Promise(r => setTimeout(r, backoffMs)); + continue; + } + + // Non-retryable error, break immediately + break; + } + + if (rpcError) { + // Transaction failed - EVERYTHING rolled back automatically by PostgreSQL + console.error(`[${requestId}] Rejection transaction failed:`, rpcError); + + // Update idempotency key to failed + try { + await supabase + .from('submission_idempotency_keys') + .update({ + status: 'failed', + error_message: rpcError.message, + completed_at: new Date().toISOString() + }) + .eq('idempotency_key', idempotencyKey); + } catch (updateError) { + console.error(`[${requestId}] Failed to update idempotency key to failed:`, updateError); + // Non-blocking - continue with error response even if idempotency update fails + } + + return new Response( + JSON.stringify({ + error: 'Rejection transaction failed', + message: rpcError.message, + details: rpcError.details, + retries: retryCount + }), + { + status: 500, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } + + console.log(`[${requestId}] Transaction completed successfully:`, result); + + // STEP 8: Success - update idempotency key + try { + await supabase + .from('submission_idempotency_keys') + .update({ + status: 'completed', + result_data: result, + completed_at: new Date().toISOString() + }) + .eq('idempotency_key', idempotencyKey); + } catch (updateError) { + console.error(`[${requestId}] Failed to update idempotency key to completed:`, updateError); + // Non-blocking - transaction succeeded, so continue with success response + } + + return new Response( + JSON.stringify(result), + { + status: 200, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json', + 'X-Request-Id': requestId + } + } + ); + + } catch (error) { + console.error(`[${requestId}] Unexpected error:`, error); + return new Response( + JSON.stringify({ + error: 'Internal server error', + message: error instanceof Error ? error.message : 'Unknown error' + }), + { + status: 500, + headers: { + ...corsHeaders, + 'Content-Type': 'application/json' + } + } + ); + } +}; + +// Apply rate limiting: 10 requests per minute per IP (standard tier) +serve(withRateLimit(handler, rateLimiters.standard, corsHeaders));