From 15caad0c8e6f9d8a7a299fe33276804f52baa938 Mon Sep 17 00:00:00 2001 From: "gpt-engineer-app[bot]" <159125892+gpt-engineer-app[bot]@users.noreply.github.com> Date: Thu, 2 Oct 2025 20:38:30 +0000 Subject: [PATCH] feat: Implement enhanced realtime subscriptions --- src/components/moderation/ModerationQueue.tsx | 10 +- .../moderation/RealtimeConnectionStatus.tsx | 99 +++++++++ src/hooks/useEnhancedRealtime.ts | 195 ++++++++++++++++++ src/hooks/useRealtimeModerationStats.ts | 58 ++++-- src/hooks/useRealtimeSubmissionItems.ts | 56 +++-- src/hooks/useRealtimeSubmissions.ts | 35 ++-- 6 files changed, 378 insertions(+), 75 deletions(-) create mode 100644 src/components/moderation/RealtimeConnectionStatus.tsx create mode 100644 src/hooks/useEnhancedRealtime.ts diff --git a/src/components/moderation/ModerationQueue.tsx b/src/components/moderation/ModerationQueue.tsx index 49060726..55e00b09 100644 --- a/src/components/moderation/ModerationQueue.tsx +++ b/src/components/moderation/ModerationQueue.tsx @@ -17,6 +17,7 @@ import { SubmissionReviewManager } from './SubmissionReviewManager'; import { useRealtimeSubmissions } from '@/hooks/useRealtimeSubmissions'; import { useIsMobile } from '@/hooks/use-mobile'; import { EntityEditPreview } from './EntityEditPreview'; +import { RealtimeConnectionStatus } from './RealtimeConnectionStatus'; interface ModerationItem { id: string; @@ -348,7 +349,7 @@ export const ModerationQueue = forwardRef((props, ref) => { }; // Set up realtime subscriptions - useRealtimeSubmissions({ + const { connectionState: submissionsConnectionState, reconnect: reconnectSubmissions } = useRealtimeSubmissions({ onInsert: (payload) => { console.log('New submission received'); toast({ @@ -1736,6 +1737,13 @@ export const ModerationQueue = forwardRef((props, ref) => {
{/* Filter Bar */}
+
+

Moderation Queue

+ +
diff --git a/src/components/moderation/RealtimeConnectionStatus.tsx b/src/components/moderation/RealtimeConnectionStatus.tsx new file mode 100644 index 00000000..48a583c7 --- /dev/null +++ b/src/components/moderation/RealtimeConnectionStatus.tsx @@ -0,0 +1,99 @@ +import { RefreshCw, Wifi, WifiOff, AlertCircle } from 'lucide-react'; +import { Button } from '@/components/ui/button'; +import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'; +import { ConnectionState } from '@/hooks/useEnhancedRealtime'; + +interface RealtimeConnectionStatusProps { + connectionState: ConnectionState; + onReconnect: () => void; + className?: string; +} + +export function RealtimeConnectionStatus({ + connectionState, + onReconnect, + className = '', +}: RealtimeConnectionStatusProps) { + const getStatusConfig = () => { + switch (connectionState) { + case 'connected': + return { + icon: Wifi, + color: 'text-green-500', + label: 'Connected', + description: 'Live updates active', + showReconnect: false, + }; + case 'connecting': + return { + icon: RefreshCw, + color: 'text-yellow-500', + label: 'Connecting', + description: 'Establishing connection...', + showReconnect: false, + animate: 'animate-spin', + }; + case 'error': + return { + icon: AlertCircle, + color: 'text-red-500', + label: 'Error', + description: 'Connection failed. Retrying...', + showReconnect: true, + }; + case 'disconnected': + return { + icon: WifiOff, + color: 'text-muted-foreground', + label: 'Disconnected', + description: 'Live updates unavailable', + showReconnect: true, + }; + default: + return { + icon: WifiOff, + color: 'text-muted-foreground', + label: 'Unknown', + description: 'Connection status unknown', + showReconnect: true, + }; + } + }; + + const config = getStatusConfig(); + const Icon = config.icon; + + return ( +
+ + + +
+ + + {config.label} + +
+
+ +

{config.description}

+
+
+
+ + {config.showReconnect && ( + + )} +
+ ); +} diff --git a/src/hooks/useEnhancedRealtime.ts b/src/hooks/useEnhancedRealtime.ts new file mode 100644 index 00000000..b2d141b2 --- /dev/null +++ b/src/hooks/useEnhancedRealtime.ts @@ -0,0 +1,195 @@ +import { useEffect, useState, useRef, useCallback } from 'react'; +import { supabase } from '@/integrations/supabase/client'; +import { RealtimeChannel } from '@supabase/supabase-js'; +import { toast } from '@/hooks/use-toast'; + +export type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error'; + +interface UseEnhancedRealtimeOptions { + enabled?: boolean; + channelName: string; + debug?: boolean; + retryAttempts?: number; + retryDelay?: number; + maxRetryDelay?: number; + onConnectionChange?: (state: ConnectionState) => void; +} + +interface UseEnhancedRealtimeReturn { + channel: RealtimeChannel | null; + connectionState: ConnectionState; + reconnect: () => void; + disconnect: () => void; +} + +const activeChannels = new Map(); + +export const useEnhancedRealtime = ( + options: UseEnhancedRealtimeOptions +): UseEnhancedRealtimeReturn => { + const { + enabled = true, + channelName, + debug = false, + retryAttempts = 5, + retryDelay = 1000, + maxRetryDelay = 30000, + onConnectionChange, + } = options; + + const [channel, setChannel] = useState(null); + const [connectionState, setConnectionState] = useState('disconnected'); + const retryCountRef = useRef(0); + const retryTimeoutRef = useRef(null); + const onConnectionChangeRef = useRef(onConnectionChange); + + // Update callback ref + useEffect(() => { + onConnectionChangeRef.current = onConnectionChange; + }, [onConnectionChange]); + + const log = useCallback((...args: any[]) => { + if (debug) { + console.log(`[Realtime:${channelName}]`, ...args); + } + }, [debug, channelName]); + + const updateConnectionState = useCallback((state: ConnectionState) => { + setConnectionState(state); + onConnectionChangeRef.current?.(state); + log('Connection state:', state); + }, [log]); + + const calculateRetryDelay = useCallback((attempt: number): number => { + // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (max) + const delay = Math.min(retryDelay * Math.pow(2, attempt), maxRetryDelay); + return delay; + }, [retryDelay, maxRetryDelay]); + + const cleanup = useCallback(() => { + if (retryTimeoutRef.current) { + clearTimeout(retryTimeoutRef.current); + retryTimeoutRef.current = null; + } + + const existingChannel = activeChannels.get(channelName); + if (existingChannel) { + log('Cleaning up existing channel'); + supabase.removeChannel(existingChannel); + activeChannels.delete(channelName); + } + + setChannel(null); + }, [channelName, log]); + + const connect = useCallback(() => { + if (!enabled) { + log('Realtime disabled'); + return; + } + + // Check if channel already exists + if (activeChannels.has(channelName)) { + log('Channel already exists, reusing'); + const existingChannel = activeChannels.get(channelName)!; + setChannel(existingChannel); + updateConnectionState('connected'); + return; + } + + log('Creating new channel'); + updateConnectionState('connecting'); + + const newChannel = supabase.channel(channelName); + + // Store channel immediately to prevent duplicates + activeChannels.set(channelName, newChannel); + setChannel(newChannel); + + // Subscribe with status monitoring + newChannel.subscribe((status) => { + log('Subscription status:', status); + + if (status === 'SUBSCRIBED') { + updateConnectionState('connected'); + retryCountRef.current = 0; // Reset retry count on successful connection + + if (retryCountRef.current > 0) { + toast({ + title: "Connection Restored", + description: "Live updates are now active", + }); + } + } else if (status === 'CHANNEL_ERROR') { + updateConnectionState('error'); + log('Channel error, attempting retry'); + + // Attempt reconnection + if (retryCountRef.current < retryAttempts) { + const delay = calculateRetryDelay(retryCountRef.current); + log(`Retrying in ${delay}ms (attempt ${retryCountRef.current + 1}/${retryAttempts})`); + + retryTimeoutRef.current = setTimeout(() => { + retryCountRef.current++; + cleanup(); + connect(); + }, delay); + } else { + log('Max retry attempts reached'); + toast({ + title: "Connection Failed", + description: "Unable to establish live updates. Please refresh the page.", + variant: "destructive", + }); + } + } else if (status === 'TIMED_OUT') { + updateConnectionState('disconnected'); + log('Connection timed out'); + + // Attempt reconnection + if (retryCountRef.current < retryAttempts) { + retryCountRef.current++; + cleanup(); + connect(); + } + } else if (status === 'CLOSED') { + updateConnectionState('disconnected'); + log('Connection closed'); + } + }); + }, [enabled, channelName, log, updateConnectionState, retryAttempts, calculateRetryDelay, cleanup]); + + const reconnect = useCallback(() => { + log('Manual reconnect triggered'); + retryCountRef.current = 0; + cleanup(); + connect(); + }, [log, cleanup, connect]); + + const disconnect = useCallback(() => { + log('Manual disconnect triggered'); + cleanup(); + updateConnectionState('disconnected'); + }, [log, cleanup, updateConnectionState]); + + // Connect on mount or when enabled changes + useEffect(() => { + if (enabled) { + connect(); + } else { + cleanup(); + updateConnectionState('disconnected'); + } + + return () => { + cleanup(); + }; + }, [enabled]); // Only depend on enabled, not connect/cleanup to avoid loops + + return { + channel, + connectionState, + reconnect, + disconnect, + }; +}; diff --git a/src/hooks/useRealtimeModerationStats.ts b/src/hooks/useRealtimeModerationStats.ts index e20e1fb8..4a0b8f29 100644 --- a/src/hooks/useRealtimeModerationStats.ts +++ b/src/hooks/useRealtimeModerationStats.ts @@ -1,6 +1,6 @@ import { useEffect, useState, useRef, useCallback } from 'react'; import { supabase } from '@/integrations/supabase/client'; -import { RealtimeChannel } from '@supabase/supabase-js'; +import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime'; interface ModerationStats { pendingSubmissions: number; @@ -21,7 +21,6 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp openReports: 0, flaggedContent: 0, }); - const [channel, setChannel] = useState(null); const updateTimerRef = useRef(null); const onStatsChangeRef = useRef(onStatsChange); @@ -67,15 +66,45 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp updateTimerRef.current = setTimeout(fetchStats, debounceMs); }, [fetchStats, debounceMs]); + const { channel, connectionState, reconnect } = useEnhancedRealtime({ + enabled, + channelName: 'moderation-stats-changes', + debug: true, + onConnectionChange: (state: ConnectionState) => { + // Fallback to polling when disconnected + if (state === 'disconnected' || state === 'error') { + console.log('Realtime disconnected, falling back to polling'); + // Could implement polling here if needed + } + }, + }); + useEffect(() => { if (!enabled) return; // Initial fetch fetchStats(); - // Set up realtime subscriptions - const realtimeChannel = supabase - .channel('moderation-stats-changes') + // Set up polling interval as fallback (only when connected state is not 'connected') + let pollInterval: NodeJS.Timeout | null = null; + if (connectionState !== 'connected') { + pollInterval = setInterval(fetchStats, 30000); // Poll every 30 seconds + } + + return () => { + if (pollInterval) { + clearInterval(pollInterval); + } + if (updateTimerRef.current) { + clearTimeout(updateTimerRef.current); + } + }; + }, [enabled, fetchStats, connectionState]); + + useEffect(() => { + if (!channel) return; + + channel .on( 'postgres_changes', { @@ -111,21 +140,8 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp console.log('Reviews changed'); debouncedFetchStats(); } - ) - .subscribe((status) => { - console.log('Moderation stats realtime status:', status); - }); + ); + }, [channel, debouncedFetchStats]); - setChannel(realtimeChannel); - - return () => { - console.log('Cleaning up moderation stats realtime subscription'); - if (updateTimerRef.current) { - clearTimeout(updateTimerRef.current); - } - supabase.removeChannel(realtimeChannel); - }; - }, [enabled, fetchStats, debouncedFetchStats]); - - return { stats, refresh: fetchStats }; + return { stats, refresh: fetchStats, connectionState, reconnect }; }; diff --git a/src/hooks/useRealtimeSubmissionItems.ts b/src/hooks/useRealtimeSubmissionItems.ts index 913a154e..d2ec1d69 100644 --- a/src/hooks/useRealtimeSubmissionItems.ts +++ b/src/hooks/useRealtimeSubmissionItems.ts @@ -1,6 +1,5 @@ -import { useEffect, useState, useRef } from 'react'; -import { supabase } from '@/integrations/supabase/client'; -import { RealtimeChannel } from '@supabase/supabase-js'; +import { useEffect, useRef } from 'react'; +import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime'; interface UseRealtimeSubmissionItemsOptions { submissionId?: string; @@ -10,7 +9,6 @@ interface UseRealtimeSubmissionItemsOptions { export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOptions = {}) => { const { submissionId, onUpdate, enabled = true } = options; - const [channel, setChannel] = useState(null); // Use ref to store latest callback without triggering re-subscriptions const onUpdateRef = useRef(onUpdate); @@ -20,35 +18,29 @@ export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOp onUpdateRef.current = onUpdate; }, [onUpdate]); + const { channel, connectionState, reconnect } = useEnhancedRealtime({ + enabled: enabled && !!submissionId, + channelName: `submission-items-${submissionId || 'none'}`, + debug: true, + }); + useEffect(() => { - if (!enabled || !submissionId) return; + if (!channel || !submissionId) return; - const realtimeChannel = supabase - .channel(`submission-items-${submissionId}`) - .on( - 'postgres_changes', - { - event: 'UPDATE', - schema: 'public', - table: 'submission_items', - filter: `submission_id=eq.${submissionId}`, - }, - (payload) => { - console.log('Submission item updated:', payload); - onUpdateRef.current?.(payload); - } - ) - .subscribe((status) => { - console.log('Submission items realtime status:', status); - }); + channel.on( + 'postgres_changes', + { + event: 'UPDATE', + schema: 'public', + table: 'submission_items', + filter: `submission_id=eq.${submissionId}`, + }, + (payload) => { + console.log('Submission item updated:', payload); + onUpdateRef.current?.(payload); + } + ); + }, [channel, submissionId]); - setChannel(realtimeChannel); - - return () => { - console.log('Cleaning up submission items realtime subscription'); - supabase.removeChannel(realtimeChannel); - }; - }, [submissionId, enabled]); - - return { channel }; + return { channel, connectionState, reconnect }; }; diff --git a/src/hooks/useRealtimeSubmissions.ts b/src/hooks/useRealtimeSubmissions.ts index cdb16c24..f22d1055 100644 --- a/src/hooks/useRealtimeSubmissions.ts +++ b/src/hooks/useRealtimeSubmissions.ts @@ -1,6 +1,5 @@ -import { useEffect, useState, useRef } from 'react'; -import { supabase } from '@/integrations/supabase/client'; -import { RealtimeChannel } from '@supabase/supabase-js'; +import { useEffect, useRef } from 'react'; +import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime'; interface UseRealtimeSubmissionsOptions { onInsert?: (payload: any) => void; @@ -11,7 +10,6 @@ interface UseRealtimeSubmissionsOptions { export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions = {}) => { const { onInsert, onUpdate, onDelete, enabled = true } = options; - const [channel, setChannel] = useState(null); // Use refs to store latest callbacks without triggering re-subscriptions const onInsertRef = useRef(onInsert); @@ -25,11 +23,16 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions = onDeleteRef.current = onDelete; }, [onInsert, onUpdate, onDelete]); - useEffect(() => { - if (!enabled) return; + const { channel, connectionState, reconnect } = useEnhancedRealtime({ + enabled, + channelName: 'content-submissions-changes', + debug: true, + }); - const realtimeChannel = supabase - .channel('content-submissions-changes') + useEffect(() => { + if (!channel) return; + + channel .on( 'postgres_changes', { @@ -65,18 +68,8 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions = console.log('Submission deleted:', payload); onDeleteRef.current?.(payload); } - ) - .subscribe((status) => { - console.log('Submissions realtime status:', status); - }); + ); + }, [channel]); - setChannel(realtimeChannel); - - return () => { - console.log('Cleaning up submissions realtime subscription'); - supabase.removeChannel(realtimeChannel); - }; - }, [enabled]); - - return { channel }; + return { channel, connectionState, reconnect }; };