Refactor: Simplify Realtime channel creation

This commit is contained in:
gpt-engineer-app[bot]
2025-10-03 17:25:10 +00:00
parent c42f523cc4
commit 8e6c8728a3
5 changed files with 150 additions and 253 deletions

View File

@@ -1,7 +1,8 @@
import { RefreshCw, Wifi, WifiOff, AlertCircle } from 'lucide-react'; import { RefreshCw, Wifi, WifiOff, AlertCircle } from 'lucide-react';
import { Button } from '@/components/ui/button'; import { Button } from '@/components/ui/button';
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'; import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip';
import { ConnectionState } from '@/hooks/useEnhancedRealtime';
type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
interface RealtimeConnectionStatusProps { interface RealtimeConnectionStatusProps {
connectionState: ConnectionState; connectionState: ConnectionState;

View File

@@ -1,195 +0,0 @@
import { useEffect, useState, useRef, useCallback } from 'react';
import { supabase } from '@/integrations/supabase/client';
import { RealtimeChannel } from '@supabase/supabase-js';
import { toast } from '@/hooks/use-toast';
export type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
interface UseEnhancedRealtimeOptions {
enabled?: boolean;
channelName: string;
debug?: boolean;
retryAttempts?: number;
retryDelay?: number;
maxRetryDelay?: number;
onConnectionChange?: (state: ConnectionState) => void;
}
interface UseEnhancedRealtimeReturn {
channel: RealtimeChannel | null;
connectionState: ConnectionState;
reconnect: () => void;
disconnect: () => void;
}
const activeChannels = new Map<string, RealtimeChannel>();
export const useEnhancedRealtime = (
options: UseEnhancedRealtimeOptions
): UseEnhancedRealtimeReturn => {
const {
enabled = true,
channelName,
debug = false,
retryAttempts = 5,
retryDelay = 1000,
maxRetryDelay = 30000,
onConnectionChange,
} = options;
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
const retryCountRef = useRef(0);
const retryTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const onConnectionChangeRef = useRef(onConnectionChange);
// Update callback ref
useEffect(() => {
onConnectionChangeRef.current = onConnectionChange;
}, [onConnectionChange]);
const log = useCallback((...args: any[]) => {
if (debug) {
console.log(`[Realtime:${channelName}]`, ...args);
}
}, [debug, channelName]);
const updateConnectionState = useCallback((state: ConnectionState) => {
setConnectionState(state);
onConnectionChangeRef.current?.(state);
log('Connection state:', state);
}, [log]);
const calculateRetryDelay = useCallback((attempt: number): number => {
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (max)
const delay = Math.min(retryDelay * Math.pow(2, attempt), maxRetryDelay);
return delay;
}, [retryDelay, maxRetryDelay]);
const cleanup = useCallback(() => {
if (retryTimeoutRef.current) {
clearTimeout(retryTimeoutRef.current);
retryTimeoutRef.current = null;
}
const existingChannel = activeChannels.get(channelName);
if (existingChannel) {
log('Cleaning up existing channel');
supabase.removeChannel(existingChannel);
activeChannels.delete(channelName);
}
setChannel(null);
}, [channelName, log]);
const connect = useCallback(() => {
if (!enabled) {
log('Realtime disabled');
return;
}
// Check if channel already exists
if (activeChannels.has(channelName)) {
log('Channel already exists, reusing');
const existingChannel = activeChannels.get(channelName)!;
setChannel(existingChannel);
updateConnectionState('connected');
return;
}
log('Creating new channel');
updateConnectionState('connecting');
const newChannel = supabase.channel(channelName);
// Store channel immediately to prevent duplicates
activeChannels.set(channelName, newChannel);
setChannel(newChannel);
// Subscribe with status monitoring
newChannel.subscribe((status) => {
log('Subscription status:', status);
if (status === 'SUBSCRIBED') {
updateConnectionState('connected');
retryCountRef.current = 0; // Reset retry count on successful connection
if (retryCountRef.current > 0) {
toast({
title: "Connection Restored",
description: "Live updates are now active",
});
}
} else if (status === 'CHANNEL_ERROR') {
updateConnectionState('error');
log('Channel error, attempting retry');
// Attempt reconnection
if (retryCountRef.current < retryAttempts) {
const delay = calculateRetryDelay(retryCountRef.current);
log(`Retrying in ${delay}ms (attempt ${retryCountRef.current + 1}/${retryAttempts})`);
retryTimeoutRef.current = setTimeout(() => {
retryCountRef.current++;
cleanup();
connect();
}, delay);
} else {
log('Max retry attempts reached');
toast({
title: "Connection Failed",
description: "Unable to establish live updates. Please refresh the page.",
variant: "destructive",
});
}
} else if (status === 'TIMED_OUT') {
updateConnectionState('disconnected');
log('Connection timed out');
// Attempt reconnection
if (retryCountRef.current < retryAttempts) {
retryCountRef.current++;
cleanup();
connect();
}
} else if (status === 'CLOSED') {
updateConnectionState('disconnected');
log('Connection closed');
}
});
}, [enabled, channelName, log, updateConnectionState, retryAttempts, calculateRetryDelay, cleanup]);
const reconnect = useCallback(() => {
log('Manual reconnect triggered');
retryCountRef.current = 0;
cleanup();
connect();
}, [log, cleanup, connect]);
const disconnect = useCallback(() => {
log('Manual disconnect triggered');
cleanup();
updateConnectionState('disconnected');
}, [log, cleanup, updateConnectionState]);
// Connect on mount or when enabled changes
useEffect(() => {
if (enabled) {
connect();
} else {
cleanup();
updateConnectionState('disconnected');
}
return () => {
cleanup();
};
}, [enabled]); // Only depend on enabled, not connect/cleanup to avoid loops
return {
channel,
connectionState,
reconnect,
disconnect,
};
};

View File

@@ -1,8 +1,10 @@
import { useEffect, useState, useRef, useCallback } from 'react'; import { useEffect, useState, useRef, useCallback } from 'react';
import { supabase } from '@/integrations/supabase/client'; import { supabase } from '@/integrations/supabase/client';
import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime'; import { RealtimeChannel } from '@supabase/supabase-js';
import { useUserRole } from './useUserRole'; import { useUserRole } from './useUserRole';
type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
interface ModerationStats { interface ModerationStats {
pendingSubmissions: number; pendingSubmissions: number;
openReports: number; openReports: number;
@@ -23,6 +25,8 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
openReports: 0, openReports: 0,
flaggedContent: 0, flaggedContent: 0,
}); });
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
const updateTimerRef = useRef<NodeJS.Timeout | null>(null); const updateTimerRef = useRef<NodeJS.Timeout | null>(null);
const onStatsChangeRef = useRef(onStatsChange); const onStatsChangeRef = useRef(onStatsChange);
@@ -71,29 +75,23 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
updateTimerRef.current = setTimeout(fetchStats, debounceMs); updateTimerRef.current = setTimeout(fetchStats, debounceMs);
}, [fetchStats, debounceMs]); }, [fetchStats, debounceMs]);
const { channel, connectionState, reconnect } = useEnhancedRealtime({ const reconnect = useCallback(() => {
enabled: realtimeEnabled, if (channel) {
channelName: 'moderation-stats-changes', supabase.removeChannel(channel);
debug: true, }
onConnectionChange: (state: ConnectionState) => { setConnectionState('connecting');
// Fallback to polling when disconnected fetchStats();
if (state === 'disconnected' || state === 'error') { }, [channel, fetchStats]);
console.log('Realtime disconnected, falling back to polling');
// Could implement polling here if needed
}
},
});
// Initial fetch and polling fallback
useEffect(() => { useEffect(() => {
if (!enabled) return; if (!enabled) return;
// Initial fetch
fetchStats(); fetchStats();
// Set up polling interval as fallback (only when connected state is not 'connected')
let pollInterval: NodeJS.Timeout | null = null; let pollInterval: NodeJS.Timeout | null = null;
if (connectionState !== 'connected') { if (connectionState !== 'connected') {
pollInterval = setInterval(fetchStats, 30000); // Poll every 30 seconds pollInterval = setInterval(fetchStats, 30000);
} }
return () => { return () => {
@@ -106,10 +104,18 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
}; };
}, [enabled, fetchStats, connectionState]); }, [enabled, fetchStats, connectionState]);
// Set up realtime connection with all listeners configured before subscribing
useEffect(() => { useEffect(() => {
if (!channel) return; if (!realtimeEnabled) {
console.log('[Realtime:moderation-stats-changes] Realtime disabled');
return;
}
channel console.log('[Realtime:moderation-stats-changes] Creating new channel');
setConnectionState('connecting');
const newChannel = supabase
.channel('moderation-stats-changes')
.on( .on(
'postgres_changes', 'postgres_changes',
{ {
@@ -119,14 +125,12 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
}, },
(payload) => { (payload) => {
console.log('Content submission inserted'); console.log('Content submission inserted');
// Optimistic update: increment pending submissions
if (payload.new.status === 'pending') { if (payload.new.status === 'pending') {
setStats(prev => ({ setStats(prev => ({
...prev, ...prev,
pendingSubmissions: prev.pendingSubmissions + 1 pendingSubmissions: prev.pendingSubmissions + 1
})); }));
} }
// Debounced sync as backup
debouncedFetchStats(); debouncedFetchStats();
} }
) )
@@ -139,7 +143,6 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
}, },
(payload) => { (payload) => {
console.log('Content submission updated'); console.log('Content submission updated');
// Optimistic update: adjust counter based on status change
const oldStatus = payload.old.status; const oldStatus = payload.old.status;
const newStatus = payload.new.status; const newStatus = payload.new.status;
@@ -167,7 +170,6 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
}, },
(payload) => { (payload) => {
console.log('Content submission deleted'); console.log('Content submission deleted');
// Optimistic update: decrement if was pending
if (payload.old.status === 'pending') { if (payload.old.status === 'pending') {
setStats(prev => ({ setStats(prev => ({
...prev, ...prev,
@@ -298,8 +300,28 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
} }
debouncedFetchStats(); debouncedFetchStats();
} }
); )
}, [channel, debouncedFetchStats]); .subscribe((status) => {
console.log('[Realtime:moderation-stats-changes] Subscription status:', status);
if (status === 'SUBSCRIBED') {
setConnectionState('connected');
} else if (status === 'CHANNEL_ERROR') {
setConnectionState('error');
} else if (status === 'TIMED_OUT') {
setConnectionState('disconnected');
} else if (status === 'CLOSED') {
setConnectionState('disconnected');
}
});
setChannel(newChannel);
return () => {
console.log('[Realtime:moderation-stats-changes] Cleaning up channel');
supabase.removeChannel(newChannel);
};
}, [realtimeEnabled, debouncedFetchStats]);
return { stats, refresh: fetchStats, connectionState, reconnect }; return { stats, refresh: fetchStats, connectionState, reconnect };
}; };

