diff --git a/src/hooks/useRealtimeModerationStats.ts b/src/hooks/useRealtimeModerationStats.ts index 8172b55f..c6da4e8b 100644 --- a/src/hooks/useRealtimeModerationStats.ts +++ b/src/hooks/useRealtimeModerationStats.ts @@ -104,222 +104,104 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp }; }, [enabled, fetchStats, connectionState]); - // Set up realtime connection with all listeners configured before subscribing + // Set up broadcast channels for real-time updates useEffect(() => { if (!realtimeEnabled) { - console.log('[Realtime:moderation-stats-changes] Realtime disabled'); + console.log('[Realtime:moderation-stats] Realtime disabled'); return; } - console.log('[Realtime:moderation-stats-changes] Creating new channel'); + console.log('[Realtime:moderation-stats] Creating broadcast channels'); setConnectionState('connecting'); - const newChannel = supabase - .channel('moderation-stats-changes') - .on( - 'postgres_changes', - { - event: 'INSERT', - schema: 'public', - table: 'content_submissions', - }, - (payload) => { - console.log('Content submission inserted'); - if (payload.new.status === 'pending') { - setStats(prev => ({ - ...prev, - pendingSubmissions: prev.pendingSubmissions + 1 - })); - } - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'UPDATE', - schema: 'public', - table: 'content_submissions', - }, - (payload) => { - console.log('Content submission updated'); - const oldStatus = payload.old.status; - const newStatus = payload.new.status; - - if (oldStatus === 'pending' && newStatus !== 'pending') { - setStats(prev => ({ - ...prev, - pendingSubmissions: Math.max(0, prev.pendingSubmissions - 1) - })); - } else if (oldStatus !== 'pending' && newStatus === 'pending') { - setStats(prev => ({ - ...prev, - pendingSubmissions: prev.pendingSubmissions + 1 - })); - } - - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'DELETE', - schema: 'public', - table: 'content_submissions', - }, - (payload) => { - console.log('Content submission deleted'); - if (payload.old.status === 'pending') { - setStats(prev => ({ - ...prev, - pendingSubmissions: Math.max(0, prev.pendingSubmissions - 1) - })); - } - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'INSERT', - schema: 'public', - table: 'reports', - }, - () => { - console.log('Report inserted'); - setStats(prev => ({ - ...prev, - openReports: prev.openReports + 1 - })); - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'UPDATE', - schema: 'public', - table: 'reports', - }, - (payload) => { - console.log('Report updated'); - const oldStatus = payload.old.status; - const newStatus = payload.new.status; - - if (oldStatus === 'pending' && newStatus !== 'pending') { - setStats(prev => ({ - ...prev, - openReports: Math.max(0, prev.openReports - 1) - })); - } else if (oldStatus !== 'pending' && newStatus === 'pending') { - setStats(prev => ({ - ...prev, - openReports: prev.openReports + 1 - })); - } - - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'DELETE', - schema: 'public', - table: 'reports', - }, - (payload) => { - console.log('Report deleted'); - if (payload.old.status === 'pending') { - setStats(prev => ({ - ...prev, - openReports: Math.max(0, prev.openReports - 1) - })); - } - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'INSERT', - schema: 'public', - table: 'reviews', - }, - () => { - console.log('Review inserted'); - setStats(prev => ({ - ...prev, - flaggedContent: prev.flaggedContent + 1 - })); - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'UPDATE', - schema: 'public', - table: 'reviews', - }, - (payload) => { - console.log('Review updated'); - const oldStatus = payload.old.moderation_status; - const newStatus = payload.new.moderation_status; - - if (oldStatus === 'flagged' && newStatus !== 'flagged') { - setStats(prev => ({ - ...prev, - flaggedContent: Math.max(0, prev.flaggedContent - 1) - })); - } else if (oldStatus !== 'flagged' && newStatus === 'flagged') { - setStats(prev => ({ - ...prev, - flaggedContent: prev.flaggedContent + 1 - })); - } - - debouncedFetchStats(); - } - ) - .on( - 'postgres_changes', - { - event: 'DELETE', - schema: 'public', - table: 'reviews', - }, - (payload) => { - console.log('Review deleted'); - if (payload.old.moderation_status === 'flagged') { - setStats(prev => ({ - ...prev, - flaggedContent: Math.max(0, prev.flaggedContent - 1) - })); - } - debouncedFetchStats(); - } - ) - .subscribe((status) => { - console.log('[Realtime:moderation-stats-changes] Subscription status:', status); - - if (status === 'SUBSCRIBED') { - setConnectionState('connected'); - } else if (status === 'CHANNEL_ERROR') { - setConnectionState('error'); - } else if (status === 'TIMED_OUT') { - setConnectionState('disconnected'); - } else if (status === 'CLOSED') { - setConnectionState('disconnected'); - } - }); + const setupChannels = async () => { + // Set auth token for private channels + await supabase.realtime.setAuth(); - setChannel(newChannel); + const submissionsChannel = supabase + .channel('moderation:content_submissions', { + config: { private: true }, + }) + .on('broadcast', { event: 'INSERT' }, () => { + console.log('[Realtime:moderation-stats] Content submission inserted'); + debouncedFetchStats(); + }) + .on('broadcast', { event: 'UPDATE' }, () => { + console.log('[Realtime:moderation-stats] Content submission updated'); + debouncedFetchStats(); + }) + .on('broadcast', { event: 'DELETE' }, () => { + console.log('[Realtime:moderation-stats] Content submission deleted'); + debouncedFetchStats(); + }) + .subscribe((status) => { + console.log('[Realtime:moderation-stats] Submissions channel status:', status); + + if (status === 'SUBSCRIBED') { + setConnectionState('connected'); + } else if (status === 'CHANNEL_ERROR') { + setConnectionState('error'); + } else if (status === 'TIMED_OUT') { + setConnectionState('disconnected'); + console.log('[Realtime:moderation-stats] Falling back to polling'); + } else if (status === 'CLOSED') { + setConnectionState('disconnected'); + } + }); + + const reportsChannel = supabase + .channel('moderation:reports', { + config: { private: true }, + }) + .on('broadcast', { event: 'INSERT' }, () => { + console.log('[Realtime:moderation-stats] Report inserted'); + debouncedFetchStats(); + }) + .on('broadcast', { event: 'UPDATE' }, () => { + console.log('[Realtime:moderation-stats] Report updated'); + debouncedFetchStats(); + }) + .on('broadcast', { event: 'DELETE' }, () => { + console.log('[Realtime:moderation-stats] Report deleted'); + debouncedFetchStats(); + }) + .subscribe(); + + const reviewsChannel = supabase + .channel('moderation:reviews', { + config: { private: true }, + }) + .on('broadcast', { event: 'INSERT' }, () => { + console.log('[Realtime:moderation-stats] Review inserted'); + debouncedFetchStats(); + }) + .on('broadcast', { event: 'UPDATE' }, () => { + console.log('[Realtime:moderation-stats] Review updated'); + debouncedFetchStats(); + }) + .on('broadcast', { event: 'DELETE' }, () => { + console.log('[Realtime:moderation-stats] Review deleted'); + debouncedFetchStats(); + }) + .subscribe(); + + setChannel(submissionsChannel); + + return { submissionsChannel, reportsChannel, reviewsChannel }; + }; + + let channels: Awaited>; + setupChannels().then((c) => { + channels = c; + }); return () => { - console.log('[Realtime:moderation-stats-changes] Cleaning up channel'); - supabase.removeChannel(newChannel); + console.log('[Realtime:moderation-stats] Cleaning up channels'); + if (channels) { + supabase.removeChannel(channels.submissionsChannel); + supabase.removeChannel(channels.reportsChannel); + supabase.removeChannel(channels.reviewsChannel); + } }; }, [realtimeEnabled, debouncedFetchStats]); diff --git a/src/hooks/useRealtimeSubmissionItems.ts b/src/hooks/useRealtimeSubmissionItems.ts index 807d3385..a3ac1fc2 100644 --- a/src/hooks/useRealtimeSubmissionItems.ts +++ b/src/hooks/useRealtimeSubmissionItems.ts @@ -37,43 +37,49 @@ export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOp return; } - console.log('[Realtime:submission-items] Creating new channel for submission:', submissionId); + console.log('[Realtime:submission-items] Creating new broadcast channel for submission:', submissionId); setConnectionState('connecting'); - const newChannel = supabase - .channel(`submission-items-${submissionId}`) - .on( - 'postgres_changes', - { - event: 'UPDATE', - schema: 'public', - table: 'submission_items', - filter: `submission_id=eq.${submissionId}`, - }, - (payload) => { - console.log('Submission item updated:', payload); - onUpdateRef.current?.(payload); - } - ) - .subscribe((status) => { - console.log(`[Realtime:submission-items-${submissionId}] Subscription status:`, status); - - if (status === 'SUBSCRIBED') { - setConnectionState('connected'); - } else if (status === 'CHANNEL_ERROR') { - setConnectionState('error'); - } else if (status === 'TIMED_OUT') { - setConnectionState('disconnected'); - } else if (status === 'CLOSED') { - setConnectionState('disconnected'); - } - }); + const setupChannel = async () => { + // Set auth token for private channel + await supabase.realtime.setAuth(); - setChannel(newChannel); + const newChannel = supabase + .channel('moderation:submission_items', { + config: { private: true }, + }) + .on('broadcast', { event: 'UPDATE' }, (payload) => { + // Client-side filtering for specific submission + const itemData = payload.payload; + if (itemData?.new?.submission_id === submissionId) { + console.log('Submission item updated:', payload); + onUpdateRef.current?.(payload); + } + }) + .subscribe((status) => { + console.log(`[Realtime:submission-items] Subscription status:`, status); + + if (status === 'SUBSCRIBED') { + setConnectionState('connected'); + } else if (status === 'CHANNEL_ERROR') { + setConnectionState('error'); + } else if (status === 'TIMED_OUT') { + setConnectionState('disconnected'); + } else if (status === 'CLOSED') { + setConnectionState('disconnected'); + } + }); + + setChannel(newChannel); + }; + + setupChannel(); return () => { - console.log('[Realtime:submission-items] Cleaning up channel'); - supabase.removeChannel(newChannel); + if (channel) { + console.log('[Realtime:submission-items] Cleaning up channel'); + supabase.removeChannel(channel); + } }; }, [enabled, submissionId]); diff --git a/src/hooks/useRealtimeSubmissions.ts b/src/hooks/useRealtimeSubmissions.ts index c398c814..14cd1cde 100644 --- a/src/hooks/useRealtimeSubmissions.ts +++ b/src/hooks/useRealtimeSubmissions.ts @@ -43,70 +43,57 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions = useEffect(() => { if (!realtimeEnabled) { - console.log('[Realtime:content-submissions-changes] Realtime disabled'); + console.log('[Realtime:content-submissions] Realtime disabled'); return; } - console.log('[Realtime:content-submissions-changes] Creating new channel'); + console.log('[Realtime:content-submissions] Creating new broadcast channel'); setConnectionState('connecting'); - const newChannel = supabase - .channel('content-submissions-changes') - .on( - 'postgres_changes', - { - event: 'INSERT', - schema: 'public', - table: 'content_submissions', - }, - (payload) => { + const setupChannel = async () => { + // Set auth token for private channel + await supabase.realtime.setAuth(); + + const newChannel = supabase + .channel('moderation:content_submissions', { + config: { private: true }, + }) + .on('broadcast', { event: 'INSERT' }, (payload) => { console.log('Submission inserted:', payload); onInsertRef.current?.(payload); - } - ) - .on( - 'postgres_changes', - { - event: 'UPDATE', - schema: 'public', - table: 'content_submissions', - }, - (payload) => { + }) + .on('broadcast', { event: 'UPDATE' }, (payload) => { console.log('Submission updated:', payload); onUpdateRef.current?.(payload); - } - ) - .on( - 'postgres_changes', - { - event: 'DELETE', - schema: 'public', - table: 'content_submissions', - }, - (payload) => { + }) + .on('broadcast', { event: 'DELETE' }, (payload) => { console.log('Submission deleted:', payload); onDeleteRef.current?.(payload); - } - ) - .subscribe((status) => { - console.log('[Realtime:content-submissions-changes] Subscription status:', status); - - if (status === 'SUBSCRIBED') { - setConnectionState('connected'); - } else if (status === 'CHANNEL_ERROR') { - setConnectionState('error'); - } else if (status === 'TIMED_OUT') { - setConnectionState('disconnected'); - } else if (status === 'CLOSED') { - setConnectionState('disconnected'); - } - }); + }) + .subscribe((status) => { + console.log('[Realtime:content-submissions] Subscription status:', status); + + if (status === 'SUBSCRIBED') { + setConnectionState('connected'); + } else if (status === 'CHANNEL_ERROR') { + setConnectionState('error'); + } else if (status === 'TIMED_OUT') { + setConnectionState('disconnected'); + } else if (status === 'CLOSED') { + setConnectionState('disconnected'); + } + }); - setChannel(newChannel); + setChannel(newChannel); + }; + + setupChannel(); return () => { - console.log('[Realtime:content-submissions-changes] Cleaning up channel'); - supabase.removeChannel(newChannel); + if (channel) { + console.log('[Realtime:content-submissions] Cleaning up channel'); + supabase.removeChannel(channel); + } }; }, [realtimeEnabled]); diff --git a/supabase/migrations/20251003180644_fc7eb058-5e16-4f02-add0-3f8081573812.sql b/supabase/migrations/20251003180644_fc7eb058-5e16-4f02-add0-3f8081573812.sql new file mode 100644 index 00000000..6d548298 --- /dev/null +++ b/supabase/migrations/20251003180644_fc7eb058-5e16-4f02-add0-3f8081573812.sql @@ -0,0 +1,80 @@ +-- Fix search_path for broadcast trigger functions +CREATE OR REPLACE FUNCTION public.broadcast_content_submission_changes() +RETURNS trigger +SECURITY DEFINER +SET search_path = public +LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM realtime.broadcast_changes( + 'moderation:content_submissions', + TG_OP, + TG_OP, + TG_TABLE_NAME, + TG_TABLE_SCHEMA, + NEW, + OLD + ); + RETURN NULL; +END; +$$; + +CREATE OR REPLACE FUNCTION public.broadcast_submission_item_changes() +RETURNS trigger +SECURITY DEFINER +SET search_path = public +LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM realtime.broadcast_changes( + 'moderation:submission_items', + TG_OP, + TG_OP, + TG_TABLE_NAME, + TG_TABLE_SCHEMA, + NEW, + OLD + ); + RETURN NULL; +END; +$$; + +CREATE OR REPLACE FUNCTION public.broadcast_report_changes() +RETURNS trigger +SECURITY DEFINER +SET search_path = public +LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM realtime.broadcast_changes( + 'moderation:reports', + TG_OP, + TG_OP, + TG_TABLE_NAME, + TG_TABLE_SCHEMA, + NEW, + OLD + ); + RETURN NULL; +END; +$$; + +CREATE OR REPLACE FUNCTION public.broadcast_review_changes() +RETURNS trigger +SECURITY DEFINER +SET search_path = public +LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM realtime.broadcast_changes( + 'moderation:reviews', + TG_OP, + TG_OP, + TG_TABLE_NAME, + TG_TABLE_SCHEMA, + NEW, + OLD + ); + RETURN NULL; +END; +$$; \ No newline at end of file