Files
thrilltrack-explorer/supabase/functions/process-selective-approval/index.ts
gpt-engineer-app[bot] ed6ddbd04b Centralize rate limiting config
Create shared rateLimitConfig.ts with tiers (strict, moderate, lenient, generous, per-user variants) and update edge functions to import centralized rate limiters. Replace inline rate limiter usage with new config, preserving backward compatibility. Add documentation guide for rate limiting usage.
2025-11-10 21:33:08 +00:00

563 lines
17 KiB
TypeScript

import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2.57.4';
import { corsHeadersWithTracing as corsHeaders } from '../_shared/cors.ts';
import { rateLimiters, withRateLimit } from '../_shared/rateLimiter.ts';
import {
edgeLogger,
startSpan,
endSpan,
addSpanEvent,
setSpanAttributes,
getSpanContext,
logSpan,
extractSpanContextFromHeaders,
type Span
} from '../_shared/logger.ts';
import { formatEdgeError, toError } from '../_shared/errorFormatter.ts';
// Connection settings are read once at module load. The URL falls back to a
// fixed default when the variable is unset or empty; the anon key has no default.
const SUPABASE_URL = Deno.env.get('SUPABASE_URL') || 'https://api.thrillwiki.com';
const SUPABASE_ANON_KEY = Deno.env.get('SUPABASE_ANON_KEY');

// ============================================================================
// Startup guard: refuse to boot without the anon key.
// The diagnostic lists only the NAMES of environment variables containing
// "SUPABASE" or "URL"; their values are not logged (apart from SUPABASE_URL).
// ============================================================================
if (!SUPABASE_ANON_KEY) {
  const relatedEnvNames = Object.keys(Deno.env.toObject()).filter(
    (name) => name.includes('SUPABASE') || name.includes('URL')
  );
  console.error('CRITICAL: SUPABASE_ANON_KEY environment variable is not set!', {
    timestamp: new Date().toISOString(),
    hasUrl: Boolean(SUPABASE_URL),
    url: SUPABASE_URL,
    availableEnvVars: relatedEnvNames
  });
  throw new Error('Missing required environment variable: SUPABASE_ANON_KEY');
}

// Past the guard the key is known to be a non-empty string, so reading its
// length is safe. Only the length is logged, never the key itself.
console.log('Edge function initialized successfully', {
  timestamp: new Date().toISOString(),
  function: 'process-selective-approval',
  hasUrl: Boolean(SUPABASE_URL),
  hasKey: Boolean(SUPABASE_ANON_KEY),
  keyLength: SUPABASE_ANON_KEY.length
});
/**
 * JSON body expected by the approval handler.
 */
interface ApprovalRequest {
  /** `id` of the `content_submissions` row being approved. */
  submissionId: string;
  /** Items to approve; forwarded to the approval RPC as `p_item_ids`. */
  itemIds: Array<string>;
  /** Key looked up in / written to `submission_idempotency_keys`. */
  idempotencyKey: string;
}
/**
 * Serializes `payload` as JSON and wraps it in a Response carrying the shared
 * CORS headers, a JSON content type, and any caller-supplied extra headers.
 */
const buildJsonResponse = (
  payload: unknown,
  status: number,
  extraHeaders: Record<string, string> = {}
): Response =>
  new Response(JSON.stringify(payload), {
    status,
    headers: {
      ...corsHeaders,
      'Content-Type': 'application/json',
      ...extraHeaders
    }
  });

/**
 * Main handler: approves selected items of a content submission.
 *
 * Flow:
 *  1. Authenticate the caller from the `Authorization` header.
 *  2. Parse and validate the JSON body (`submissionId`, `itemIds`).
 *  3. Return the stored result when the idempotency key is already `completed`.
 *  4. Load the submission; 5. check lock ownership and status.
 *  6. Register the idempotency key as `processing`.
 *  7. Call the `process_approval_transaction` RPC, retrying on Postgres
 *     deadlock (40P01) / serialization failure (40001) with exponential backoff.
 *  8. Record the outcome on the idempotency key and respond.
 *
 * Every exit path after the root span is created ends and logs that span.
 *
 * @param req Incoming HTTP request.
 * @returns 204 for CORS preflight; 200 with the RPC result on success;
 *          400/401/404/409 for client errors; 500 for RPC or unexpected failures.
 */
