/**
 * Realtime Subscriptions Hook for Moderation Queue
 *
 * Manages all Supabase realtime subscriptions for the moderation queue system.
 * Handles INSERT and UPDATE events with debouncing, filtering, and optimistic update protection.
 */

import { useEffect, useRef, useState, useCallback } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import { supabase } from '@/lib/supabaseClient';
import { logger } from '@/lib/logger';
import { getErrorMessage } from '@/lib/errorHandler';
import { MODERATION_CONSTANTS } from '@/lib/moderation/constants';
import type { RealtimeChannel, RealtimePostgresChangesPayload } from '@supabase/supabase-js';
import type { Json } from '@/integrations/supabase/types';
import type { ModerationItem, EntityFilter, StatusFilter } from '@/types/moderation';
import type { useEntityCache } from './useEntityCache';
import type { useProfileCache } from './useProfileCache';
import {
  matchesEntityFilter,
  matchesStatusFilter,
  hasItemChanged,
  buildModerationItem,
} from '@/lib/moderation/realtime';

/**
 * Type-safe interface for submission content from realtime events
 */
interface SubmissionContent {
  action?: string;
  name?: string;
  entity_slug?: string;
  entity_name?: string;
  entity_id?: string;
  park_id?: string;
}

type EntityCacheReturn = ReturnType<typeof useEntityCache>;
type ProfileCacheReturn = ReturnType<typeof useProfileCache>;

/**
 * Payload delivered for `content_submissions` row changes.
 *
 * The row is left untyped because the handlers read `id` / `status` directly
 * from `payload.new` and forward the row to the filter helpers.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubmissionChangePayload = RealtimePostgresChangesPayload<any>;

/** Submission types whose entity name is looked up in the `companies` table. */
const COMPANY_SUBMISSION_TYPES: readonly string[] = [
  'manufacturer',
  'operator',
  'designer',
  'property_owner',
];

/**
 * Translate a channel subscribe status into the hook's connection status.
 *
 * @returns the new status, or `null` when the reported status should leave the
 *          current connection status untouched.
 */
function mapSubscribeStatus(status: string): 'connected' | 'error' | null {
  if (status === 'SUBSCRIBED') return 'connected';
  // TIMED_OUT means the join never completed, so surface it like CHANNEL_ERROR
  // instead of leaving a stale status behind.
  if (status === 'CHANNEL_ERROR' || status === 'TIMED_OUT') return 'error';
  return null;
}

/**
 * Configuration for realtime subscriptions
 */
export interface RealtimeSubscriptionConfig {
  /** Whether realtime subscriptions are enabled */
  enabled: boolean;

  /** Current filter configuration */
  filters: {
    entityFilter: EntityFilter;
    statusFilter: StatusFilter;
  };

  /** Callback when a new item is detected */
  onNewItem: (item: ModerationItem) => void;

  /** Callback when an item is updated */
  onUpdateItem: (item: ModerationItem, shouldRemove: boolean) => void;

  /** Callback when an item is removed from queue */
  onItemRemoved: (itemId: string) => void;

  /** Pause subscriptions when tab is hidden (default: true) */
  pauseWhenHidden?: boolean;

  /** Debounce delay for UPDATE events in milliseconds */
  debounceMs?: number;

  /** Entity cache for resolving entity names */
  entityCache: EntityCacheReturn;

  /** Profile cache for resolving user information */
  profileCache: ProfileCacheReturn;

  /** Set of recently removed IDs (for optimistic updates) */
  recentlyRemovedIds: Set<string>;

  /** Set of IDs currently being interacted with */
  interactingWithIds: Set<string>;

  /** Current items in queue (for comparison) - using ref to avoid reconnections (optional) */
  currentItemsRef?: React.MutableRefObject<ModerationItem[]>;
}

/**
 * Return type for useRealtimeSubscriptions hook
 */
export interface UseRealtimeSubscriptionsReturn {
  /** Whether subscriptions are currently connected */
  isConnected: boolean;

  /** Current connection status */
  channelStatus: 'connected' | 'disconnected' | 'error';

  /** Manually reconnect subscriptions */
  reconnect: () => void;
}

