/**
 * Realtime Subscriptions Hook for Moderation Queue
 *
 * Manages all Supabase realtime subscriptions for the moderation queue system.
 * Handles INSERT and UPDATE events with debouncing, filtering, and optimistic update protection.
 */

import { useEffect, useRef, useState, useCallback } from 'react';
import { supabase } from '@/integrations/supabase/client';
import type { RealtimeChannel } from '@supabase/supabase-js';
import type { ModerationItem, EntityFilter, StatusFilter } from '@/types/moderation';
import type { useEntityCache } from './useEntityCache';
import type { useProfileCache } from './useProfileCache';
import {
  matchesEntityFilter,
  matchesStatusFilter,
  hasItemChanged,
  buildModerationItem,
} from '@/lib/moderation/realtime';

// Derived from the hooks themselves so the config always tracks their real return shapes.
type EntityCacheReturn = ReturnType<typeof useEntityCache>;
type ProfileCacheReturn = ReturnType<typeof useProfileCache>;

/**
 * Configuration for realtime subscriptions
 */
export interface RealtimeSubscriptionConfig {
  /** Whether realtime subscriptions are enabled */
  enabled: boolean;

  /** Current filter configuration */
  filters: {
    entityFilter: EntityFilter;
    statusFilter: StatusFilter;
  };

  /** Callback when a new item is detected */
  onNewItem: (item: ModerationItem) => void;

  /** Callback when an item is updated */
  onUpdateItem: (item: ModerationItem, shouldRemove: boolean) => void;

  /** Callback when an item is removed from queue */
  onItemRemoved: (itemId: string) => void;

  /** Pause subscriptions when tab is hidden (default: true) */
  pauseWhenHidden?: boolean;

  /** Debounce delay for UPDATE events in milliseconds (default: 1000) */
  debounceMs?: number;

  /** Entity cache for resolving entity names */
  entityCache: EntityCacheReturn;

  /** Profile cache for resolving user information */
  profileCache: ProfileCacheReturn;

  /** Set of recently removed IDs (for optimistic updates) */
  recentlyRemovedIds: Set<string>;

  /** Set of IDs currently being interacted with */
  interactingWithIds: Set<string>;

  /**
   * Current items in queue (for comparison) - using ref to avoid reconnections (optional).
   * When omitted, the hook behaves as if the queue were empty: UPDATE events never
   * trigger `onItemRemoved`, and every matching update is forwarded to `onUpdateItem`.
   */
  currentItemsRef?: React.MutableRefObject<ModerationItem[]>;
}

/**
 * Return type for useRealtimeSubscriptions hook
 */
export interface UseRealtimeSubscriptionsReturn {
  /** Whether subscriptions are currently connected */
  isConnected: boolean;

  /** Current connection status */
  channelStatus: 'connected' | 'disconnected' | 'error';

  /** Manually reconnect subscriptions */
  reconnect: () => void;
}

/**
 * Hook to manage realtime subscriptions for the moderation queue
 *
 * Opens one channel for INSERT events and one for UPDATE events on
 * `content_submissions`. Both channels are torn down and re-created whenever
 * `enabled`, the corresponding handler, or the manual reconnect trigger changes.
 *
 * @param config - Filters, callbacks, caches and optimistic-update bookkeeping.
 * @returns Connection status plus a `reconnect` function that forces re-subscription.
 */
