Migrate complex functions in batches

Batch 1 of Phase 2: migrate 3-4 edge functions to use createEdgeFunction wrapper (process-selective-approval, process-selective-rejection, rate-limit-metrics) to enable automatic error logging, CORS, auth, and reduced boilerplate; preserve existing logic where applicable and prepare for subsequent batches.
This commit is contained in:
gpt-engineer-app[bot]
2025-11-11 19:17:21 +00:00
parent fa57d497af
commit afe7a93f69
3 changed files with 475 additions and 1212 deletions

View File

@@ -1,154 +1,27 @@
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
import { corsHeadersWithTracing as corsHeaders } from '../_shared/cors.ts';
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts';
import {
edgeLogger,
startSpan,
endSpan,
addSpanEvent,
setSpanAttributes,
getSpanContext,
logSpan,
extractSpanContextFromHeaders,
type Span
startSpan,
endSpan,
} from '../_shared/logger.ts';
import { formatEdgeError, toError } from '../_shared/errorFormatter.ts';
import { toError } from '../_shared/errorFormatter.ts';
import {
validateApprovalRequest,
validateSubmissionItems,
getSubmissionTableName,
getMainTableName,
type ValidatedSubmissionItem
} from '../_shared/submissionValidation.ts';
import { ValidationError } from '../_shared/typeValidation.ts';
const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY');
// ============================================================================
// CRITICAL: Validate environment variables at startup
// ============================================================================
if (!SUPABASE_ANON_KEY) {
const errorMsg = 'CRITICAL: SUPABASE_ANON_KEY environment variable is not set!';
console.error(errorMsg, {
timestamp: new Date().toISOString(),
hasUrl: !!SUPABASE_URL,
url: SUPABASE_URL,
availableEnvVars: Object.keys(Deno.env.toObject()).filter(k =>
k.includes('SUPABASE') || k.includes('URL')
)
});
throw new Error('Missing required environment variable: SUPABASE_ANON_KEY');
}
console.log('Edge function initialized successfully', {
timestamp: new Date().toISOString(),
function: 'process-selective-approval',
hasUrl: !!SUPABASE_URL,
hasKey: !!SUPABASE_ANON_KEY,
keyLength: SUPABASE_ANON_KEY.length
});
interface ApprovalRequest {
submissionId: string;
itemIds: string[];
}
// Main handler function
const handler = async (req: Request) => {
// ============================================================================
// Log every incoming request immediately
// ============================================================================
console.log('Request received', {
timestamp: new Date().toISOString(),
method: req.method,
url: req.url,
headers: {
authorization: req.headers.has('Authorization') ? '[PRESENT]' : '[MISSING]',
contentType: req.headers.get('Content-Type'),
traceparent: req.headers.get('traceparent') || '[NONE]'
}
const handler = async (req: Request, context: { supabase: any; user: any; span: any; requestId: string }) => {
const { supabase, user, span: rootSpan, requestId } = context;
setSpanAttributes(rootSpan, {
'user.id': user.id,
'function.name': 'process-selective-approval'
});
// Handle CORS preflight requests
if (req.method === 'OPTIONS') {
return new Response(null, {
status: 204,
headers: corsHeaders
});
}
// Extract parent span context from headers (if present)
const parentSpanContext = extractSpanContextFromHeaders(req.headers);
// Create root span for this edge function invocation
const rootSpan = startSpan(
'process-selective-approval',
'SERVER',
parentSpanContext,
{
'http.method': 'POST',
'function.name': 'process-selective-approval',
}
);
const requestId = rootSpan.spanId;
try {
// STEP 1: Authentication
addSpanEvent(rootSpan, 'authentication_start');
const authHeader = req.headers.get('Authorization');
if (!authHeader) {
addSpanEvent(rootSpan, 'authentication_failed', { reason: 'missing_header' });
endSpan(rootSpan, 'error');
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Missing Authorization header' }),
{
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, {
global: { headers: { Authorization: authHeader } }
});
const { data: { user }, error: authError } = await supabase.auth.getUser();
if (authError || !user) {
addSpanEvent(rootSpan, 'authentication_failed', { error: authError?.message });
edgeLogger.warn('Authentication failed', {
requestId,
error: authError?.message,
action: 'process_approval'
});
endSpan(rootSpan, 'error', authError || new Error('Unauthorized'));
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Unauthorized' }),
{
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
setSpanAttributes(rootSpan, { 'user.id': user.id });
addSpanEvent(rootSpan, 'authentication_success');
edgeLogger.info('Approval request received', {
requestId,
moderatorId: user.id,
action: 'process_approval'
});
// STEP 2: Parse and validate request
// STEP 1: Parse and validate request
addSpanEvent(rootSpan, 'validation_start');
let submissionId: string;
@@ -166,44 +39,15 @@ const handler = async (req: Request) => {
expected: error.expected,
received: error.received,
});
edgeLogger.warn('Request validation failed', {
requestId,
field: error.field,
expected: error.expected,
received: error.received,
action: 'process_approval'
});
endSpan(rootSpan, 'error', error);
logSpan(rootSpan);
return new Response(
JSON.stringify({
error: error.message,
field: error.field,
requestId
}),
{
status: 400,
headers: { ...corsHeaders, 'Content-Type': 'application/json' }
}
);
throw error; // Will be caught by wrapper
}
throw error;
}
const idempotencyKey = req.headers.get('x-idempotency-key');
if (!idempotencyKey) {
addSpanEvent(rootSpan, 'validation_failed', { reason: 'missing_idempotency_key' });
edgeLogger.warn('Missing idempotency key', { requestId });
endSpan(rootSpan, 'error');
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Missing X-Idempotency-Key header' }),
{
status: 400,
headers: { ...corsHeaders, 'Content-Type': 'application/json' }
}
);
throw new ValidationError('idempotency_key', 'Missing X-Idempotency-Key header', 'string', 'undefined');
}
setSpanAttributes(rootSpan, {
@@ -212,14 +56,8 @@ const handler = async (req: Request) => {
'idempotency.key': idempotencyKey,
});
addSpanEvent(rootSpan, 'validation_complete');
edgeLogger.info('Request validated', {
requestId,
submissionId,
itemCount: itemIds.length,
action: 'process_approval'
});
// STEP 3: Idempotency check
// STEP 2: Idempotency check
addSpanEvent(rootSpan, 'idempotency_check_start');
const { data: existingKey } = await supabase
.from('submission_idempotency_keys')
@@ -230,28 +68,16 @@ const handler = async (req: Request) => {
if (existingKey?.status === 'completed') {
addSpanEvent(rootSpan, 'idempotency_cache_hit');
setSpanAttributes(rootSpan, { 'cache.hit': true });
edgeLogger.info('Idempotency cache hit', {
requestId,
idempotencyKey,
cached: true,
action: 'process_approval'
});
endSpan(rootSpan, 'ok');
logSpan(rootSpan);
return new Response(
JSON.stringify(existingKey.result_data),
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
'X-Cache-Status': 'HIT'
}
headers: { 'X-Cache-Status': 'HIT' }
}
);
}
// STEP 4: Fetch submission to get submitter_id
// STEP 3: Fetch submission to get submitter_id
const { data: submission, error: submissionError } = await supabase
.from('content_submissions')
.select('user_id, status, assigned_to')
@@ -260,69 +86,19 @@ const handler = async (req: Request) => {
if (submissionError || !submission) {
addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
edgeLogger.error('Submission not found', {
requestId,
submissionId,
error: submissionError?.message,
action: 'process_approval'
});
endSpan(rootSpan, 'error', submissionError || new Error('Submission not found'));
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Submission not found' }),
{
status: 404,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw new Error('Submission not found');
}
// STEP 5: Verify moderator can approve this submission
// STEP 4: Verify moderator can approve this submission
if (submission.assigned_to && submission.assigned_to !== user.id) {
edgeLogger.warn('Lock conflict', {
requestId,
submissionId,
lockedBy: submission.assigned_to,
attemptedBy: user.id,
action: 'process_approval'
});
return new Response(
JSON.stringify({ error: 'Submission is locked by another moderator' }),
{
status: 409,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw new Error('Submission is locked by another moderator');
}
if (!['pending', 'partially_approved'].includes(submission.status)) {
edgeLogger.warn('Invalid submission status', {
requestId,
submissionId,
currentStatus: submission.status,
expectedStatuses: ['pending', 'partially_approved'],
action: 'process_approval'
});
return new Response(
JSON.stringify({ error: 'Submission already processed' }),
{
status: 400,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw new Error('Submission already processed');
}
// STEP 6: Register idempotency key as processing (atomic upsert)
// ✅ CRITICAL FIX: Use ON CONFLICT to prevent race conditions
// STEP 5: Register idempotency key as processing (atomic upsert)
if (!existingKey) {
const { data: insertedKey, error: idempotencyError } = await supabase
.from('submission_idempotency_keys')
@@ -336,17 +112,8 @@ const handler = async (req: Request) => {
.select()
.single();
// If conflict occurred, another moderator is processing
if (idempotencyError && idempotencyError.code === '23505') {
edgeLogger.warn('Idempotency key conflict - another request processing', {
requestId,
idempotencyKey,
moderatorId: user.id
});
return new Response(
JSON.stringify({ error: 'Another moderator is processing this submission' }),
{ status: 409, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('Another moderator is processing this submission');
}
if (idempotencyError) {
@@ -368,16 +135,8 @@ const handler = async (req: Request) => {
);
addSpanEvent(rpcSpan, 'rpc_call_start');
edgeLogger.info('Calling approval transaction RPC', {
requestId,
submissionId,
itemCount: itemIds.length,
action: 'process_approval'
});
// ============================================================================
// STEP 7: Call RPC function with deadlock retry logic
// ============================================================================
// STEP 6: Call RPC function with deadlock retry logic
let retryCount = 0;
const MAX_DEADLOCK_RETRIES = 3;
let result: any = null;
@@ -401,7 +160,6 @@ const handler = async (req: Request) => {
rpcError = error;
if (!rpcError) {
// Success!
addSpanEvent(rpcSpan, 'rpc_call_success', {
'result.status': data?.status,
'items.processed': itemIds.length,
@@ -414,69 +172,25 @@ const handler = async (req: Request) => {
retryCount++;
if (retryCount > MAX_DEADLOCK_RETRIES) {
addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
edgeLogger.error('Max deadlock retries exceeded', {
requestId,
submissionId,
attempt: retryCount,
action: 'process_approval'
});
break;
}
const backoffMs = 100 * Math.pow(2, retryCount);
addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
edgeLogger.warn('Deadlock detected, retrying', {
requestId,
attempt: retryCount,
maxAttempts: MAX_DEADLOCK_RETRIES,
backoffMs,
action: 'process_approval'
});
await new Promise(r => setTimeout(r, backoffMs));
continue;
}
// Non-retryable error, break immediately
// Non-retryable error
addSpanEvent(rpcSpan, 'rpc_call_failed', {
error: rpcError.message,
errorCode: rpcError.code
});
// Enhanced error logging for type mismatches
if (rpcError.code === 'P0001' && rpcError.message?.includes('Unknown item type')) {
// Extract the unknown type from error message
const typeMatch = rpcError.message.match(/Unknown item type: (\w+)/);
const unknownType = typeMatch ? typeMatch[1] : 'unknown';
edgeLogger.error('Entity type mismatch detected', {
requestId,
submissionId,
unknownType,
error: rpcError.message,
hint: `The submission contains an item with type '${unknownType}' which is not recognized by process_approval_transaction. ` +
`Valid types are: park, ride, manufacturer, operator, property_owner, designer, company, ride_model, photo. ` +
`This indicates a data model inconsistency between submission_items and the RPC function.`,
action: 'process_approval'
});
}
break;
}
if (rpcError) {
// Transaction failed - EVERYTHING rolled back automatically by PostgreSQL
endSpan(rpcSpan, 'error', rpcError);
logSpan(rpcSpan);
edgeLogger.error('Transaction failed', {
requestId,
duration: rpcSpan.duration,
submissionId,
error: rpcError.message,
errorCode: rpcError.code,
retries: retryCount,
action: 'process_approval'
});
// Update idempotency key to failed
try {
@@ -489,56 +203,21 @@ const handler = async (req: Request) => {
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
edgeLogger.warn('Failed to update idempotency key', {
requestId,
idempotencyKey,
status: 'failed',
error: formatEdgeError(updateError),
action: 'process_approval'
});
// Non-blocking - continue with error response even if idempotency update fails
// Non-blocking
}
endSpan(rootSpan, 'error', rpcError);
logSpan(rootSpan);
return new Response(
JSON.stringify({
error: 'Approval transaction failed',
message: rpcError.message,
details: rpcError.details,
retries: retryCount
}),
{
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw toError(rpcError);
}
// RPC succeeded
endSpan(rpcSpan, 'ok');
logSpan(rpcSpan);
setSpanAttributes(rootSpan, {
'result.status': result?.status,
'result.final_status': result?.status,
'retries': retryCount,
});
edgeLogger.info('Transaction completed successfully', {
requestId,
duration: rpcSpan.duration,
submissionId,
itemCount: itemIds.length,
retries: retryCount,
newStatus: result?.status,
action: 'process_approval'
});
// STEP 8: Success - update idempotency key
// STEP 7: Success - update idempotency key
try {
await supabase
.from('submission_idempotency_keys')
@@ -549,66 +228,20 @@ const handler = async (req: Request) => {
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
edgeLogger.warn('Failed to update idempotency key', {
requestId,
idempotencyKey,
status: 'completed',
error: formatEdgeError(updateError),
action: 'process_approval'
});
// Non-blocking - transaction succeeded, so continue with success response
}
endSpan(rootSpan, 'ok');
logSpan(rootSpan);
return new Response(
JSON.stringify(result),
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
'X-Request-Id': requestId
}
}
);
} catch (error) {
// Enhanced error logging with full details
const errorDetails = {
timestamp: new Date().toISOString(),
requestId: rootSpan?.spanId || 'unknown',
duration: rootSpan?.duration || 0,
error: formatEdgeError(error),
errorType: error instanceof Error ? error.constructor.name : typeof error,
stack: error instanceof Error ? error.stack : undefined,
action: 'process_approval'
};
console.error('Uncaught error in handler', errorDetails);
endSpan(rootSpan, 'error', error instanceof Error ? error : toError(error));
logSpan(rootSpan);
edgeLogger.error('Unexpected error', errorDetails);
return new Response(
JSON.stringify({
error: 'Internal server error',
message: error instanceof Error ? error.message : 'Unknown error',
requestId: rootSpan?.spanId || 'unknown'
}),
{
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
return result;
};
// Apply rate limiting: 10 requests per minute per IP (moderate tier for moderation actions)
serve(withRateLimit(handler, rateLimiters.moderate, corsHeaders));
// Create edge function with automatic error handling, CORS, auth, and logging
createEdgeFunction(
{
name: 'process-selective-approval',
requireAuth: true,
corsEnabled: true,
enableTracing: true,
rateLimitTier: 'moderate'
},
handler
);

View File

@@ -1,114 +1,27 @@
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
import { corsHeadersWithTracing as corsHeaders } from '../_shared/cors.ts';
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts';
import {
edgeLogger,
startSpan,
endSpan,
addSpanEvent,
setSpanAttributes,
getSpanContext,
logSpan,
extractSpanContextFromHeaders,
type Span
startSpan,
endSpan,
} from '../_shared/logger.ts';
import { formatEdgeError, toError } from '../_shared/errorFormatter.ts';
import { toError } from '../_shared/errorFormatter.ts';
import {
validateRejectionRequest,
type ValidatedRejectionRequest
} from '../_shared/submissionValidation.ts';
import { ValidationError } from '../_shared/typeValidation.ts';
const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY')!;
interface RejectionRequest {
submissionId: string;
itemIds: string[];
rejectionReason: string;
}
// Main handler function
const handler = async (req: Request) => {
// Handle CORS preflight requests
if (req.method === 'OPTIONS') {
return new Response(null, {
status: 204,
headers: corsHeaders
});
}
const handler = async (req: Request, context: { supabase: any; user: any; span: any; requestId: string }) => {
const { supabase, user, span: rootSpan, requestId } = context;
// Extract parent span context from headers (if present)
const parentSpanContext = extractSpanContextFromHeaders(req.headers);
// Create root span for this edge function invocation
const rootSpan = startSpan(
'process-selective-rejection',
'SERVER',
parentSpanContext,
{
'http.method': 'POST',
'function.name': 'process-selective-rejection',
}
);
const requestId = rootSpan.spanId;
try {
// STEP 1: Authentication
addSpanEvent(rootSpan, 'authentication_start');
const authHeader = req.headers.get('Authorization');
if (!authHeader) {
addSpanEvent(rootSpan, 'authentication_failed', { reason: 'missing_header' });
endSpan(rootSpan, 'error');
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Missing Authorization header' }),
{
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, {
global: { headers: { Authorization: authHeader } }
setSpanAttributes(rootSpan, {
'user.id': user.id,
'function.name': 'process-selective-rejection'
});
const { data: { user }, error: authError } = await supabase.auth.getUser();
if (authError || !user) {
addSpanEvent(rootSpan, 'authentication_failed', { error: authError?.message });
edgeLogger.warn('Authentication failed', {
requestId,
error: authError?.message,
action: 'process_rejection'
});
endSpan(rootSpan, 'error', authError || new Error('Unauthorized'));
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Unauthorized' }),
{
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
setSpanAttributes(rootSpan, { 'user.id': user.id });
addSpanEvent(rootSpan, 'authentication_success');
edgeLogger.info('Rejection request received', {
requestId,
moderatorId: user.id,
action: 'process_rejection'
});
// STEP 2: Parse and validate request
// STEP 1: Parse and validate request
addSpanEvent(rootSpan, 'validation_start');
let submissionId: string;
@@ -128,44 +41,15 @@ const handler = async (req: Request) => {
expected: error.expected,
received: error.received,
});
edgeLogger.warn('Request validation failed', {
requestId,
field: error.field,
expected: error.expected,
received: error.received,
action: 'process_rejection'
});
endSpan(rootSpan, 'error', error);
logSpan(rootSpan);
return new Response(
JSON.stringify({
error: error.message,
field: error.field,
requestId
}),
{
status: 400,
headers: { ...corsHeaders, 'Content-Type': 'application/json' }
}
);
throw error; // Will be caught by wrapper
}
throw error;
}
const idempotencyKey = req.headers.get('x-idempotency-key');
if (!idempotencyKey) {
addSpanEvent(rootSpan, 'validation_failed', { reason: 'missing_idempotency_key' });
edgeLogger.warn('Missing idempotency key', { requestId });
endSpan(rootSpan, 'error');
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Missing X-Idempotency-Key header' }),
{
status: 400,
headers: { ...corsHeaders, 'Content-Type': 'application/json' }
}
);
throw new ValidationError('idempotency_key', 'Missing X-Idempotency-Key header', 'string', 'undefined');
}
setSpanAttributes(rootSpan, {
@@ -174,14 +58,8 @@ const handler = async (req: Request) => {
'idempotency.key': idempotencyKey,
});
addSpanEvent(rootSpan, 'validation_complete');
edgeLogger.info('Request validated', {
requestId,
submissionId,
itemCount: itemIds.length,
action: 'process_rejection'
});
// STEP 3: Idempotency check
// STEP 2: Idempotency check
addSpanEvent(rootSpan, 'idempotency_check_start');
const { data: existingKey } = await supabase
.from('submission_idempotency_keys')
@@ -192,28 +70,16 @@ const handler = async (req: Request) => {
if (existingKey?.status === 'completed') {
addSpanEvent(rootSpan, 'idempotency_cache_hit');
setSpanAttributes(rootSpan, { 'cache.hit': true });
edgeLogger.info('Idempotency cache hit', {
requestId,
idempotencyKey,
cached: true,
action: 'process_rejection'
});
endSpan(rootSpan, 'ok');
logSpan(rootSpan);
return new Response(
JSON.stringify(existingKey.result_data),
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
'X-Cache-Status': 'HIT'
}
headers: { 'X-Cache-Status': 'HIT' }
}
);
}
// STEP 4: Fetch submission to get submitter_id
// STEP 3: Fetch submission to verify
const { data: submission, error: submissionError } = await supabase
.from('content_submissions')
.select('user_id, status, assigned_to')
@@ -222,69 +88,19 @@ const handler = async (req: Request) => {
if (submissionError || !submission) {
addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
edgeLogger.error('Submission not found', {
requestId,
submissionId,
error: submissionError?.message,
action: 'process_rejection'
});
endSpan(rootSpan, 'error', submissionError || new Error('Submission not found'));
logSpan(rootSpan);
return new Response(
JSON.stringify({ error: 'Submission not found' }),
{
status: 404,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw new Error('Submission not found');
}
// STEP 5: Verify moderator can reject this submission
// STEP 4: Verify moderator can reject this submission
if (submission.assigned_to && submission.assigned_to !== user.id) {
edgeLogger.warn('Lock conflict', {
requestId,
submissionId,
lockedBy: submission.assigned_to,
attemptedBy: user.id,
action: 'process_rejection'
});
return new Response(
JSON.stringify({ error: 'Submission is locked by another moderator' }),
{
status: 409,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw new Error('Submission is locked by another moderator');
}
if (!['pending', 'partially_approved'].includes(submission.status)) {
edgeLogger.warn('Invalid submission status', {
requestId,
submissionId,
currentStatus: submission.status,
expectedStatuses: ['pending', 'partially_approved'],
action: 'process_rejection'
});
return new Response(
JSON.stringify({ error: 'Submission already processed' }),
{
status: 400,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw new Error('Submission already processed');
}
// STEP 6: Register idempotency key as processing (atomic upsert)
// ✅ CRITICAL FIX: Use ON CONFLICT to prevent race conditions
// STEP 5: Register idempotency key as processing (atomic upsert)
if (!existingKey) {
const { data: insertedKey, error: idempotencyError } = await supabase
.from('submission_idempotency_keys')
@@ -298,17 +114,8 @@ const handler = async (req: Request) => {
.select()
.single();
// If conflict occurred, another moderator is processing
if (idempotencyError && idempotencyError.code === '23505') {
edgeLogger.warn('Idempotency key conflict - another request processing', {
requestId,
idempotencyKey,
moderatorId: user.id
});
return new Response(
JSON.stringify({ error: 'Another moderator is processing this submission' }),
{ status: 409, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('Another moderator is processing this submission');
}
if (idempotencyError) {
@@ -330,16 +137,8 @@ const handler = async (req: Request) => {
);
addSpanEvent(rpcSpan, 'rpc_call_start');
edgeLogger.info('Calling rejection transaction RPC', {
requestId,
submissionId,
itemCount: itemIds.length,
action: 'process_rejection'
});
// ============================================================================
// STEP 7: Call RPC function with deadlock retry logic
// ============================================================================
// STEP 6: Call RPC function with deadlock retry logic
let retryCount = 0;
const MAX_DEADLOCK_RETRIES = 3;
let result: any = null;
@@ -363,7 +162,6 @@ const handler = async (req: Request) => {
rpcError = error;
if (!rpcError) {
// Success!
addSpanEvent(rpcSpan, 'rpc_call_success', {
'result.status': data?.status,
'items.processed': itemIds.length,
@@ -376,29 +174,16 @@ const handler = async (req: Request) => {
retryCount++;
if (retryCount > MAX_DEADLOCK_RETRIES) {
addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
edgeLogger.error('Max deadlock retries exceeded', {
requestId,
submissionId,
attempt: retryCount,
action: 'process_rejection'
});
break;
}
const backoffMs = 100 * Math.pow(2, retryCount);
addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
edgeLogger.warn('Deadlock detected, retrying', {
requestId,
attempt: retryCount,
maxAttempts: MAX_DEADLOCK_RETRIES,
backoffMs,
action: 'process_rejection'
});
await new Promise(r => setTimeout(r, backoffMs));
continue;
}
// Non-retryable error, break immediately
// Non-retryable error
addSpanEvent(rpcSpan, 'rpc_call_failed', {
error: rpcError.message,
errorCode: rpcError.code
@@ -407,19 +192,7 @@ const handler = async (req: Request) => {
}
if (rpcError) {
// Transaction failed - EVERYTHING rolled back automatically by PostgreSQL
endSpan(rpcSpan, 'error', rpcError);
logSpan(rpcSpan);
edgeLogger.error('Transaction failed', {
requestId,
duration: rpcSpan.duration,
submissionId,
error: rpcError.message,
errorCode: rpcError.code,
retries: retryCount,
action: 'process_rejection'
});
// Update idempotency key to failed
try {
@@ -432,56 +205,21 @@ const handler = async (req: Request) => {
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
edgeLogger.warn('Failed to update idempotency key', {
requestId,
idempotencyKey,
status: 'failed',
error: formatEdgeError(updateError),
action: 'process_rejection'
});
// Non-blocking - continue with error response even if idempotency update fails
// Non-blocking
}
endSpan(rootSpan, 'error', rpcError);
logSpan(rootSpan);
return new Response(
JSON.stringify({
error: 'Rejection transaction failed',
message: rpcError.message,
details: rpcError.details,
retries: retryCount
}),
{
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
throw toError(rpcError);
}
// RPC succeeded
endSpan(rpcSpan, 'ok');
logSpan(rpcSpan);
setSpanAttributes(rootSpan, {
'result.status': result?.status,
'result.final_status': result?.status,
'retries': retryCount,
});
edgeLogger.info('Transaction completed successfully', {
requestId,
duration: rpcSpan.duration,
submissionId,
itemCount: itemIds.length,
retries: retryCount,
newStatus: result?.status,
action: 'process_rejection'
});
// STEP 8: Success - update idempotency key
// STEP 7: Success - update idempotency key
try {
await supabase
.from('submission_idempotency_keys')
@@ -492,57 +230,20 @@ const handler = async (req: Request) => {
})
.eq('idempotency_key', idempotencyKey);
} catch (updateError) {
edgeLogger.warn('Failed to update idempotency key', {
requestId,
idempotencyKey,
status: 'completed',
error: formatEdgeError(updateError),
action: 'process_rejection'
});
// Non-blocking - transaction succeeded, so continue with success response
}
endSpan(rootSpan, 'ok');
logSpan(rootSpan);
return new Response(
JSON.stringify(result),
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
'X-Request-Id': requestId
}
}
);
} catch (error) {
endSpan(rootSpan, 'error', error instanceof Error ? error : toError(error));
logSpan(rootSpan);
edgeLogger.error('Unexpected error', {
requestId,
duration: rootSpan.duration,
error: formatEdgeError(error),
stack: error instanceof Error ? error.stack : undefined,
action: 'process_rejection'
});
return new Response(
JSON.stringify({
error: 'Internal server error',
message: error instanceof Error ? error.message : 'Unknown error'
}),
{
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
}
);
}
return result;
};
// Apply rate limiting: 10 requests per minute per IP (moderate tier for moderation actions)
serve(withRateLimit(handler, rateLimiters.moderate, corsHeaders));
// Create edge function with automatic error handling, CORS, auth, and logging
createEdgeFunction(
{
name: 'process-selective-rejection',
requireAuth: true,
corsEnabled: true,
enableTracing: true,
rateLimitTier: 'moderate'
},
handler
);

View File

@@ -5,8 +5,7 @@
* Requires admin/moderator authentication.
*/
import { createClient } from 'jsr:@supabase/supabase-js@2';
import { withRateLimit, rateLimiters } from '../_shared/rateLimiter.ts';
import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts';
import {
getRecentMetrics,
getMetricsStats,
@@ -16,11 +15,6 @@ import {
clearMetrics,
} from '../_shared/rateLimitMetrics.ts';
const corsHeaders = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
};
interface QueryParams {
action?: string;
limit?: string;
@@ -30,38 +24,8 @@ interface QueryParams {
clientIP?: string;
}
async function handler(req: Request): Promise<Response> {
// Handle CORS preflight
if (req.method === 'OPTIONS') {
return new Response(null, { headers: corsHeaders });
}
try {
// Verify authentication
const authHeader = req.headers.get('Authorization');
if (!authHeader) {
return new Response(
JSON.stringify({ error: 'Authentication required' }),
{ status: 401, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
}
const supabaseUrl = Deno.env.get('SUPABASE_URL')!;
const supabaseServiceKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!;
const supabase = createClient(supabaseUrl, supabaseServiceKey, {
global: {
headers: { Authorization: authHeader },
},
});
// Get authenticated user
const { data: { user }, error: authError } = await supabase.auth.getUser();
if (authError || !user) {
return new Response(
JSON.stringify({ error: 'Invalid authentication' }),
{ status: 401, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
}
const handler = async (req: Request, context: { supabase: any; user: any; span: any; requestId: string }) => {
const { supabase, user } = context;
// Check if user has admin or moderator role
const { data: roles } = await supabase
@@ -69,16 +33,13 @@ async function handler(req: Request): Promise<Response> {
.select('role')
.eq('user_id', user.id);
const userRoles = roles?.map(r => r.role) || [];
const isAuthorized = userRoles.some(role =>
const userRoles = roles?.map((r: any) => r.role) || [];
const isAuthorized = userRoles.some((role: string) =>
['admin', 'moderator', 'superuser'].includes(role)
);
if (!isAuthorized) {
return new Response(
JSON.stringify({ error: 'Insufficient permissions' }),
{ status: 403, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('Insufficient permissions');
}
// Parse query parameters
@@ -107,10 +68,7 @@ async function handler(req: Request): Promise<Response> {
case 'function':
if (!functionName) {
return new Response(
JSON.stringify({ error: 'functionName parameter required for function action' }),
{ status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('functionName parameter required for function action');
}
responseData = {
functionName,
@@ -121,10 +79,7 @@ async function handler(req: Request): Promise<Response> {
case 'user':
if (!userId) {
return new Response(
JSON.stringify({ error: 'userId parameter required for user action' }),
{ status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('userId parameter required for user action');
}
responseData = {
userId,
@@ -135,10 +90,7 @@ async function handler(req: Request): Promise<Response> {
case 'ip':
if (!clientIP) {
return new Response(
JSON.stringify({ error: 'clientIP parameter required for ip action' }),
{ status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('clientIP parameter required for ip action');
}
responseData = {
clientIP,
@@ -151,50 +103,27 @@ async function handler(req: Request): Promise<Response> {
// Only superusers can clear metrics
const isSuperuser = userRoles.includes('superuser');
if (!isSuperuser) {
return new Response(
JSON.stringify({ error: 'Only superusers can clear metrics' }),
{ status: 403, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('Only superusers can clear metrics');
}
clearMetrics();
responseData = { success: true, message: 'Metrics cleared' };
break;
default:
return new Response(
JSON.stringify({
error: 'Invalid action',
validActions: ['recent', 'stats', 'function', 'user', 'ip', 'clear']
}),
{ status: 400, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
);
throw new Error('Invalid action. Valid actions: recent, stats, function, user, ip, clear');
}
return new Response(
JSON.stringify(responseData),
return responseData;
};
// Create edge function with automatic error handling, CORS, auth, and logging
createEdgeFunction(
{
status: 200,
headers: {
...corsHeaders,
'Content-Type': 'application/json',
}
}
);
} catch (error) {
console.error('Error in rate-limit-metrics function:', error);
return new Response(
JSON.stringify({
error: 'Internal server error',
message: error instanceof Error ? error.message : 'Unknown error'
}),
{
status: 500,
headers: { ...corsHeaders, 'Content-Type': 'application/json' }
}
);
}
}
// Apply rate limiting (lenient tier for admin monitoring)
Deno.serve(withRateLimit(handler, rateLimiters.lenient, corsHeaders, 'rate-limit-metrics'));
name: 'rate-limit-metrics',
requireAuth: true,
corsEnabled: true,
enableTracing: false,
rateLimitTier: 'lenient'
},
handler
);