const handler = async (req: Request): Promise<Response> => {
  // ============================================================================
  // Log every incoming request immediately (the Authorization value is never logged)
  // ============================================================================
  console.log('Request received', {
    timestamp: new Date().toISOString(),
    method: req.method,
    url: req.url,
    headers: {
      authorization: req.headers.has('Authorization') ? '[PRESENT]' : '[MISSING]',
      contentType: req.headers.get('Content-Type'),
      traceparent: req.headers.get('traceparent') || '[NONE]'
    }
  });

  // Handle CORS preflight requests
  if (req.method === 'OPTIONS') {
    return new Response(null, {
      status: 204,
      headers: corsHeaders
    });
  }

  // Extract parent span context from headers (if present)
  const parentSpanContext = extractSpanContextFromHeaders(req.headers);

  // Create root span for this edge function invocation
  const rootSpan = startSpan(
    'process-selective-approval',
    'SERVER',
    parentSpanContext,
    {
      // Record the actual method instead of assuming POST.
      'http.method': req.method,
      'function.name': 'process-selective-approval',
    }
  );
  const requestId = rootSpan.spanId;

  try {
    // STEP 1: Authentication
    addSpanEvent(rootSpan, 'authentication_start');
    const authHeader = req.headers.get('Authorization');
    if (!authHeader) {
      addSpanEvent(rootSpan, 'authentication_failed', { reason: 'missing_header' });
      endSpan(rootSpan, 'error');
      logSpan(rootSpan);
      return buildJsonResponse({ error: 'Missing Authorization header' }, 401);
    }

    // The client forwards the caller's Authorization header on every request.
    const supabase = createClient(SUPABASE_URL, SUPABASE_ANON_KEY, {
      global: { headers: { Authorization: authHeader } }
    });

    const { data: { user }, error: authError } = await supabase.auth.getUser();
    if (authError || !user) {
      addSpanEvent(rootSpan, 'authentication_failed', { error: authError?.message });
      edgeLogger.warn('Authentication failed', {
        requestId,
        error: authError?.message,
        action: 'process_approval'
      });
      endSpan(rootSpan, 'error', authError || new Error('Unauthorized'));
      logSpan(rootSpan);
      return buildJsonResponse({ error: 'Unauthorized' }, 401);
    }

    setSpanAttributes(rootSpan, { 'user.id': user.id });
    addSpanEvent(rootSpan, 'authentication_success');
    edgeLogger.info('Approval request received', {
      requestId,
      moderatorId: user.id,
      action: 'process_approval'
    });

    // STEP 2: Parse request
    addSpanEvent(rootSpan, 'validation_start');
    let parsedBody: unknown;
    try {
      parsedBody = await req.json();
    } catch (parseError) {
      // A malformed body is a client error, not an internal failure.
      addSpanEvent(rootSpan, 'validation_failed', { reason: 'invalid_json' });
      edgeLogger.warn('Invalid request payload', {
        requestId,
        error: formatEdgeError(parseError),
        action: 'process_approval'
      });
      endSpan(rootSpan, 'error');
      logSpan(rootSpan);
      return buildJsonResponse({ error: 'Request body must be valid JSON' }, 400);
    }

    // Non-object JSON (null, number, string, ...) is treated as an empty body so
    // it fails field validation below instead of crashing the destructuring.
    // The cast only names the expected shape; fields are untrusted until checked.
    const body = (
      parsedBody !== null && typeof parsedBody === 'object' ? parsedBody : {}
    ) as ApprovalRequest;
    const { submissionId, itemIds, idempotencyKey } = body;
    // NOTE(review): idempotencyKey is not validated here; a request without one
    // proceeds with an undefined key. Confirm whether it should be rejected.

    if (!submissionId || !Array.isArray(itemIds) || itemIds.length === 0) {
      addSpanEvent(rootSpan, 'validation_failed', {
        hasSubmissionId: !!submissionId,
        hasItemIds: !!itemIds,
        itemCount: itemIds?.length || 0,
      });
      edgeLogger.warn('Invalid request payload', {
        requestId,
        hasSubmissionId: !!submissionId,
        hasItemIds: !!itemIds,
        itemCount: itemIds?.length || 0,
        action: 'process_approval'
      });
      endSpan(rootSpan, 'error');
      logSpan(rootSpan);
      return buildJsonResponse(
        { error: 'Missing required fields: submissionId, itemIds' },
        400
      );
    }

    setSpanAttributes(rootSpan, {
      'submission.id': submissionId,
      'submission.item_count': itemIds.length,
      'idempotency.key': idempotencyKey,
    });
    addSpanEvent(rootSpan, 'validation_complete');
    edgeLogger.info('Request validated', {
      requestId,
      submissionId,
      itemCount: itemIds.length,
      action: 'process_approval'
    });

    /**
     * Best-effort write of the final outcome onto the idempotency key row.
     * The Supabase client reports query failures through the returned `error`
     * field rather than by throwing, so both channels are checked. A failure
     * is logged and never propagated.
     */
    const recordIdempotencyOutcome = async (
      status: 'completed' | 'failed',
      outcomeFields: Record<string, unknown>
    ): Promise<void> => {
      let updateFailure: unknown = null;
      try {
        const { error: updateError } = await supabase
          .from('submission_idempotency_keys')
          .update({
            status,
            ...outcomeFields,
            completed_at: new Date().toISOString()
          })
          .eq('idempotency_key', idempotencyKey);
        updateFailure = updateError;
      } catch (thrown) {
        updateFailure = thrown;
      }
      if (updateFailure) {
        edgeLogger.warn('Failed to update idempotency key', {
          requestId,
          idempotencyKey,
          status,
          error: formatEdgeError(updateFailure),
          action: 'process_approval'
        });
      }
    };

    // STEP 3: Idempotency check
    addSpanEvent(rootSpan, 'idempotency_check_start');
    // The lookup error is intentionally ignored: "no row" is the normal case.
    const { data: existingKey } = await supabase
      .from('submission_idempotency_keys')
      .select('*')
      .eq('idempotency_key', idempotencyKey)
      .single();

    if (existingKey?.status === 'completed') {
      addSpanEvent(rootSpan, 'idempotency_cache_hit');
      setSpanAttributes(rootSpan, { 'cache.hit': true });
      edgeLogger.info('Idempotency cache hit', {
        requestId,
        idempotencyKey,
        cached: true,
        action: 'process_approval'
      });
      endSpan(rootSpan, 'ok');
      logSpan(rootSpan);
      return buildJsonResponse(existingKey.result_data, 200, { 'X-Cache-Status': 'HIT' });
    }
    // NOTE(review): an existing key in any other state (e.g. 'processing' or
    // 'failed') falls through and the RPC is executed again. Confirm that
    // re-running while another request is still 'processing' is intended.

    // STEP 4: Fetch submission to get submitter_id
    const { data: submission, error: submissionError } = await supabase
      .from('content_submissions')
      .select('user_id, status, assigned_to')
      .eq('id', submissionId)
      .single();

    if (submissionError || !submission) {
      addSpanEvent(rootSpan, 'submission_fetch_failed', { error: submissionError?.message });
      edgeLogger.error('Submission not found', {
        requestId,
        submissionId,
        error: submissionError?.message,
        action: 'process_approval'
      });
      endSpan(rootSpan, 'error', submissionError || new Error('Submission not found'));
      logSpan(rootSpan);
      return buildJsonResponse({ error: 'Submission not found' }, 404);
    }

    // STEP 5: Verify moderator can approve this submission
    if (submission.assigned_to && submission.assigned_to !== user.id) {
      edgeLogger.warn('Lock conflict', {
        requestId,
        submissionId,
        lockedBy: submission.assigned_to,
        attemptedBy: user.id,
        action: 'process_approval'
      });
      // Close the root span on this exit path too, like every other return.
      endSpan(rootSpan, 'error');
      logSpan(rootSpan);
      return buildJsonResponse({ error: 'Submission is locked by another moderator' }, 409);
    }

    if (!['pending', 'partially_approved'].includes(submission.status)) {
      edgeLogger.warn('Invalid submission status', {
        requestId,
        submissionId,
        currentStatus: submission.status,
        expectedStatuses: ['pending', 'partially_approved'],
        action: 'process_approval'
      });
      endSpan(rootSpan, 'error');
      logSpan(rootSpan);
      return buildJsonResponse({ error: 'Submission already processed' }, 400);
    }

    // STEP 6: Register idempotency key as processing.
    // This is a plain INSERT (not an upsert): a concurrent request that inserted
    // the same key first surfaces here as Postgres error 23505 (unique_violation).
    if (!existingKey) {
      const { error: idempotencyError } = await supabase
        .from('submission_idempotency_keys')
        .insert({
          idempotency_key: idempotencyKey,
          submission_id: submissionId,
          moderator_id: user.id,
          item_ids: itemIds,
          status: 'processing'
        })
        .select()
        .single();

      // If conflict occurred, another request is already processing this key
      if (idempotencyError && idempotencyError.code === '23505') {
        edgeLogger.warn('Idempotency key conflict - another request processing', {
          requestId,
          idempotencyKey,
          moderatorId: user.id
        });
        endSpan(rootSpan, 'error');
        logSpan(rootSpan);
        return buildJsonResponse(
          { error: 'Another moderator is processing this submission' },
          409
        );
      }
      if (idempotencyError) {
        // Handled by the outer catch, which closes the root span and returns 500.
        throw toError(idempotencyError);
      }
    }

    // Create child span for RPC transaction
    const rpcSpan = startSpan(
      'process_approval_transaction',
      'DATABASE',
      getSpanContext(rootSpan),
      {
        'db.operation': 'rpc',
        'db.function': 'process_approval_transaction',
        'submission.id': submissionId,
        'submission.item_count': itemIds.length,
      }
    );
    addSpanEvent(rpcSpan, 'rpc_call_start');
    edgeLogger.info('Calling approval transaction RPC', {
      requestId,
      submissionId,
      itemCount: itemIds.length,
      action: 'process_approval'
    });

    // ============================================================================
    // STEP 7: Call RPC function with deadlock retry logic
    // ============================================================================
    const MAX_DEADLOCK_RETRIES = 3;
    let retryCount = 0;
    // The RPC payload and error shapes come from the untyped Supabase client.
    let result: any = null;
    let rpcError: any = null;

    while (retryCount <= MAX_DEADLOCK_RETRIES) {
      const { data, error } = await supabase.rpc(
        'process_approval_transaction',
        {
          p_submission_id: submissionId,
          p_item_ids: itemIds,
          p_moderator_id: user.id,
          p_submitter_id: submission.user_id,
          p_request_id: requestId,
          p_trace_id: rootSpan.traceId,
          p_parent_span_id: rpcSpan.spanId
        }
      );
      result = data;
      rpcError = error;

      if (!rpcError) {
        // Success!
        addSpanEvent(rpcSpan, 'rpc_call_success', {
          'result.status': data?.status,
          'items.processed': itemIds.length,
        });
        break;
      }

      // Check for deadlock (40P01) or serialization failure (40001)
      if (rpcError.code === '40P01' || rpcError.code === '40001') {
        retryCount++;
        if (retryCount > MAX_DEADLOCK_RETRIES) {
          addSpanEvent(rpcSpan, 'max_retries_exceeded', { attempt: retryCount });
          edgeLogger.error('Max deadlock retries exceeded', {
            requestId,
            submissionId,
            attempt: retryCount,
            action: 'process_approval'
          });
          break;
        }

        // Exponential backoff: 200ms, 400ms, 800ms for attempts 1..3.
        const backoffMs = 100 * Math.pow(2, retryCount);
        addSpanEvent(rpcSpan, 'deadlock_retry', { attempt: retryCount, backoffMs });
        edgeLogger.warn('Deadlock detected, retrying', {
          requestId,
          attempt: retryCount,
          maxAttempts: MAX_DEADLOCK_RETRIES,
          backoffMs,
          action: 'process_approval'
        });
        await new Promise(r => setTimeout(r, backoffMs));
        continue;
      }

      // Non-retryable error, break immediately
      addSpanEvent(rpcSpan, 'rpc_call_failed', {
        error: rpcError.message,
        errorCode: rpcError.code
      });
      break;
    }

    if (rpcError) {
      // Transaction failed (the RPC name suggests the work runs in a single
      // database transaction; rollback semantics are defined by that function).
      endSpan(rpcSpan, 'error', rpcError);
      logSpan(rpcSpan);
      edgeLogger.error('Transaction failed', {
        requestId,
        duration: rpcSpan.duration,
        submissionId,
        error: rpcError.message,
        errorCode: rpcError.code,
        retries: retryCount,
        action: 'process_approval'
      });

      // Update idempotency key to failed (non-blocking: the error response is
      // returned even if this bookkeeping write fails).
      await recordIdempotencyOutcome('failed', { error_message: rpcError.message });

      endSpan(rootSpan, 'error', rpcError);
      logSpan(rootSpan);
      return buildJsonResponse(
        {
          error: 'Approval transaction failed',
          message: rpcError.message,
          details: rpcError.details,
          retries: retryCount
        },
        500
      );
    }

    // RPC succeeded
    endSpan(rpcSpan, 'ok');
    logSpan(rpcSpan);
    setSpanAttributes(rootSpan, {
      'result.status': result?.status,
      'result.final_status': result?.status,
      'retries': retryCount,
    });
    edgeLogger.info('Transaction completed successfully', {
      requestId,
      duration: rpcSpan.duration,
      submissionId,
      itemCount: itemIds.length,
      retries: retryCount,
      newStatus: result?.status,
      action: 'process_approval'
    });

    // STEP 8: Success - update idempotency key (non-blocking: the transaction
    // already succeeded, so the success response is returned regardless).
    await recordIdempotencyOutcome('completed', { result_data: result });

    endSpan(rootSpan, 'ok');
    logSpan(rootSpan);
    return buildJsonResponse(result, 200, { 'X-Request-Id': requestId });
  } catch (error) {
    // Enhanced error logging with full details
    const errorDetails = {
      timestamp: new Date().toISOString(),
      requestId: requestId || 'unknown',
      duration: rootSpan.duration || 0,
      error: formatEdgeError(error),
      errorType: error instanceof Error ? error.constructor.name : typeof error,
      stack: error instanceof Error ? error.stack : undefined,
      action: 'process_approval'
    };
    console.error('Uncaught error in handler', errorDetails);
    endSpan(rootSpan, 'error', error instanceof Error ? error : toError(error));
    logSpan(rootSpan);
    edgeLogger.error('Unexpected error', errorDetails);
    return buildJsonResponse(
      {
        error: 'Internal server error',
        message: error instanceof Error ? error.message : 'Unknown error',
        requestId: requestId || 'unknown'
      },
      500
    );
  }
};
// Wrap the handler with the shared `moderate` rate-limit tier, then start serving.
// (Per the original note this tier is 10 requests per minute per IP; the actual
// limits are defined in the shared rate limiter module, not in this file.)
const rateLimitedHandler = withRateLimit(handler, rateLimiters.moderate, corsHeaders);
serve(rateLimitedHandler);