View File

@@ -1,5 +1,8 @@
import { useEffect, useRef } from 'react'; import { useEffect, useRef, useState, useCallback } from 'react';
import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime'; import { supabase } from '@/integrations/supabase/client';
import { RealtimeChannel } from '@supabase/supabase-js';
type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
interface UseRealtimeSubmissionItemsOptions { interface UseRealtimeSubmissionItemsOptions {
submissionId?: string; submissionId?: string;
@@ -10,6 +13,9 @@ interface UseRealtimeSubmissionItemsOptions {
export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOptions = {}) => { export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOptions = {}) => {
const { submissionId, onUpdate, enabled = true } = options; const { submissionId, onUpdate, enabled = true } = options;
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
// Use ref to store latest callback without triggering re-subscriptions // Use ref to store latest callback without triggering re-subscriptions
const onUpdateRef = useRef(onUpdate); const onUpdateRef = useRef(onUpdate);
@@ -18,29 +24,58 @@ export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOp
onUpdateRef.current = onUpdate; onUpdateRef.current = onUpdate;
}, [onUpdate]); }, [onUpdate]);
const { channel, connectionState, reconnect } = useEnhancedRealtime({ const reconnect = useCallback(() => {
enabled: enabled && !!submissionId, if (channel) {
channelName: `submission-items-${submissionId || 'none'}`, supabase.removeChannel(channel);
debug: true, }
}); setConnectionState('connecting');
}, [channel]);
useEffect(() => { useEffect(() => {
if (!channel || !submissionId) return; if (!enabled || !submissionId) {
console.log('[Realtime:submission-items] Disabled or no submission ID');
return;
}
channel.on( console.log('[Realtime:submission-items] Creating new channel for submission:', submissionId);
'postgres_changes', setConnectionState('connecting');
{
event: 'UPDATE', const newChannel = supabase
schema: 'public', .channel(`submission-items-${submissionId}`)
table: 'submission_items', .on(
filter: `submission_id=eq.${submissionId}`, 'postgres_changes',
}, {
(payload) => { event: 'UPDATE',
console.log('Submission item updated:', payload); schema: 'public',
onUpdateRef.current?.(payload); table: 'submission_items',
} filter: `submission_id=eq.${submissionId}`,
); },
}, [channel, submissionId]); (payload) => {
console.log('Submission item updated:', payload);
onUpdateRef.current?.(payload);
}
)
.subscribe((status) => {
console.log(`[Realtime:submission-items-${submissionId}] Subscription status:`, status);
if (status === 'SUBSCRIBED') {
setConnectionState('connected');
} else if (status === 'CHANNEL_ERROR') {
setConnectionState('error');
} else if (status === 'TIMED_OUT') {
setConnectionState('disconnected');
} else if (status === 'CLOSED') {
setConnectionState('disconnected');
}
});
setChannel(newChannel);
return () => {
console.log('[Realtime:submission-items] Cleaning up channel');
supabase.removeChannel(newChannel);
};
}, [enabled, submissionId]);
return { channel, connectionState, reconnect }; return { channel, connectionState, reconnect };
}; };

