mirror of
https://github.com/pacnpal/thrilltrack-explorer.git
synced 2025-12-22 01:11:13 -05:00
feat: Implement enhanced realtime subscriptions
This commit is contained in:
@@ -17,6 +17,7 @@ import { SubmissionReviewManager } from './SubmissionReviewManager';
|
||||
import { useRealtimeSubmissions } from '@/hooks/useRealtimeSubmissions';
|
||||
import { useIsMobile } from '@/hooks/use-mobile';
|
||||
import { EntityEditPreview } from './EntityEditPreview';
|
||||
import { RealtimeConnectionStatus } from './RealtimeConnectionStatus';
|
||||
|
||||
interface ModerationItem {
|
||||
id: string;
|
||||
@@ -348,7 +349,7 @@ export const ModerationQueue = forwardRef<ModerationQueueRef>((props, ref) => {
|
||||
};
|
||||
|
||||
// Set up realtime subscriptions
|
||||
useRealtimeSubmissions({
|
||||
const { connectionState: submissionsConnectionState, reconnect: reconnectSubmissions } = useRealtimeSubmissions({
|
||||
onInsert: (payload) => {
|
||||
console.log('New submission received');
|
||||
toast({
|
||||
@@ -1736,6 +1737,13 @@ export const ModerationQueue = forwardRef<ModerationQueueRef>((props, ref) => {
|
||||
<div className="space-y-4">
|
||||
{/* Filter Bar */}
|
||||
<div className={`flex flex-col gap-4 bg-muted/50 rounded-lg ${isMobile ? 'p-3' : 'p-4 sm:flex-row'}`}>
|
||||
<div className="flex items-center justify-between w-full mb-2 pb-2 border-b border-border">
|
||||
<h3 className="text-sm font-medium text-muted-foreground">Moderation Queue</h3>
|
||||
<RealtimeConnectionStatus
|
||||
connectionState={submissionsConnectionState}
|
||||
onReconnect={reconnectSubmissions}
|
||||
/>
|
||||
</div>
|
||||
<div className={`flex gap-4 flex-1 ${isMobile ? 'flex-col' : 'flex-col sm:flex-row'}`}>
|
||||
<div className={`space-y-2 ${isMobile ? 'w-full' : 'min-w-[140px]'}`}>
|
||||
<Label className={`font-medium ${isMobile ? 'text-xs' : 'text-sm'}`}>Entity Type</Label>
|
||||
|
||||
99
src/components/moderation/RealtimeConnectionStatus.tsx
Normal file
99
src/components/moderation/RealtimeConnectionStatus.tsx
Normal file
@@ -0,0 +1,99 @@
|
||||
import { RefreshCw, Wifi, WifiOff, AlertCircle } from 'lucide-react';
|
||||
import { Button } from '@/components/ui/button';
|
||||
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip';
|
||||
import { ConnectionState } from '@/hooks/useEnhancedRealtime';
|
||||
|
||||
interface RealtimeConnectionStatusProps {
|
||||
connectionState: ConnectionState;
|
||||
onReconnect: () => void;
|
||||
className?: string;
|
||||
}
|
||||
|
||||
export function RealtimeConnectionStatus({
|
||||
connectionState,
|
||||
onReconnect,
|
||||
className = '',
|
||||
}: RealtimeConnectionStatusProps) {
|
||||
const getStatusConfig = () => {
|
||||
switch (connectionState) {
|
||||
case 'connected':
|
||||
return {
|
||||
icon: Wifi,
|
||||
color: 'text-green-500',
|
||||
label: 'Connected',
|
||||
description: 'Live updates active',
|
||||
showReconnect: false,
|
||||
};
|
||||
case 'connecting':
|
||||
return {
|
||||
icon: RefreshCw,
|
||||
color: 'text-yellow-500',
|
||||
label: 'Connecting',
|
||||
description: 'Establishing connection...',
|
||||
showReconnect: false,
|
||||
animate: 'animate-spin',
|
||||
};
|
||||
case 'error':
|
||||
return {
|
||||
icon: AlertCircle,
|
||||
color: 'text-red-500',
|
||||
label: 'Error',
|
||||
description: 'Connection failed. Retrying...',
|
||||
showReconnect: true,
|
||||
};
|
||||
case 'disconnected':
|
||||
return {
|
||||
icon: WifiOff,
|
||||
color: 'text-muted-foreground',
|
||||
label: 'Disconnected',
|
||||
description: 'Live updates unavailable',
|
||||
showReconnect: true,
|
||||
};
|
||||
default:
|
||||
return {
|
||||
icon: WifiOff,
|
||||
color: 'text-muted-foreground',
|
||||
label: 'Unknown',
|
||||
description: 'Connection status unknown',
|
||||
showReconnect: true,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
const config = getStatusConfig();
|
||||
const Icon = config.icon;
|
||||
|
||||
return (
|
||||
<div className={`flex items-center gap-2 ${className}`}>
|
||||
<TooltipProvider>
|
||||
<Tooltip>
|
||||
<TooltipTrigger asChild>
|
||||
<div className="flex items-center gap-2">
|
||||
<Icon
|
||||
className={`w-4 h-4 ${config.color} ${config.animate || ''}`}
|
||||
/>
|
||||
<span className="text-sm text-muted-foreground hidden sm:inline">
|
||||
{config.label}
|
||||
</span>
|
||||
</div>
|
||||
</TooltipTrigger>
|
||||
<TooltipContent>
|
||||
<p>{config.description}</p>
|
||||
</TooltipContent>
|
||||
</Tooltip>
|
||||
</TooltipProvider>
|
||||
|
||||
{config.showReconnect && (
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
onClick={onReconnect}
|
||||
className="h-8 px-2"
|
||||
>
|
||||
<RefreshCw className="w-3 h-3 mr-1" />
|
||||
Reconnect
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
195
src/hooks/useEnhancedRealtime.ts
Normal file
195
src/hooks/useEnhancedRealtime.ts
Normal file
@@ -0,0 +1,195 @@
|
||||
import { useEffect, useState, useRef, useCallback } from 'react';
|
||||
import { supabase } from '@/integrations/supabase/client';
|
||||
import { RealtimeChannel } from '@supabase/supabase-js';
|
||||
import { toast } from '@/hooks/use-toast';
|
||||
|
||||
export type ConnectionState = 'connecting' | 'connected' | 'disconnected' | 'error';
|
||||
|
||||
interface UseEnhancedRealtimeOptions {
|
||||
enabled?: boolean;
|
||||
channelName: string;
|
||||
debug?: boolean;
|
||||
retryAttempts?: number;
|
||||
retryDelay?: number;
|
||||
maxRetryDelay?: number;
|
||||
onConnectionChange?: (state: ConnectionState) => void;
|
||||
}
|
||||
|
||||
interface UseEnhancedRealtimeReturn {
|
||||
channel: RealtimeChannel | null;
|
||||
connectionState: ConnectionState;
|
||||
reconnect: () => void;
|
||||
disconnect: () => void;
|
||||
}
|
||||
|
||||
const activeChannels = new Map<string, RealtimeChannel>();
|
||||
|
||||
export const useEnhancedRealtime = (
|
||||
options: UseEnhancedRealtimeOptions
|
||||
): UseEnhancedRealtimeReturn => {
|
||||
const {
|
||||
enabled = true,
|
||||
channelName,
|
||||
debug = false,
|
||||
retryAttempts = 5,
|
||||
retryDelay = 1000,
|
||||
maxRetryDelay = 30000,
|
||||
onConnectionChange,
|
||||
} = options;
|
||||
|
||||
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
|
||||
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
|
||||
const retryCountRef = useRef(0);
|
||||
const retryTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||
const onConnectionChangeRef = useRef(onConnectionChange);
|
||||
|
||||
// Update callback ref
|
||||
useEffect(() => {
|
||||
onConnectionChangeRef.current = onConnectionChange;
|
||||
}, [onConnectionChange]);
|
||||
|
||||
const log = useCallback((...args: any[]) => {
|
||||
if (debug) {
|
||||
console.log(`[Realtime:${channelName}]`, ...args);
|
||||
}
|
||||
}, [debug, channelName]);
|
||||
|
||||
const updateConnectionState = useCallback((state: ConnectionState) => {
|
||||
setConnectionState(state);
|
||||
onConnectionChangeRef.current?.(state);
|
||||
log('Connection state:', state);
|
||||
}, [log]);
|
||||
|
||||
const calculateRetryDelay = useCallback((attempt: number): number => {
|
||||
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (max)
|
||||
const delay = Math.min(retryDelay * Math.pow(2, attempt), maxRetryDelay);
|
||||
return delay;
|
||||
}, [retryDelay, maxRetryDelay]);
|
||||
|
||||
const cleanup = useCallback(() => {
|
||||
if (retryTimeoutRef.current) {
|
||||
clearTimeout(retryTimeoutRef.current);
|
||||
retryTimeoutRef.current = null;
|
||||
}
|
||||
|
||||
const existingChannel = activeChannels.get(channelName);
|
||||
if (existingChannel) {
|
||||
log('Cleaning up existing channel');
|
||||
supabase.removeChannel(existingChannel);
|
||||
activeChannels.delete(channelName);
|
||||
}
|
||||
|
||||
setChannel(null);
|
||||
}, [channelName, log]);
|
||||
|
||||
const connect = useCallback(() => {
|
||||
if (!enabled) {
|
||||
log('Realtime disabled');
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if channel already exists
|
||||
if (activeChannels.has(channelName)) {
|
||||
log('Channel already exists, reusing');
|
||||
const existingChannel = activeChannels.get(channelName)!;
|
||||
setChannel(existingChannel);
|
||||
updateConnectionState('connected');
|
||||
return;
|
||||
}
|
||||
|
||||
log('Creating new channel');
|
||||
updateConnectionState('connecting');
|
||||
|
||||
const newChannel = supabase.channel(channelName);
|
||||
|
||||
// Store channel immediately to prevent duplicates
|
||||
activeChannels.set(channelName, newChannel);
|
||||
setChannel(newChannel);
|
||||
|
||||
// Subscribe with status monitoring
|
||||
newChannel.subscribe((status) => {
|
||||
log('Subscription status:', status);
|
||||
|
||||
if (status === 'SUBSCRIBED') {
|
||||
updateConnectionState('connected');
|
||||
retryCountRef.current = 0; // Reset retry count on successful connection
|
||||
|
||||
if (retryCountRef.current > 0) {
|
||||
toast({
|
||||
title: "Connection Restored",
|
||||
description: "Live updates are now active",
|
||||
});
|
||||
}
|
||||
} else if (status === 'CHANNEL_ERROR') {
|
||||
updateConnectionState('error');
|
||||
log('Channel error, attempting retry');
|
||||
|
||||
// Attempt reconnection
|
||||
if (retryCountRef.current < retryAttempts) {
|
||||
const delay = calculateRetryDelay(retryCountRef.current);
|
||||
log(`Retrying in ${delay}ms (attempt ${retryCountRef.current + 1}/${retryAttempts})`);
|
||||
|
||||
retryTimeoutRef.current = setTimeout(() => {
|
||||
retryCountRef.current++;
|
||||
cleanup();
|
||||
connect();
|
||||
}, delay);
|
||||
} else {
|
||||
log('Max retry attempts reached');
|
||||
toast({
|
||||
title: "Connection Failed",
|
||||
description: "Unable to establish live updates. Please refresh the page.",
|
||||
variant: "destructive",
|
||||
});
|
||||
}
|
||||
} else if (status === 'TIMED_OUT') {
|
||||
updateConnectionState('disconnected');
|
||||
log('Connection timed out');
|
||||
|
||||
// Attempt reconnection
|
||||
if (retryCountRef.current < retryAttempts) {
|
||||
retryCountRef.current++;
|
||||
cleanup();
|
||||
connect();
|
||||
}
|
||||
} else if (status === 'CLOSED') {
|
||||
updateConnectionState('disconnected');
|
||||
log('Connection closed');
|
||||
}
|
||||
});
|
||||
}, [enabled, channelName, log, updateConnectionState, retryAttempts, calculateRetryDelay, cleanup]);
|
||||
|
||||
const reconnect = useCallback(() => {
|
||||
log('Manual reconnect triggered');
|
||||
retryCountRef.current = 0;
|
||||
cleanup();
|
||||
connect();
|
||||
}, [log, cleanup, connect]);
|
||||
|
||||
const disconnect = useCallback(() => {
|
||||
log('Manual disconnect triggered');
|
||||
cleanup();
|
||||
updateConnectionState('disconnected');
|
||||
}, [log, cleanup, updateConnectionState]);
|
||||
|
||||
// Connect on mount or when enabled changes
|
||||
useEffect(() => {
|
||||
if (enabled) {
|
||||
connect();
|
||||
} else {
|
||||
cleanup();
|
||||
updateConnectionState('disconnected');
|
||||
}
|
||||
|
||||
return () => {
|
||||
cleanup();
|
||||
};
|
||||
}, [enabled]); // Only depend on enabled, not connect/cleanup to avoid loops
|
||||
|
||||
return {
|
||||
channel,
|
||||
connectionState,
|
||||
reconnect,
|
||||
disconnect,
|
||||
};
|
||||
};
|
||||
@@ -1,6 +1,6 @@
|
||||
import { useEffect, useState, useRef, useCallback } from 'react';
|
||||
import { supabase } from '@/integrations/supabase/client';
|
||||
import { RealtimeChannel } from '@supabase/supabase-js';
|
||||
import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime';
|
||||
|
||||
interface ModerationStats {
|
||||
pendingSubmissions: number;
|
||||
@@ -21,7 +21,6 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
|
||||
openReports: 0,
|
||||
flaggedContent: 0,
|
||||
});
|
||||
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
|
||||
const updateTimerRef = useRef<NodeJS.Timeout | null>(null);
|
||||
const onStatsChangeRef = useRef(onStatsChange);
|
||||
|
||||
@@ -67,15 +66,45 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
|
||||
updateTimerRef.current = setTimeout(fetchStats, debounceMs);
|
||||
}, [fetchStats, debounceMs]);
|
||||
|
||||
const { channel, connectionState, reconnect } = useEnhancedRealtime({
|
||||
enabled,
|
||||
channelName: 'moderation-stats-changes',
|
||||
debug: true,
|
||||
onConnectionChange: (state: ConnectionState) => {
|
||||
// Fallback to polling when disconnected
|
||||
if (state === 'disconnected' || state === 'error') {
|
||||
console.log('Realtime disconnected, falling back to polling');
|
||||
// Could implement polling here if needed
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
|
||||
// Initial fetch
|
||||
fetchStats();
|
||||
|
||||
// Set up realtime subscriptions
|
||||
const realtimeChannel = supabase
|
||||
.channel('moderation-stats-changes')
|
||||
// Set up polling interval as fallback (only when connected state is not 'connected')
|
||||
let pollInterval: NodeJS.Timeout | null = null;
|
||||
if (connectionState !== 'connected') {
|
||||
pollInterval = setInterval(fetchStats, 30000); // Poll every 30 seconds
|
||||
}
|
||||
|
||||
return () => {
|
||||
if (pollInterval) {
|
||||
clearInterval(pollInterval);
|
||||
}
|
||||
if (updateTimerRef.current) {
|
||||
clearTimeout(updateTimerRef.current);
|
||||
}
|
||||
};
|
||||
}, [enabled, fetchStats, connectionState]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!channel) return;
|
||||
|
||||
channel
|
||||
.on(
|
||||
'postgres_changes',
|
||||
{
|
||||
@@ -111,21 +140,8 @@ export const useRealtimeModerationStats = (options: UseRealtimeModerationStatsOp
|
||||
console.log('Reviews changed');
|
||||
debouncedFetchStats();
|
||||
}
|
||||
)
|
||||
.subscribe((status) => {
|
||||
console.log('Moderation stats realtime status:', status);
|
||||
});
|
||||
);
|
||||
}, [channel, debouncedFetchStats]);
|
||||
|
||||
setChannel(realtimeChannel);
|
||||
|
||||
return () => {
|
||||
console.log('Cleaning up moderation stats realtime subscription');
|
||||
if (updateTimerRef.current) {
|
||||
clearTimeout(updateTimerRef.current);
|
||||
}
|
||||
supabase.removeChannel(realtimeChannel);
|
||||
};
|
||||
}, [enabled, fetchStats, debouncedFetchStats]);
|
||||
|
||||
return { stats, refresh: fetchStats };
|
||||
return { stats, refresh: fetchStats, connectionState, reconnect };
|
||||
};
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { useEffect, useState, useRef } from 'react';
|
||||
import { supabase } from '@/integrations/supabase/client';
|
||||
import { RealtimeChannel } from '@supabase/supabase-js';
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime';
|
||||
|
||||
interface UseRealtimeSubmissionItemsOptions {
|
||||
submissionId?: string;
|
||||
@@ -10,7 +9,6 @@ interface UseRealtimeSubmissionItemsOptions {
|
||||
|
||||
export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOptions = {}) => {
|
||||
const { submissionId, onUpdate, enabled = true } = options;
|
||||
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
|
||||
|
||||
// Use ref to store latest callback without triggering re-subscriptions
|
||||
const onUpdateRef = useRef(onUpdate);
|
||||
@@ -20,35 +18,29 @@ export const useRealtimeSubmissionItems = (options: UseRealtimeSubmissionItemsOp
|
||||
onUpdateRef.current = onUpdate;
|
||||
}, [onUpdate]);
|
||||
|
||||
const { channel, connectionState, reconnect } = useEnhancedRealtime({
|
||||
enabled: enabled && !!submissionId,
|
||||
channelName: `submission-items-${submissionId || 'none'}`,
|
||||
debug: true,
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled || !submissionId) return;
|
||||
if (!channel || !submissionId) return;
|
||||
|
||||
const realtimeChannel = supabase
|
||||
.channel(`submission-items-${submissionId}`)
|
||||
.on(
|
||||
'postgres_changes',
|
||||
{
|
||||
event: 'UPDATE',
|
||||
schema: 'public',
|
||||
table: 'submission_items',
|
||||
filter: `submission_id=eq.${submissionId}`,
|
||||
},
|
||||
(payload) => {
|
||||
console.log('Submission item updated:', payload);
|
||||
onUpdateRef.current?.(payload);
|
||||
}
|
||||
)
|
||||
.subscribe((status) => {
|
||||
console.log('Submission items realtime status:', status);
|
||||
});
|
||||
channel.on(
|
||||
'postgres_changes',
|
||||
{
|
||||
event: 'UPDATE',
|
||||
schema: 'public',
|
||||
table: 'submission_items',
|
||||
filter: `submission_id=eq.${submissionId}`,
|
||||
},
|
||||
(payload) => {
|
||||
console.log('Submission item updated:', payload);
|
||||
onUpdateRef.current?.(payload);
|
||||
}
|
||||
);
|
||||
}, [channel, submissionId]);
|
||||
|
||||
setChannel(realtimeChannel);
|
||||
|
||||
return () => {
|
||||
console.log('Cleaning up submission items realtime subscription');
|
||||
supabase.removeChannel(realtimeChannel);
|
||||
};
|
||||
}, [submissionId, enabled]);
|
||||
|
||||
return { channel };
|
||||
return { channel, connectionState, reconnect };
|
||||
};
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { useEffect, useState, useRef } from 'react';
|
||||
import { supabase } from '@/integrations/supabase/client';
|
||||
import { RealtimeChannel } from '@supabase/supabase-js';
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { useEnhancedRealtime, ConnectionState } from './useEnhancedRealtime';
|
||||
|
||||
interface UseRealtimeSubmissionsOptions {
|
||||
onInsert?: (payload: any) => void;
|
||||
@@ -11,7 +10,6 @@ interface UseRealtimeSubmissionsOptions {
|
||||
|
||||
export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions = {}) => {
|
||||
const { onInsert, onUpdate, onDelete, enabled = true } = options;
|
||||
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
|
||||
|
||||
// Use refs to store latest callbacks without triggering re-subscriptions
|
||||
const onInsertRef = useRef(onInsert);
|
||||
@@ -25,11 +23,16 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions =
|
||||
onDeleteRef.current = onDelete;
|
||||
}, [onInsert, onUpdate, onDelete]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
const { channel, connectionState, reconnect } = useEnhancedRealtime({
|
||||
enabled,
|
||||
channelName: 'content-submissions-changes',
|
||||
debug: true,
|
||||
});
|
||||
|
||||
const realtimeChannel = supabase
|
||||
.channel('content-submissions-changes')
|
||||
useEffect(() => {
|
||||
if (!channel) return;
|
||||
|
||||
channel
|
||||
.on(
|
||||
'postgres_changes',
|
||||
{
|
||||
@@ -65,18 +68,8 @@ export const useRealtimeSubmissions = (options: UseRealtimeSubmissionsOptions =
|
||||
console.log('Submission deleted:', payload);
|
||||
onDeleteRef.current?.(payload);
|
||||
}
|
||||
)
|
||||
.subscribe((status) => {
|
||||
console.log('Submissions realtime status:', status);
|
||||
});
|
||||
);
|
||||
}, [channel]);
|
||||
|
||||
setChannel(realtimeChannel);
|
||||
|
||||
return () => {
|
||||
console.log('Cleaning up submissions realtime subscription');
|
||||
supabase.removeChannel(realtimeChannel);
|
||||
};
|
||||
}, [enabled]);
|
||||
|
||||
return { channel };
|
||||
return { channel, connectionState, reconnect };
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user