import { createClient } from "https://esm.sh/@supabase/supabase-js@2.57.4"; import { Novu } from "npm:@novu/api@1.6.0"; import { corsHeaders } from '../_shared/cors.ts'; import { edgeLogger } from '../_shared/logger.ts'; import { withEdgeRetry } from '../_shared/retryHelper.ts'; import { createEdgeFunction } from '../_shared/edgeFunctionWrapper.ts'; const TOPICS = { MODERATION_SUBMISSIONS: 'moderation-submissions', MODERATION_REPORTS: 'moderation-reports', } as const; export default createEdgeFunction( { name: 'sync-all-moderators-to-topic', requireAuth: false, corsHeaders: corsHeaders }, async (req, context) => { const novuApiKey = Deno.env.get('NOVU_API_KEY'); const supabaseUrl = Deno.env.get('SUPABASE_URL'); const supabaseServiceKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY'); if (!novuApiKey || !supabaseUrl || !supabaseServiceKey) { throw new Error('Missing required environment variables'); } const novu = new Novu({ secretKey: novuApiKey }); const supabase = createClient(supabaseUrl, supabaseServiceKey); context.span.setAttribute('action', 'sync_moderators'); edgeLogger.info('Starting moderator sync to Novu topics', { requestId: context.requestId, action: 'sync_moderators' }); // Get all moderators, admins, and superusers const { data: moderatorRoles, error: rolesError } = await supabase .from('user_roles') .select('user_id, role') .in('role', ['moderator', 'admin', 'superuser']); if (rolesError) { throw new Error(`Failed to fetch moderator roles: ${rolesError.message}`); } // Get unique user IDs (a user might have multiple moderator-level roles) const uniqueUserIds = [...new Set(moderatorRoles.map(r => r.user_id))]; edgeLogger.info('Found unique moderators to sync', { requestId: context.requestId, count: uniqueUserIds.length }); const topics = [TOPICS.MODERATION_SUBMISSIONS, TOPICS.MODERATION_REPORTS]; const results = { totalUsers: uniqueUserIds.length, topics: [] as any[], }; for (const topicKey of topics) { try { // Ensure topic exists (Novu will create it if it doesn't) await novu.topics.create({ key: topicKey, name: topicKey }); edgeLogger.info('Topic ready', { requestId: context.requestId, topicKey }); } catch (error: any) { // Topic might already exist, which is fine if (!error.message?.includes('already exists')) { edgeLogger.warn('Note about topic', { requestId: context.requestId, topicKey, error: error.message }); } } // Add all moderators to the topic in batches const batchSize = 100; let successCount = 0; let errorCount = 0; for (let i = 0; i < uniqueUserIds.length; i += batchSize) { const batch = uniqueUserIds.slice(i, i + batchSize); try { await withEdgeRetry( async () => { await novu.topics.addSubscribers(topicKey, { subscribers: batch, }); }, { maxAttempts: 3, baseDelay: 2000 }, context.requestId, `sync-batch-${topicKey}-${i}` ); successCount += batch.length; edgeLogger.info('Added batch of users to topic', { requestId: context.requestId, topicKey, batchSize: batch.length }); } catch (error: any) { errorCount += batch.length; edgeLogger.error('Error adding batch to topic', { requestId: context.requestId, topicKey, batchSize: batch.length, error: error.message }); } } results.topics.push({ topicKey, successCount, errorCount, }); } edgeLogger.info('Sync completed', { requestId: context.requestId, results }); return new Response( JSON.stringify({ success: true, message: 'Moderator sync completed', results, }), { headers: { 'Content-Type': 'application/json', }, status: 200, } ); } );