Compare commits

...

2 Commits

Author SHA1 Message Date
gpt-engineer-app[bot]
6c03a5b0e7 Implement rejection bulletproofing
Created atomic rejection edge function process-selective-rejection and RPC, updated moderation client to use it, and ensured resilience; added CORS wrapper. Reminder: generate package-lock.json by running npm install.
2025-11-10 13:26:13 +00:00
gpt-engineer-app[bot]
92b5d6e33d Implement bulletproof rejection flow
- Adds atomic rejection transaction edge function and RPC
- Updates moderation client to use new rejection path
- Introduces rejection transaction migration and supporting readouts
- Moves photo-related approval handling toward RPC-based flow
- Lays groundwork for idempotency and resilience in moderation actions
2025-11-10 13:20:16 +00:00
5 changed files with 524 additions and 9 deletions

View File

@@ -443,15 +443,61 @@ export function useModerationActions(config: ModerationActionsConfig): Moderatio
});
return;
} else if (action === 'rejected') {
await supabase
.from('submission_items')
.update({
status: 'rejected',
rejection_reason: moderatorNotes || 'Parent submission rejected',
updated_at: new Date().toISOString(),
})
.eq('submission_id', item.id)
.eq('status', 'pending');
// Use atomic rejection transaction for submission items
const {
data,
error,
requestId,
attempts,
cached,
conflictRetries
} = await invokeWithResilience(
'process-selective-rejection',
{
itemIds: submissionItems.map((i) => i.id),
submissionId: item.id,
rejectionReason: moderatorNotes || 'Parent submission rejected',
},
'rejection',
submissionItems.map((i) => i.id),
config.user?.id,
3, // Max 3 conflict retries
30000 // 30s timeout
);
// Log retry attempts
if (attempts && attempts > 1) {
logger.log(`Rejection succeeded after ${attempts} network retries`, {
submissionId: item.id,
requestId,
});
}
if (conflictRetries && conflictRetries > 0) {
logger.log(`Resolved 409 conflict after ${conflictRetries} retries`, {
submissionId: item.id,
requestId,
cached: !!cached,
});
}
if (error) {
// Enhance error with context for better UI feedback
if (is409Conflict(error)) {
throw new Error(
'This rejection is being processed by another request. Please wait and try again if it does not complete.'
);
}
throw error;
}
toast({
title: cached ? 'Cached Result' : 'Submission Rejected',
description: cached
? `Returned cached result for ${submissionItems.length} item(s)`
: `Successfully rejected ${submissionItems.length} item(s)${requestId ? ` (Request: ${requestId.substring(0, 8)})` : ''}`,
});
return;
}
}

View File

@@ -6355,6 +6355,16 @@ export type Database = {
}
Returns: Json
}
process_rejection_transaction: {
Args: {
p_item_ids: string[]
p_moderator_id: string
p_rejection_reason: string
p_request_id?: string
p_submission_id: string
}
Returns: Json
}
release_expired_locks: { Args: never; Returns: number }
release_submission_lock: {
Args: { moderator_id: string; submission_id: string }

View File

@@ -0,0 +1,4 @@
export const corsHeaders = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
};

View File

