import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2.57.4";
import { Novu } from "npm:@novu/api@1.6.0";
import { corsHeaders } from '../_shared/cors.ts';
import { edgeLogger, startRequest, endRequest } from '../_shared/logger.ts';
import { withEdgeRetry } from '../_shared/retryHelper.ts';

/** Keys of the Novu topics that moderator-level users are subscribed to. */
const TOPICS = {
  MODERATION_SUBMISSIONS: 'moderation-submissions',
  MODERATION_REPORTS: 'moderation-reports',
} as const;

type TopicKey = typeof TOPICS[keyof typeof TOPICS];

/** Per-topic outcome reported in the response body. */
interface TopicSyncResult {
  topicKey: TopicKey;
  /** Number of users in batches whose add-subscribers call succeeded. */
  successCount: number;
  /** Number of users in batches whose add-subscribers call ultimately failed. */
  errorCount: number;
}

/** Maximum number of subscriber IDs sent to Novu in one add-subscribers call. */
const SUBSCRIBER_BATCH_SIZE = 100;

/** Extracts a human-readable message from an arbitrary thrown value. */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    const { message } = error as { message: unknown };
    if (typeof message === 'string') {
      return message;
    }
  }
  return String(error);
}

/**
 * Creates one topic (best effort) and adds every user to it in batches.
 *
 * Failures never propagate: a failed topic creation is logged and the sync
 * continues, and a failed batch is counted in `errorCount` while the remaining
 * batches are still attempted.
 */
async function syncTopic(
  novu: Novu,
  topicKey: TopicKey,
  userIds: string[],
  requestId: string,
): Promise<TopicSyncResult> {
  try {
    await novu.topics.create({ key: topicKey, name: topicKey });
    edgeLogger.info('Topic ready', { requestId, topicKey });
  } catch (error: unknown) {
    // An "already exists" failure is the expected steady state; anything else
    // is only logged so that subscriber sync is still attempted.
    const message = describeError(error);
    if (!message.includes('already exists')) {
      edgeLogger.warn('Note about topic', { requestId, topicKey, error: message });
    }
  }

  let successCount = 0;
  let errorCount = 0;

  for (let i = 0; i < userIds.length; i += SUBSCRIBER_BATCH_SIZE) {
    const batch = userIds.slice(i, i + SUBSCRIBER_BATCH_SIZE);

    try {
      // The batch offset is part of the retry label so log lines from
      // different batches can be told apart.
      await withEdgeRetry(
        async () => {
          await novu.topics.addSubscribers(topicKey, {
            subscribers: batch,
          });
        },
        { maxAttempts: 3, baseDelay: 2000 },
        requestId,
        `sync-batch-${topicKey}-${i}`
      );
      successCount += batch.length;
      edgeLogger.info('Added batch of users to topic', {
        requestId,
        topicKey,
        batchSize: batch.length
      });
    } catch (error: unknown) {
      errorCount += batch.length;
      edgeLogger.error('Error adding batch to topic', {
        requestId,
        topicKey,
        batchSize: batch.length,
        error: describeError(error)
      });
    }
  }

  return { topicKey, successCount, errorCount };
}

/**
 * HTTP entry point: subscribes every user holding a `moderator`, `admin` or
 * `superuser` role (read from the `user_roles` table) to each moderation topic
 * in Novu.
 *
 * - `OPTIONS` requests get an empty response carrying the CORS headers.
 * - On completion responds 200 with `{ success, message, results, requestId }`;
 *   per-batch failures are reported through `results.topics[].errorCount`
 *   rather than through the status code.
 * - Missing environment variables or a failed role query respond 500 with
 *   `{ success: false, error, requestId }`.
 */
serve(async (req) => {
  const tracking = startRequest();

  if (req.method === 'OPTIONS') {
    return new Response(null, { headers: corsHeaders });
  }

  try {
    const novuApiKey = Deno.env.get('NOVU_API_KEY');
    const supabaseUrl = Deno.env.get('SUPABASE_URL');
    const supabaseServiceKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY');

    if (!novuApiKey || !supabaseUrl || !supabaseServiceKey) {
      throw new Error('Missing required environment variables');
    }

    const novu = new Novu({ secretKey: novuApiKey });
    const supabase = createClient(supabaseUrl, supabaseServiceKey);

    edgeLogger.info('Starting moderator sync to Novu topics', {
      requestId: tracking.requestId,
      action: 'sync_moderators'
    });

    // Get all moderators, admins, and superusers
    const { data: moderatorRoles, error: rolesError } = await supabase
      .from('user_roles')
      .select('user_id, role')
      .in('role', ['moderator', 'admin', 'superuser']);

    if (rolesError) {
      throw new Error(`Failed to fetch moderator roles: ${rolesError.message}`);
    }

    // A user may hold several moderator-level roles, so de-duplicate the IDs.
    // `data` is nullable in the Supabase response type; treat null as "no rows".
    const uniqueUserIds: string[] = [
      ...new Set((moderatorRoles ?? []).map((r) => r.user_id)),
    ];

    edgeLogger.info('Found unique moderators to sync', {
      requestId: tracking.requestId,
      count: uniqueUserIds.length
    });

    const topics: TopicKey[] = [TOPICS.MODERATION_SUBMISSIONS, TOPICS.MODERATION_REPORTS];
    const results: { totalUsers: number; topics: TopicSyncResult[] } = {
      totalUsers: uniqueUserIds.length,
      topics: [],
    };

    // Topics are processed one after another, each in sequential batches.
    for (const topicKey of topics) {
      results.topics.push(
        await syncTopic(novu, topicKey, uniqueUserIds, tracking.requestId)
      );
    }

    const duration = endRequest(tracking);
    edgeLogger.info('Sync completed', {
      requestId: tracking.requestId,
      duration,
      results
    });

    return new Response(
      JSON.stringify({
        success: true,
        message: 'Moderator sync completed',
        results,
        requestId: tracking.requestId
      }),
      {
        headers: {
          ...corsHeaders,
          'Content-Type': 'application/json',
          'X-Request-ID': tracking.requestId
        },
        status: 200,
      }
    );
  } catch (error: unknown) {
    const duration = endRequest(tracking);
    const message = describeError(error);
    edgeLogger.error('Error syncing moderators to topics', {
      requestId: tracking.requestId,
      duration,
      error: message
    });

    return new Response(
      JSON.stringify({
        success: false,
        error: message,
        requestId: tracking.requestId
      }),
      {
        headers: {
          ...corsHeaders,
          'Content-Type': 'application/json',
          'X-Request-ID': tracking.requestId
        },
        status: 500,
      }
    );
  }
});