/**
 * Hook to manage realtime subscriptions for the moderation queue.
 *
 * Opens one channel for INSERT events and one for UPDATE events on
 * `content_submissions`. Matching events invalidate the `['moderation-queue']`
 * query; INSERTs additionally build a full item for `onNewItem`, and UPDATEs
 * that no longer match the active filters call `onItemRemoved`.
 *
 * @param config - subscription settings, caches and callbacks
 * @returns the shared connection status plus a manual `reconnect` trigger
 */
export function useRealtimeSubscriptions(
  config: RealtimeSubscriptionConfig
): UseRealtimeSubscriptionsReturn {
  const queryClient = useQueryClient();

  // `onUpdateItem` and `currentItemsRef` stay part of the config contract but
  // are not read by this hook, so they are not destructured here.
  const {
    enabled,
    filters,
    onNewItem,
    onItemRemoved,
    pauseWhenHidden = true,
    debounceMs = MODERATION_CONSTANTS.REALTIME_DEBOUNCE_MS,
    entityCache,
    profileCache,
    recentlyRemovedIds,
    interactingWithIds,
  } = config;

  // Pending debounce timers for UPDATE events, keyed by submission id
  const updateDebounceMap = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());

  // Channel references
  const insertChannelRef = useRef<RealtimeChannel | null>(null);
  const updateChannelRef = useRef<RealtimeChannel | null>(null);

  // Status tracking (both channels write to the same status value)
  const [channelStatus, setChannelStatus] = useState<'connected' | 'disconnected' | 'error'>('disconnected');
  // Bumped by `reconnect` to force both subscription effects to re-run
  const [reconnectTrigger, setReconnectTrigger] = useState(0);

  /**
   * Debounced update handler - waits for rapid changes to settle.
   * A new call for the same submission id cancels the pending one.
   */
  const debouncedUpdate = useCallback(
    (submissionId: string, updateFn: () => void | Promise<void>) => {
      const pendingTimer = updateDebounceMap.current.get(submissionId);
      if (pendingTimer) {
        clearTimeout(pendingTimer);
      }

      const timer = setTimeout(() => {
        // The callback may be async; catch a rejection so it cannot surface as
        // an unhandled promise rejection from inside the timer.
        void Promise.resolve(updateFn()).catch((error: unknown) => {
          logger.log('Debounced realtime update failed:', error);
        });
        updateDebounceMap.current.delete(submissionId);
      }, debounceMs);

      updateDebounceMap.current.set(submissionId, timer);
    },
    [debounceMs]
  );

  /**
   * Fetch full submission details with related data.
   * Resolves to `null` when the query errors or returns no row.
   */
  const fetchSubmissionDetails = useCallback(async (submissionId: string) => {
    const { data: submission, error } = await supabase
      .from('content_submissions')
      .select(`
        id, submission_type, status, created_at, user_id,
        reviewed_at, reviewer_id, reviewer_notes, escalated, assigned_to, locked_until,
        submission_items (
          id,
          item_type,
          item_data,
          status
        ),
        submission_metadata (
          entity_id,
          park_id,
          ride_id
        )
      `)
      .eq('id', submissionId)
      .single();

    if (error || !submission) {
      // Silent - will retry on next attempt
      return null;
    }

    return submission;
  }, []);

  /**
   * Resolve entity names for a submission.
   *
   * Uses the first `submission_metadata` entry. Each lookup tries the entity
   * cache first and falls back to a database query, caching whatever it finds.
   * `entityName` stays `'Unknown'` when nothing can be resolved; `parkName` is
   * only filled in for ride submissions whose ride has a `park_id`.
   */
  const resolveEntityNames = useCallback(
    async (submission: { submission_type: string; submission_metadata?: any[] }) => {
      const metadata =
        Array.isArray(submission.submission_metadata) && submission.submission_metadata.length > 0
          ? submission.submission_metadata[0]
          : undefined;

      let entityName = 'Unknown';
      let parkName: string | undefined;

      if (submission.submission_type === 'ride' && metadata?.entity_id) {
        // Try cache first
        const cachedRide = entityCache.getCached('rides', metadata.entity_id);
        if (cachedRide) {
          entityName = cachedRide.name;
          if (cachedRide.park_id) {
            const cachedPark = entityCache.getCached('parks', cachedRide.park_id);
            if (cachedPark) parkName = cachedPark.name;
          }
        } else {
          const { data: ride } = await supabase
            .from('rides')
            .select('id, name, park_id')
            .eq('id', metadata.entity_id)
            .maybeSingle();

          if (ride) {
            entityName = ride.name;
            entityCache.setCached('rides', metadata.entity_id, ride);

            if (ride.park_id) {
              const { data: park } = await supabase
                .from('parks')
                .select('id, name')
                .eq('id', ride.park_id)
                .maybeSingle();

              if (park) {
                parkName = park.name;
                entityCache.setCached('parks', ride.park_id, park);
              }
            }
          }
        }
      } else if (submission.submission_type === 'park' && metadata?.entity_id) {
        const cachedPark = entityCache.getCached('parks', metadata.entity_id);
        if (cachedPark) {
          entityName = cachedPark.name;
        } else {
          const { data: park } = await supabase
            .from('parks')
            .select('id, name')
            .eq('id', metadata.entity_id)
            .maybeSingle();

          if (park) {
            entityName = park.name;
            entityCache.setCached('parks', metadata.entity_id, park);
          }
        }
      } else if (
        COMPANY_SUBMISSION_TYPES.includes(submission.submission_type) &&
        metadata?.entity_id
      ) {
        const cachedCompany = entityCache.getCached('companies', metadata.entity_id);
        if (cachedCompany) {
          entityName = cachedCompany.name;
        } else {
          const { data: company } = await supabase
            .from('companies')
            .select('id, name')
            .eq('id', metadata.entity_id)
            .maybeSingle();

          if (company) {
            entityName = company.name;
            entityCache.setCached('companies', metadata.entity_id, company);
          }
        }
      }

      return { entityName, parkName };
    },
    [entityCache]
  );

  /**
   * Handle new submission INSERT event
   */
  const handleInsert = useCallback(
    async (payload: SubmissionChangePayload) => {
      const newSubmission = payload.new;

      logger.log('🆕 Realtime INSERT:', newSubmission.id);

      // Skip the event while the tab is hidden. Nothing is stored here for
      // later replay, despite the wording of the log message below.
      if (pauseWhenHidden && document.hidden) {
        logger.log('📴 Realtime event received while hidden - queuing for later');
        return;
      }

      // Ignore if recently removed (optimistic update)
      if (recentlyRemovedIds.has(newSubmission.id)) {
        logger.log('⏭️ Ignoring INSERT for recently removed submission:', newSubmission.id);
        return;
      }

      // Only process pending/partially_approved submissions
      if (!['pending', 'partially_approved'].includes(newSubmission.status)) {
        return;
      }

      // Apply filters
      if (!matchesEntityFilter(newSubmission, filters.entityFilter)) {
        return;
      }

      if (!matchesStatusFilter(newSubmission, filters.statusFilter)) {
        return;
      }

      logger.log('✅ NEW submission matches filters, invalidating query:', newSubmission.id);

      // Invalidate the query to trigger background refetch
      await queryClient.invalidateQueries({ queryKey: ['moderation-queue'] });

      // Call legacy callback for new item notification
      // (This maintains compatibility with NewItemsAlert component)
      try {
        const submission = await fetchSubmissionDetails(newSubmission.id);
        if (!submission) return;

        const profiles = await profileCache.bulkFetch([submission.user_id]);
        const userProfile = profiles[0];

        const { entityName, parkName } = await resolveEntityNames(submission);

        const fullItem = buildModerationItem(submission, userProfile, entityName, parkName);

        onNewItem(fullItem);
      } catch (error: unknown) {
        // Notifications are non-critical: record the failure, never rethrow.
        logger.log('New item notification failed:', error);
      }
    },
    [
      filters,
      pauseWhenHidden,
      recentlyRemovedIds,
      queryClient,
      fetchSubmissionDetails,
      profileCache,
      resolveEntityNames,
      onNewItem,
    ]
  );

  /**
   * Handle submission UPDATE event
   */
  const handleUpdate = useCallback(
    async (payload: SubmissionChangePayload) => {
      const updatedSubmission = payload.new;
      const oldSubmission = payload.old;

      logger.log('🔄 Realtime UPDATE:', updatedSubmission.id);

      // Skip the event while the tab is hidden. Nothing is stored here for
      // later replay, despite the wording of the log message below.
      if (pauseWhenHidden && document.hidden) {
        logger.log('📴 Realtime UPDATE received while hidden - queuing for later');
        return;
      }

      // Ignore if recently removed (optimistic update in progress)
      if (recentlyRemovedIds.has(updatedSubmission.id)) {
        logger.log('⏭️ Ignoring UPDATE for recently removed submission:', updatedSubmission.id);
        return;
      }

      // Ignore if currently being interacted with
      if (interactingWithIds.has(updatedSubmission.id)) {
        logger.log('⏭️ Ignoring UPDATE for interacting submission:', updatedSubmission.id);
        return;
      }

      // Report the item as removed when it no longer matches the active filters.
      const removeIfFilteredOut = (): void => {
        const matchesEntity = matchesEntityFilter(updatedSubmission, filters.entityFilter);
        const matchesStatus = matchesStatusFilter(updatedSubmission, filters.statusFilter);
        const shouldBeInQueue = matchesEntity && matchesStatus;

        if (!shouldBeInQueue) {
          onItemRemoved(updatedSubmission.id);
        }
      };

      // Skip debounce for status changes (critical updates).
      // Only detectable when the old row actually carries a `status` field.
      const isStatusChange =
        oldSubmission &&
        'status' in oldSubmission &&
        oldSubmission.status !== updatedSubmission?.status;

      if (isStatusChange) {
        logger.log('⚡ Status change detected, invalidating immediately');
        await queryClient.invalidateQueries({ queryKey: ['moderation-queue'] });
        removeIfFilteredOut();
        return; // Skip debounced update
      }

      // Use debounce for non-critical updates
      debouncedUpdate(updatedSubmission.id, async () => {
        logger.log('🔄 Invalidating query due to UPDATE:', updatedSubmission.id);

        // Simply invalidate the query - TanStack Query handles the rest
        await queryClient.invalidateQueries({ queryKey: ['moderation-queue'] });

        // Legacy callback for compatibility
        removeIfFilteredOut();
      });
    },
    [
      filters,
      pauseWhenHidden,
      recentlyRemovedIds,
      interactingWithIds,
      debouncedUpdate,
      queryClient,
      onItemRemoved,
    ]
  );

  /**
   * Setup INSERT subscription
   */
  useEffect(() => {
    if (!enabled) {
      setChannelStatus('disconnected');
      return;
    }

    logger.log('📡 Setting up INSERT subscription');

    const channel = supabase
      .channel('moderation-new-submissions')
      .on(
        'postgres_changes',
        {
          event: 'INSERT',
          schema: 'public',
          table: 'content_submissions',
        },
        handleInsert
      )
      .subscribe((status) => {
        logger.log('INSERT subscription status:', status);

        const nextStatus = mapSubscribeStatus(status);
        if (nextStatus) {
          setChannelStatus(nextStatus);
        }
      });

    insertChannelRef.current = channel;

    return () => {
      logger.log('🛑 Cleaning up INSERT subscription');
      supabase.removeChannel(channel);
      insertChannelRef.current = null;
    };
  }, [enabled, handleInsert, reconnectTrigger]);

  /**
   * Setup UPDATE subscription
   */
  useEffect(() => {
    if (!enabled) return;

    logger.log('📡 Setting up UPDATE subscription');

    const channel = supabase
      .channel('moderation-updated-submissions')
      .on(
        'postgres_changes',
        {
          event: 'UPDATE',
          schema: 'public',
          table: 'content_submissions',
        },
        handleUpdate
      )
      .subscribe((status) => {
        logger.log('UPDATE subscription status:', status);

        const nextStatus = mapSubscribeStatus(status);
        if (nextStatus) {
          setChannelStatus(nextStatus);
        }
      });

    updateChannelRef.current = channel;

    return () => {
      logger.log('🛑 Cleaning up UPDATE subscription');
      supabase.removeChannel(channel);
      updateChannelRef.current = null;
    };
  }, [enabled, handleUpdate, reconnectTrigger]);

  /**
   * Cleanup debounce timers on unmount
   */
  useEffect(() => {
    // The ref always holds the same Map instance, so capturing it here is
    // equivalent to reading `.current` inside the cleanup.
    const timers = updateDebounceMap.current;

    return () => {
      timers.forEach((timeout) => clearTimeout(timeout));
      timers.clear();
    };
  }, []);

  /**
   * Manual reconnect function - tears down and re-creates both channels by
   * bumping the trigger both subscription effects depend on.
   */
  const reconnect = useCallback(() => {
    logger.log('🔄 Manually reconnecting subscriptions...');
    setReconnectTrigger((prev) => prev + 1);
  }, []);

  return {
    isConnected: channelStatus === 'connected',
    channelStatus,
    reconnect,
  };
}