View File

@@ -1,7 +1,10 @@
import { useEffect, useRef } from 'react'; import { useEffect, useRef, useState, useCallback } from 'react';
import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime'; import { supabase } from '@/integrations/supabase/client';
import { RealtimeChannel } from '@supabase/supabase-js';
import { useUserRole } from './useUserRole'; import { useUserRole } from './useUserRole';
type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
interface UseRealtimeSubmissionsOptions { interface UseRealtimeSubmissionsOptions {
onInsert?: (payload: any) => void; onInsert?: (payload: any) => void;
onUpdate?: (payload: any) => void; onUpdate?: (payload: any) => void;
@@ -13,6 +16,9 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions =
const { onInsert, onUpdate, onDelete, enabled = true } = options; const { onInsert, onUpdate, onDelete, enabled = true } = options;
const { isModerator, loading: roleLoading } = useUserRole(); const { isModerator, loading: roleLoading } = useUserRole();
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
// Only enable realtime when user is confirmed as moderator // Only enable realtime when user is confirmed as moderator
const realtimeEnabled = enabled && !roleLoading && isModerator(); const realtimeEnabled = enabled && !roleLoading && isModerator();
@@ -28,16 +34,24 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions =
onDeleteRef.current = onDelete; onDeleteRef.current = onDelete;
}, [onInsert, onUpdate, onDelete]); }, [onInsert, onUpdate, onDelete]);
const { channel, connectionState, reconnect } = useEnhancedRealtime({ const reconnect = useCallback(() => {
enabled: realtimeEnabled, if (channel) {
channelName: 'content-submissions-changes', supabase.removeChannel(channel);
debug: true, }
}); setConnectionState('connecting');
}, [channel]);
useEffect(() => { useEffect(() => {
if (!channel) return; if (!realtimeEnabled) {
console.log('[Realtime:content-submissions-changes] Realtime disabled');
return;
}
channel console.log('[Realtime:content-submissions-changes] Creating new channel');
setConnectionState('connecting');
const newChannel = supabase
.channel('content-submissions-changes')
.on( .on(
'postgres_changes', 'postgres_changes',
{ {
@@ -73,8 +87,28 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions =
console.log('Submission deleted:', payload); console.log('Submission deleted:', payload);
onDeleteRef.current?.(payload); onDeleteRef.current?.(payload);
} }
); )
}, [channel]); .subscribe((status) => {
console.log('[Realtime:content-submissions-changes] Subscription status:', status);
if (status === 'SUBSCRIBED') {
setConnectionState('connected');
} else if (status === 'CHANNEL_ERROR') {
setConnectionState('error');
} else if (status === 'TIMED_OUT') {
setConnectionState('disconnected');
} else if (status === 'CLOSED') {
setConnectionState('disconnected');
}
});
setChannel(newChannel);
return () => {
console.log('[Realtime:content-submissions-changes] Cleaning up channel');
supabase.removeChannel(newChannel);
};
}, [realtimeEnabled]);
return { channel, connectionState, reconnect }; return { channel, connectionState, reconnect };
}; };