export function useRealtimeSubscriptions(
  config: RealtimeSubscriptionConfig
): UseRealtimeSubscriptionsReturn {
  const {
    enabled,
    filters,
    onNewItem,
    onUpdateItem,
    onItemRemoved,
    pauseWhenHidden = true,
    debounceMs = 1000,
    entityCache,
    profileCache,
    recentlyRemovedIds,
    interactingWithIds,
    currentItemsRef,
  } = config;

  // Debounce management for UPDATE events: submission id -> pending timer handle.
  // The explicit type argument matters: without it `useRef>(new Map())` parses as a
  // comparison and the ref is never created.
  const updateDebounceMap = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());

  // Channel references
  const insertChannelRef = useRef<RealtimeChannel | null>(null);
  const updateChannelRef = useRef<RealtimeChannel | null>(null);

  // Status tracking
  const [channelStatus, setChannelStatus] = useState<'connected' | 'disconnected' | 'error'>('disconnected');
  // Incremented by `reconnect` solely to re-run the subscription effects.
  const [reconnectTrigger, setReconnectTrigger] = useState(0);

  /**
   * Debounced update handler - waits for rapid changes to settle.
   * A new call for the same submission id cancels the previously scheduled one.
   */
  const debouncedUpdate = useCallback((submissionId: string, updateFn: () => void) => {
    const existingTimeout = updateDebounceMap.current.get(submissionId);
    if (existingTimeout) {
      clearTimeout(existingTimeout);
    }

    const newTimeout = setTimeout(() => {
      updateFn();
      updateDebounceMap.current.delete(submissionId);
    }, debounceMs);

    updateDebounceMap.current.set(submissionId, newTimeout);
  }, [debounceMs]);

  /**
   * Fetch full submission details with related data.
   * Returns `null` (after logging) when the query errors or yields no row.
   */
  const fetchSubmissionDetails = useCallback(async (submissionId: string) => {
    const { data: submission, error } = await supabase
      .from('content_submissions')
      .select(`
        id, submission_type, status, content, created_at, user_id,
        reviewed_at, reviewer_id, reviewer_notes, escalated, assigned_to, locked_until,
        submission_items (
          id,
          item_type,
          item_data,
          status
        )
      `)
      .eq('id', submissionId)
      .single();

    if (error || !submission) {
      console.error('Error fetching submission details:', error);
      return null;
    }

    return submission;
  }, []);

  /**
   * Resolve entity names for a submission.
   * Consults the entity cache first and falls back to a database lookup,
   * storing any fetched row back into the cache. Defaults to the name embedded
   * in the submission content, or 'Unknown'.
   */
  const resolveEntityNames = useCallback(async (submission: any) => {
    const content = submission.content as any;
    let entityName = content?.name || 'Unknown';
    let parkName: string | undefined;

    if (submission.submission_type === 'ride' && content?.entity_id) {
      // Try cache first
      const cachedRide = entityCache.getCached('rides', content.entity_id);
      if (cachedRide) {
        entityName = cachedRide.name;
        if (cachedRide.park_id) {
          const cachedPark = entityCache.getCached('parks', cachedRide.park_id);
          if (cachedPark) parkName = cachedPark.name;
        }
      } else {
        const { data: ride } = await supabase
          .from('rides')
          .select('name, park_id')
          .eq('id', content.entity_id)
          .maybeSingle();

        if (ride) {
          entityName = ride.name;
          entityCache.setCached('rides', content.entity_id, ride);

          if (ride.park_id) {
            const { data: park } = await supabase
              .from('parks')
              .select('name')
              .eq('id', ride.park_id)
              .maybeSingle();

            if (park) {
              parkName = park.name;
              entityCache.setCached('parks', ride.park_id, park);
            }
          }
        }
      }
    } else if (submission.submission_type === 'park' && content?.entity_id) {
      const cachedPark = entityCache.getCached('parks', content.entity_id);
      if (cachedPark) {
        entityName = cachedPark.name;
      } else {
        const { data: park } = await supabase
          .from('parks')
          .select('name')
          .eq('id', content.entity_id)
          .maybeSingle();

        if (park) {
          entityName = park.name;
          entityCache.setCached('parks', content.entity_id, park);
        }
      }
    } else if (
      ['manufacturer', 'operator', 'designer', 'property_owner'].includes(submission.submission_type) &&
      content?.entity_id
    ) {
      const cachedCompany = entityCache.getCached('companies', content.entity_id);
      if (cachedCompany) {
        entityName = cachedCompany.name;
      } else {
        const { data: company } = await supabase
          .from('companies')
          .select('name')
          .eq('id', content.entity_id)
          .maybeSingle();

        if (company) {
          entityName = company.name;
          entityCache.setCached('companies', content.entity_id, company);
        }
      }
    }

    return { entityName, parkName };
  }, [entityCache]);

  /**
   * Handle new submission INSERT event
   */
  const handleInsert = useCallback(async (payload: any) => {
    const newSubmission = payload.new as any;

    console.log('🆕 Realtime INSERT:', newSubmission.id);

    // Skip events that arrive while the tab is hidden. NOTE: despite the log wording,
    // nothing is queued here - the event is dropped.
    if (pauseWhenHidden && document.hidden) {
      console.log('📴 Realtime event received while hidden - queuing for later');
      return;
    }

    // Ignore if recently removed (optimistic update)
    if (recentlyRemovedIds.has(newSubmission.id)) {
      console.log('⏭️ Ignoring INSERT for recently removed submission:', newSubmission.id);
      return;
    }

    // Only process pending/partially_approved submissions
    if (!['pending', 'partially_approved'].includes(newSubmission.status)) {
      return;
    }

    // Apply filters
    if (!matchesEntityFilter(newSubmission, filters.entityFilter)) {
      return;
    }

    if (!matchesStatusFilter(newSubmission, filters.statusFilter)) {
      return;
    }

    console.log('✅ NEW submission matches filters:', newSubmission.id);

    // Fetch full submission details
    try {
      const submission = await fetchSubmissionDetails(newSubmission.id);
      if (!submission) return;

      // Fetch user profile
      const profile = await profileCache.bulkFetch([submission.user_id]);
      const userProfile = profile[0];

      // Resolve entity names
      const { entityName, parkName } = await resolveEntityNames(submission);

      // Build full ModerationItem
      const fullItem = buildModerationItem(
        submission,
        userProfile,
        entityName,
        parkName
      );

      // Trigger callback
      onNewItem(fullItem);

      console.log('🎉 New submission added to queue:', fullItem.id);
    } catch (error) {
      console.error('Error processing new submission:', error);
    }
  }, [
    filters,
    pauseWhenHidden,
    recentlyRemovedIds,
    fetchSubmissionDetails,
    profileCache,
    resolveEntityNames,
    onNewItem,
  ]);

  /**
   * Handle submission UPDATE event
   */
  const handleUpdate = useCallback(async (payload: any) => {
    const updatedSubmission = payload.new as any;

    console.log('🔄 Realtime UPDATE:', updatedSubmission.id);

    // Skip events that arrive while the tab is hidden. NOTE: despite the log wording,
    // nothing is queued here - the event is dropped.
    if (pauseWhenHidden && document.hidden) {
      console.log('📴 Realtime UPDATE received while hidden - queuing for later');
      return;
    }

    // Ignore if recently removed (optimistic update in progress)
    if (recentlyRemovedIds.has(updatedSubmission.id)) {
      console.log('⏭️ Ignoring UPDATE for recently removed submission:', updatedSubmission.id);
      return;
    }

    // Ignore if currently being interacted with
    if (interactingWithIds.has(updatedSubmission.id)) {
      console.log('⏭️ Ignoring UPDATE for interacting submission:', updatedSubmission.id);
      return;
    }

    // Debounce the update
    debouncedUpdate(updatedSubmission.id, async () => {
      // Check if submission matches current filters
      const matchesEntity = matchesEntityFilter(updatedSubmission, filters.entityFilter);
      const matchesStatus = matchesStatusFilter(updatedSubmission, filters.statusFilter);

      // `currentItemsRef` is optional; treat a missing ref as an empty queue
      // instead of throwing on `.current`.
      const itemsBeforeFetch = currentItemsRef?.current ?? [];
      const wasInQueue = itemsBeforeFetch.some(i => i.id === updatedSubmission.id);
      const shouldBeInQueue = matchesEntity && matchesStatus;

      if (wasInQueue && !shouldBeInQueue) {
        // Submission moved out of current filter (e.g., pending → approved)
        console.log('❌ Submission moved out of queue:', updatedSubmission.id);
        onItemRemoved(updatedSubmission.id);
        return;
      }

      if (!shouldBeInQueue) {
        // Item doesn't belong in queue at all
        return;
      }

      // Fetch full details
      try {
        const submission = await fetchSubmissionDetails(updatedSubmission.id);
        if (!submission) return;

        // Get user profile
        const profiles = await profileCache.bulkFetch([submission.user_id]);
        const profile = profiles[0];

        // Resolve entity name (simplified for updates)
        const content = submission.content as any;
        const entityName = content?.name || 'Unknown';

        const fullItem = buildModerationItem(
          submission,
          profile,
          entityName,
          undefined
        );

        // Check if item actually changed. The ref is re-read here on purpose:
        // the queue may have changed while the fetches above were in flight.
        const itemsAfterFetch = currentItemsRef?.current ?? [];
        const currentItem = itemsAfterFetch.find(i => i.id === fullItem.id);
        if (currentItem && !hasItemChanged(currentItem, fullItem)) {
          console.log('✅ Realtime UPDATE: No changes detected for', fullItem.id);
          return;
        }

        console.log('🔄 Realtime UPDATE: Changes detected for', fullItem.id);
        onUpdateItem(fullItem, false);
      } catch (error) {
        console.error('Error processing updated submission:', error);
      }
    });
  }, [
    filters,
    pauseWhenHidden,
    recentlyRemovedIds,
    interactingWithIds,
    debouncedUpdate,
    fetchSubmissionDetails,
    profileCache,
    onUpdateItem,
    onItemRemoved,
    // Listed so the handler never closes over a stale ref object if the caller swaps it.
    currentItemsRef,
  ]);

  /**
   * Setup INSERT subscription
   */
  useEffect(() => {
    if (!enabled) {
      setChannelStatus('disconnected');
      return;
    }

    console.log('📡 Setting up INSERT subscription');

    const channel = supabase
      .channel('moderation-new-submissions')
      .on(
        'postgres_changes',
        {
          event: 'INSERT',
          schema: 'public',
          table: 'content_submissions',
        },
        handleInsert
      )
      .subscribe((status) => {
        console.log('INSERT subscription status:', status);
        if (status === 'SUBSCRIBED') {
          setChannelStatus('connected');
        } else if (status === 'CHANNEL_ERROR') {
          setChannelStatus('error');
        }
      });

    insertChannelRef.current = channel;

    return () => {
      console.log('🛑 Cleaning up INSERT subscription');
      supabase.removeChannel(channel);
      insertChannelRef.current = null;
    };
  }, [enabled, handleInsert, reconnectTrigger]);

  /**
   * Setup UPDATE subscription
   */
  useEffect(() => {
    if (!enabled) return;

    console.log('📡 Setting up UPDATE subscription');

    const channel = supabase
      .channel('moderation-updated-submissions')
      .on(
        'postgres_changes',
        {
          event: 'UPDATE',
          schema: 'public',
          table: 'content_submissions',
        },
        handleUpdate
      )
      .subscribe((status) => {
        console.log('UPDATE subscription status:', status);
        if (status === 'SUBSCRIBED') {
          setChannelStatus('connected');
        } else if (status === 'CHANNEL_ERROR') {
          setChannelStatus('error');
        }
      });

    updateChannelRef.current = channel;

    return () => {
      console.log('🛑 Cleaning up UPDATE subscription');
      supabase.removeChannel(channel);
      updateChannelRef.current = null;
    };
  }, [enabled, handleUpdate, reconnectTrigger]);

  /**
   * Cleanup debounce timers on unmount
   */
  useEffect(() => {
    return () => {
      updateDebounceMap.current.forEach(timeout => clearTimeout(timeout));
      updateDebounceMap.current.clear();
    };
  }, []);

  /**
   * Manual reconnect function
   */
  const reconnect = useCallback(() => {
    console.log('🔄 Manually reconnecting subscriptions...');
    setReconnectTrigger(prev => prev + 1);
  }, []);

  return {
    isConnected: channelStatus === 'connected',
    channelStatus,
    reconnect,
  };
}