@@ -0,0 +1,296 @@
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
import { corsHeaders } from './cors.ts';
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!;
interface RejectionRequest {
submissionId: string;
itemIds: string[];
rejectionReason: string;
idempotencyKey: string;
}
// Main handler function
const handler = async (req: Request) => {
// Handle CORS preflight requests
if (req.method === 'OPTIONS') {
return new Response(null, {
status: 204,
headers: corsHeaders
});
}
// Generate request ID for tracking
const requestId = crypto.randomUUID();
try {
// STEP 1: Authentication
const authHeader = req.headers.get('Authorization');
if (!authHeader) {
return new Response(
JSON.stringify({ error: 'Missing Authorization header' }),
{
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, {
global: { headers: { Authorization: authHeader } }
});
const { data: { user }, error: authError } = await supabase.auth.getUser();
if (authError || !user) {
return new Response(
JSON.stringify({ error: 'Unauthorized' }),
{
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
console.log(`[${requestId}] Rejection request from moderator ${user.id}`);
// STEP 2: Parse request
const body: RejectionRequest = await req.json();
const { submissionId, itemIds, rejectionReason, idempotencyKey } = body;
if (!submissionId || !itemIds || itemIds.length === 0 || !rejectionReason) {
return new Response(
JSON.stringify({ error: 'Missing required fields: submissionId, itemIds, rejectionReason' }),
{
status: 400,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
// STEP 3: Idempotency check
const { data: existingKey } = await supabase
.from('submission_idempotency_keys')
.select('*')
.eq('idempotency_key', idempotencyKey)
.single();
if (existingKey?.status === 'completed') {
console.log(`[${requestId}] Idempotency key already processed, returning cached result`);
return new Response(
JSON.stringify(existingKey.result_data),
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
'X-Cache-Status': 'HIT'
}
}
);
}
// STEP 4: Fetch submission to get submitter_id
const { data: submission, error: submissionError } = await supabase
.from('content_submissions')
.select('user_id, status, assigned_to')
.eq('id', submissionId)
.single();
if (submissionError || !submission) {
console.error(`[${requestId}] Submission not found:`, submissionError);
return new Response(
JSON.stringify({ error: 'Submission not found' }),
{
status: 404,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
// STEP 5: Verify moderator can reject this submission
if (submission.assigned_to && submission.assigned_to !== user.id) {
console.error(`[${requestId}] Submission locked by another moderator`);
return new Response(
JSON.stringify({ error: 'Submission is locked by another moderator' }),
{
status: 409,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
if (!['pending', 'partially_approved'].includes(submission.status)) {
console.error(`[${requestId}] Invalid submission status: ${submission.status}`);
return new Response(
JSON.stringify({ error: 'Submission already processed' }),
{
status: 400,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
// STEP 6: Register idempotency key as processing
if (!existingKey) {
await supabase.from('submission_idempotency_keys').insert({
idempotency_key: idempotencyKey,
submission_id: submissionId,
moderator_id: user.id,
status: 'processing'
});
}
console.log(`[${requestId}] Calling process_rejection_transaction RPC`);
// ============================================================================
// STEP 7: Call RPC function with deadlock retry logic
// ============================================================================
let retryCount = 0;
const MAX_DEADLOCK_RETRIES = 3;
let result: any = null;
let rpcError: any = null;
while (retryCount <= MAX_DEADLOCK_RETRIES) {
const { data, error } = await supabase.rpc(
'process_rejection_transaction',
{
p_submission_id: submissionId,
p_item_ids: itemIds,
p_moderator_id: user.id,
p_rejection_reason: rejectionReason,
p_request_id: requestId
}
);
result = data;
rpcError = error;
if (!rpcError) {
// Success!
break;
}
// Check for deadlock (40P01) or serialization failure (40001)
if (rpcError.code === '40P01' || rpcError.code === '40001') {
retryCount++;
if (retryCount > MAX_DEADLOCK_RETRIES) {
console.error(`[${requestId}] Max deadlock retries exceeded`);
break;
}
const backoffMs = 100 * Math.pow(2, retryCount);
console.log(`[${requestId}] Deadlock detected, retrying in ${backoffMs}ms (attempt ${retryCount}/${MAX_DEADLOCK_RETRIES})`);
await new Promise(r => setTimeout(r, backoffMs));
continue;
}
// Non-retryable error, break immediately
break;
}
if (rpcError) {
// Transaction failed - EVERYTHING rolled back automatically by PostgreSQL
console.error(`[${requestId}] Rejection transaction failed:`, rpcError);
// Update idempotency key to failed
try {
await supabase
.from('submission_idempotency_keys')
.update({
status: 'failed',
error_message: rpcError.message,
completed_at: new Date().toISOString()
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
console.error(`[${requestId}] Failed to update idempotency key to failed:`, updateError);
// Non-blocking - continue with error response even if idempotency update fails
}
return new Response(
JSON.stringify({
error: 'Rejection transaction failed',
message: rpcError.message,
details: rpcError.details,
retries: retryCount
}),
{
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
console.log(`[${requestId}] Transaction completed successfully:`, result);
// STEP 8: Success - update idempotency key
try {
await supabase
.from('submission_idempotency_keys')
.update({
status: 'completed',
result_data: result,
completed_at: new Date().toISOString()
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
console.error(`[${requestId}] Failed to update idempotency key to completed:`, updateError);
// Non-blocking - transaction succeeded, so continue with success response
}
return new Response(
JSON.stringify(result),
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
'X-Request-Id': requestId
}
}
);
} catch (error) {
console.error(`[${requestId}] Unexpected error:`, error);
return new Response(
JSON.stringify({
error: 'Internal server error',
message: error instanceof Error ? error.message : 'Unknown error'
}),
{
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
};
// Apply rate limiting: 10 requests per minute per IP (standard tier)
serve(withRateLimit(handler, rateLimiters.standard, corsHeaders));

View File

@@ -0,0 +1,159 @@
-- ============================================================================
-- CRITICAL: Add Atomic Rejection Transaction RPC
-- ============================================================================
-- This migration creates process_rejection_transaction to ensure atomic
-- rejection of submission items with proper audit logging and status updates.
--
-- Features:
-- - Atomic updates to submission_items.status = 'rejected'
-- - Sets rejection_reason for each item
-- - Updates parent submission status (rejected or partially_approved)
-- - Logs to moderation_audit_log
-- - Releases lock (assigned_to = NULL, locked_until = NULL)
-- - Returns transaction result
-- ============================================================================
CREATE OR REPLACE FUNCTION process_rejection_transaction(
p_submission_id UUID,
p_item_ids UUID[],
p_moderator_id UUID,
p_rejection_reason TEXT,
p_request_id TEXT DEFAULT NULL
)
RETURNS JSONB
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
DECLARE
v_start_time TIMESTAMPTZ;
v_result JSONB;
v_rejected_count INTEGER := 0;
v_final_status TEXT;
v_some_pending BOOLEAN := FALSE;
BEGIN
v_start_time := clock_timestamp();
RAISE NOTICE '[%] Starting atomic rejection transaction for submission %',
COALESCE(p_request_id, 'NO_REQUEST_ID'),
p_submission_id;
-- ========================================================================
-- STEP 1: Set session variables (transaction-scoped)
-- ========================================================================
PERFORM set_config('app.moderator_id', p_moderator_id::text, true);
-- ========================================================================
-- STEP 2: Validate submission ownership and lock status
-- ========================================================================
IF NOT EXISTS (
SELECT 1 FROM content_submissions
WHERE id = p_submission_id
AND (assigned_to = p_moderator_id OR assigned_to IS NULL)
AND status IN ('pending', 'partially_approved')
) THEN
RAISE EXCEPTION 'Submission not found, locked by another moderator, or already processed'
USING ERRCODE = '42501';
END IF;
-- ========================================================================
-- STEP 3: Update all items to rejected atomically
-- ========================================================================
UPDATE submission_items
SET
status = 'rejected',
rejection_reason = p_rejection_reason,
updated_at = NOW()
WHERE id = ANY(p_item_ids)
AND submission_id = p_submission_id
AND status IN ('pending', 'rejected');
GET DIAGNOSTICS v_rejected_count = ROW_COUNT;
RAISE NOTICE '[%] Rejected % items',
COALESCE(p_request_id, 'NO_REQUEST_ID'),
v_rejected_count;
-- ========================================================================
-- STEP 4: Determine final submission status
-- ========================================================================
-- Check if any items are still pending
SELECT EXISTS(
SELECT 1 FROM submission_items
WHERE submission_id = p_submission_id
AND status = 'pending'
) INTO v_some_pending;
-- Set final status
v_final_status := CASE
WHEN v_some_pending THEN 'partially_approved'
WHEN EXISTS(
SELECT 1 FROM submission_items
WHERE submission_id = p_submission_id
AND status = 'approved'
) THEN 'partially_approved'
ELSE 'rejected'
END;
-- ========================================================================
-- STEP 5: Update parent submission
-- ========================================================================
UPDATE content_submissions
SET
status = v_final_status,
reviewer_id = p_moderator_id,
reviewed_at = NOW(),
assigned_to = NULL,
locked_until = NULL,
reviewer_notes = p_rejection_reason
WHERE id = p_submission_id;
-- ========================================================================
-- STEP 6: Log to moderation_audit_log
-- ========================================================================
INSERT INTO moderation_audit_log (
submission_id,
moderator_id,
action,
details,
created_at
) VALUES (
p_submission_id,
p_moderator_id,
'rejection',
jsonb_build_object(
'item_ids', p_item_ids,
'rejection_reason', p_rejection_reason,
'rejected_count', v_rejected_count,
'final_status', v_final_status,
'request_id', p_request_id
),
NOW()
);
-- ========================================================================
-- STEP 7: Build result
-- ========================================================================
v_result := jsonb_build_object(
'success', TRUE,
'rejected_count', v_rejected_count,
'submission_status', v_final_status,
'duration_ms', EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) * 1000
);
-- Clear session variables
PERFORM set_config('app.moderator_id', '', true);
RAISE NOTICE '[%] Rejection transaction completed in %ms',
COALESCE(p_request_id, 'NO_REQUEST_ID'),
EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) * 1000;
RETURN v_result;
END;
$$;
-- Grant execute permissions
GRANT EXECUTE ON FUNCTION process_rejection_transaction TO authenticated;
COMMENT ON FUNCTION process_rejection_transaction IS
'Atomic rejection transaction with audit logging and lock release';