diff --git a/src/hooks/moderation/index.ts b/src/hooks/moderation/index.ts index 48f2e2ea..a2e86c61 100644 --- a/src/hooks/moderation/index.ts +++ b/src/hooks/moderation/index.ts @@ -17,3 +17,9 @@ export type { ModerationSort, ModerationSortConfig } from './useModerationSort'; export { usePagination } from './usePagination'; export type { PaginationState, PaginationConfig } from './usePagination'; + +export { useRealtimeSubscriptions } from './useRealtimeSubscriptions'; +export type { + RealtimeSubscriptionConfig, + UseRealtimeSubscriptionsReturn +} from './useRealtimeSubscriptions'; diff --git a/src/hooks/moderation/useRealtimeSubscriptions.ts b/src/hooks/moderation/useRealtimeSubscriptions.ts new file mode 100644 index 00000000..619d1b3e --- /dev/null +++ b/src/hooks/moderation/useRealtimeSubscriptions.ts @@ -0,0 +1,503 @@ +/** + * Realtime Subscriptions Hook for Moderation Queue + * + * Manages all Supabase realtime subscriptions for the moderation queue system. + * Handles INSERT and UPDATE events with debouncing, filtering, and optimistic update protection. + */ + +import { useEffect, useRef, useState, useCallback } from 'react'; +import { supabase } from '@/integrations/supabase/client'; +import type { RealtimeChannel } from '@supabase/supabase-js'; +import type { ModerationItem, EntityFilter, StatusFilter } from '@/types/moderation'; +import type { useEntityCache } from './useEntityCache'; +import type { useProfileCache } from './useProfileCache'; +import { + matchesEntityFilter, + matchesStatusFilter, + hasItemChanged, + buildModerationItem, +} from '@/lib/moderation/realtime'; + +type EntityCacheReturn = ReturnType; +type ProfileCacheReturn = ReturnType; + +/** + * Configuration for realtime subscriptions + */ +export interface RealtimeSubscriptionConfig { + /** Whether realtime subscriptions are enabled */ + enabled: boolean; + + /** Current filter configuration */ + filters: { + entityFilter: EntityFilter; + statusFilter: StatusFilter; + }; + + /** Callback when a new item is detected */ + onNewItem: (item: ModerationItem) => void; + + /** Callback when an item is updated */ + onUpdateItem: (item: ModerationItem, shouldRemove: boolean) => void; + + /** Callback when an item is removed from queue */ + onItemRemoved: (itemId: string) => void; + + /** Pause subscriptions when tab is hidden (default: true) */ + pauseWhenHidden?: boolean; + + /** Debounce delay for UPDATE events in milliseconds (default: 1000) */ + debounceMs?: number; + + /** Entity cache for resolving entity names */ + entityCache: EntityCacheReturn; + + /** Profile cache for resolving user information */ + profileCache: ProfileCacheReturn; + + /** Set of recently removed IDs (for optimistic updates) */ + recentlyRemovedIds: Set; + + /** Set of IDs currently being interacted with */ + interactingWithIds: Set; + + /** Current items in queue (for comparison) */ + currentItems: ModerationItem[]; +} + +/** + * Return type for useRealtimeSubscriptions hook + */ +export interface UseRealtimeSubscriptionsReturn { + /** Whether subscriptions are currently connected */ + isConnected: boolean; + + /** Current connection status */ + channelStatus: 'connected' | 'disconnected' | 'error'; + + /** Manually reconnect subscriptions */ + reconnect: () => void; +} + +/** + * Hook to manage realtime subscriptions for the moderation queue + */ +export function useRealtimeSubscriptions( + config: RealtimeSubscriptionConfig +): UseRealtimeSubscriptionsReturn { + const { + enabled, + filters, + onNewItem, + onUpdateItem, + onItemRemoved, + pauseWhenHidden = true, + debounceMs = 1000, + entityCache, + profileCache, + recentlyRemovedIds, + interactingWithIds, + currentItems, + } = config; + + // Debounce management for UPDATE events + const updateDebounceMap = useRef>(new Map()); + + // Channel references + const insertChannelRef = useRef(null); + const updateChannelRef = useRef(null); + + // Status tracking + const [channelStatus, setChannelStatus] = useState<'connected' | 'disconnected' | 'error'>('disconnected'); + const [reconnectTrigger, setReconnectTrigger] = useState(0); + + /** + * Debounced update handler - waits for rapid changes to settle + */ + const debouncedUpdate = useCallback((submissionId: string, updateFn: () => void) => { + const existingTimeout = updateDebounceMap.current.get(submissionId); + if (existingTimeout) { + clearTimeout(existingTimeout); + } + + const newTimeout = setTimeout(() => { + updateFn(); + updateDebounceMap.current.delete(submissionId); + }, debounceMs); + + updateDebounceMap.current.set(submissionId, newTimeout); + }, [debounceMs]); + + /** + * Fetch full submission details with related data + */ + const fetchSubmissionDetails = useCallback(async (submissionId: string) => { + const { data: submission, error } = await supabase + .from('content_submissions') + .select(` + id, submission_type, status, content, created_at, user_id, + reviewed_at, reviewer_id, reviewer_notes, escalated, assigned_to, locked_until, + submission_items ( + id, + item_type, + item_data, + status + ) + `) + .eq('id', submissionId) + .single(); + + if (error || !submission) { + console.error('Error fetching submission details:', error); + return null; + } + + return submission; + }, []); + + /** + * Resolve entity names for a submission + */ + const resolveEntityNames = useCallback(async (submission: any) => { + const content = submission.content as any; + let entityName = content?.name || 'Unknown'; + let parkName: string | undefined; + + if (submission.submission_type === 'ride' && content?.entity_id) { + // Try cache first + const cachedRide = entityCache.getCached('rides', content.entity_id); + if (cachedRide) { + entityName = cachedRide.name; + if (cachedRide.park_id) { + const cachedPark = entityCache.getCached('parks', cachedRide.park_id); + if (cachedPark) parkName = cachedPark.name; + } + } else { + const { data: ride } = await supabase + .from('rides') + .select('name, park_id') + .eq('id', content.entity_id) + .maybeSingle(); + + if (ride) { + entityName = ride.name; + entityCache.setCached('rides', content.entity_id, ride); + + if (ride.park_id) { + const { data: park } = await supabase + .from('parks') + .select('name') + .eq('id', ride.park_id) + .maybeSingle(); + + if (park) { + parkName = park.name; + entityCache.setCached('parks', ride.park_id, park); + } + } + } + } + } else if (submission.submission_type === 'park' && content?.entity_id) { + const cachedPark = entityCache.getCached('parks', content.entity_id); + if (cachedPark) { + entityName = cachedPark.name; + } else { + const { data: park } = await supabase + .from('parks') + .select('name') + .eq('id', content.entity_id) + .maybeSingle(); + + if (park) { + entityName = park.name; + entityCache.setCached('parks', content.entity_id, park); + } + } + } else if (['manufacturer', 'operator', 'designer', 'property_owner'].includes(submission.submission_type) && content?.entity_id) { + const cachedCompany = entityCache.getCached('companies', content.entity_id); + if (cachedCompany) { + entityName = cachedCompany.name; + } else { + const { data: company } = await supabase + .from('companies') + .select('name') + .eq('id', content.entity_id) + .maybeSingle(); + + if (company) { + entityName = company.name; + entityCache.setCached('companies', content.entity_id, company); + } + } + } + + return { entityName, parkName }; + }, [entityCache]); + + /** + * Handle new submission INSERT event + */ + const handleInsert = useCallback(async (payload: any) => { + const newSubmission = payload.new as any; + + console.log('🆕 Realtime INSERT:', newSubmission.id); + + // Queue updates if tab is hidden + if (pauseWhenHidden && document.hidden) { + console.log('📴 Realtime event received while hidden - queuing for later'); + return; + } + + // Ignore if recently removed (optimistic update) + if (recentlyRemovedIds.has(newSubmission.id)) { + console.log('⏭️ Ignoring INSERT for recently removed submission:', newSubmission.id); + return; + } + + // Only process pending/partially_approved submissions + if (!['pending', 'partially_approved'].includes(newSubmission.status)) { + return; + } + + // Apply filters + if (!matchesEntityFilter(newSubmission, filters.entityFilter)) { + return; + } + + if (!matchesStatusFilter(newSubmission, filters.statusFilter)) { + return; + } + + console.log('✅ NEW submission matches filters:', newSubmission.id); + + // Fetch full submission details + try { + const submission = await fetchSubmissionDetails(newSubmission.id); + if (!submission) return; + + // Fetch user profile + const profile = await profileCache.bulkFetch([submission.user_id]); + const userProfile = profile[0]; + + // Resolve entity names + const { entityName, parkName } = await resolveEntityNames(submission); + + // Build full ModerationItem + const fullItem = buildModerationItem( + submission, + userProfile, + entityName, + parkName + ); + + // Trigger callback + onNewItem(fullItem); + + console.log('🎉 New submission added to queue:', fullItem.id); + } catch (error) { + console.error('Error processing new submission:', error); + } + }, [ + filters, + pauseWhenHidden, + recentlyRemovedIds, + fetchSubmissionDetails, + profileCache, + resolveEntityNames, + onNewItem, + ]); + + /** + * Handle submission UPDATE event + */ + const handleUpdate = useCallback(async (payload: any) => { + const updatedSubmission = payload.new as any; + + console.log('🔄 Realtime UPDATE:', updatedSubmission.id); + + // Queue updates if tab is hidden + if (pauseWhenHidden && document.hidden) { + console.log('📴 Realtime UPDATE received while hidden - queuing for later'); + return; + } + + // Ignore if recently removed (optimistic update in progress) + if (recentlyRemovedIds.has(updatedSubmission.id)) { + console.log('⏭️ Ignoring UPDATE for recently removed submission:', updatedSubmission.id); + return; + } + + // Ignore if currently being interacted with + if (interactingWithIds.has(updatedSubmission.id)) { + console.log('⏭️ Ignoring UPDATE for interacting submission:', updatedSubmission.id); + return; + } + + // Debounce the update + debouncedUpdate(updatedSubmission.id, async () => { + // Check if submission matches current filters + const matchesEntity = matchesEntityFilter(updatedSubmission, filters.entityFilter); + const matchesStatus = matchesStatusFilter(updatedSubmission, filters.statusFilter); + + const wasInQueue = currentItems.some(i => i.id === updatedSubmission.id); + const shouldBeInQueue = matchesEntity && matchesStatus; + + if (wasInQueue && !shouldBeInQueue) { + // Submission moved out of current filter (e.g., pending → approved) + console.log('❌ Submission moved out of queue:', updatedSubmission.id); + onItemRemoved(updatedSubmission.id); + return; + } + + if (!shouldBeInQueue) { + // Item doesn't belong in queue at all + return; + } + + // Fetch full details + try { + const submission = await fetchSubmissionDetails(updatedSubmission.id); + if (!submission) return; + + // Get user profile + const profiles = await profileCache.bulkFetch([submission.user_id]); + const profile = profiles[0]; + + // Resolve entity name (simplified for updates) + const content = submission.content as any; + const entityName = content?.name || 'Unknown'; + + const fullItem = buildModerationItem( + submission, + profile, + entityName, + undefined + ); + + // Check if item actually changed + const currentItem = currentItems.find(i => i.id === fullItem.id); + if (currentItem && !hasItemChanged(currentItem, fullItem)) { + console.log('✅ Realtime UPDATE: No changes detected for', fullItem.id); + return; + } + + console.log('🔄 Realtime UPDATE: Changes detected for', fullItem.id); + onUpdateItem(fullItem, false); + } catch (error) { + console.error('Error processing updated submission:', error); + } + }); + }, [ + filters, + pauseWhenHidden, + recentlyRemovedIds, + interactingWithIds, + currentItems, + debouncedUpdate, + fetchSubmissionDetails, + profileCache, + onUpdateItem, + onItemRemoved, + ]); + + /** + * Setup INSERT subscription + */ + useEffect(() => { + if (!enabled) { + setChannelStatus('disconnected'); + return; + } + + console.log('📡 Setting up INSERT subscription'); + + const channel = supabase + .channel('moderation-new-submissions') + .on( + 'postgres_changes', + { + event: 'INSERT', + schema: 'public', + table: 'content_submissions', + }, + handleInsert + ) + .subscribe((status) => { + console.log('INSERT subscription status:', status); + if (status === 'SUBSCRIBED') { + setChannelStatus('connected'); + } else if (status === 'CHANNEL_ERROR') { + setChannelStatus('error'); + } + }); + + insertChannelRef.current = channel; + + return () => { + console.log('🛑 Cleaning up INSERT subscription'); + supabase.removeChannel(channel); + insertChannelRef.current = null; + }; + }, [enabled, handleInsert, reconnectTrigger]); + + /** + * Setup UPDATE subscription + */ + useEffect(() => { + if (!enabled) return; + + console.log('📡 Setting up UPDATE subscription'); + + const channel = supabase + .channel('moderation-updated-submissions') + .on( + 'postgres_changes', + { + event: 'UPDATE', + schema: 'public', + table: 'content_submissions', + }, + handleUpdate + ) + .subscribe((status) => { + console.log('UPDATE subscription status:', status); + if (status === 'SUBSCRIBED') { + setChannelStatus('connected'); + } else if (status === 'CHANNEL_ERROR') { + setChannelStatus('error'); + } + }); + + updateChannelRef.current = channel; + + return () => { + console.log('🛑 Cleaning up UPDATE subscription'); + supabase.removeChannel(channel); + updateChannelRef.current = null; + }; + }, [enabled, handleUpdate, reconnectTrigger]); + + /** + * Cleanup debounce timers on unmount + */ + useEffect(() => { + return () => { + updateDebounceMap.current.forEach(timeout => clearTimeout(timeout)); + updateDebounceMap.current.clear(); + }; + }, []); + + /** + * Manual reconnect function + */ + const reconnect = useCallback(() => { + console.log('🔄 Manually reconnecting subscriptions...'); + setReconnectTrigger(prev => prev + 1); + }, []); + + return { + isConnected: channelStatus === 'connected', + channelStatus, + reconnect, + }; +} diff --git a/src/lib/moderation/index.ts b/src/lib/moderation/index.ts index 23ef81d8..5b40a48a 100644 --- a/src/lib/moderation/index.ts +++ b/src/lib/moderation/index.ts @@ -55,3 +55,12 @@ export { } from './sorting'; export type { SortConfig, SortField, SortDirection } from '@/types/moderation'; + +// Realtime subscription utilities +export { + matchesEntityFilter, + matchesStatusFilter, + hasItemChanged, + extractChangedFields, + buildModerationItem, +} from './realtime'; diff --git a/src/lib/moderation/realtime.ts b/src/lib/moderation/realtime.ts new file mode 100644 index 00000000..07854ae8 --- /dev/null +++ b/src/lib/moderation/realtime.ts @@ -0,0 +1,179 @@ +/** + * Realtime Subscription Utilities + * + * Helper functions for processing realtime subscription events in the moderation queue. + */ + +import type { ModerationItem, EntityFilter, StatusFilter } from '@/types/moderation'; + +/** + * Check if a submission matches the entity filter + */ +export function matchesEntityFilter( + submission: { submission_type: string }, + entityFilter: EntityFilter +): boolean { + if (entityFilter === 'all') return true; + + if (entityFilter === 'photos') { + return submission.submission_type === 'photo'; + } + + if (entityFilter === 'submissions') { + return submission.submission_type !== 'photo'; + } + + if (entityFilter === 'reviews') { + return submission.submission_type === 'review'; + } + + return false; +} + +/** + * Check if a submission matches the status filter + */ +export function matchesStatusFilter( + submission: { status: string }, + statusFilter: StatusFilter +): boolean { + if (statusFilter === 'all') return true; + + if (statusFilter === 'pending') { + return ['pending', 'partially_approved'].includes(submission.status); + } + + return statusFilter === submission.status; +} + +/** + * Deep comparison of ModerationItem fields to detect actual changes + */ +export function hasItemChanged( + current: ModerationItem, + updated: ModerationItem +): boolean { + // Check critical fields + if ( + current.status !== updated.status || + current.reviewed_at !== updated.reviewed_at || + current.reviewer_notes !== updated.reviewer_notes || + current.assigned_to !== updated.assigned_to || + current.locked_until !== updated.locked_until || + current.escalated !== updated.escalated + ) { + return true; + } + + // Check submission_items + if (current.submission_items?.length !== updated.submission_items?.length) { + return true; + } + + // Check content (one level deep for performance) + if (current.content && updated.content) { + // Compare content reference first + if (current.content !== updated.content) { + const currentKeys = Object.keys(current.content).sort(); + const updatedKeys = Object.keys(updated.content).sort(); + + // Different number of keys = changed + if (currentKeys.length !== updatedKeys.length) { + return true; + } + + // Different key names = changed + if (!currentKeys.every((key, i) => key === updatedKeys[i])) { + return true; + } + + // Check each key's value + for (const key of currentKeys) { + if (current.content[key] !== updated.content[key]) { + return true; + } + } + } + } + + return false; +} + +/** + * Extract only changed fields for minimal updates + */ +export function extractChangedFields( + current: ModerationItem, + updated: ModerationItem +): Partial { + const changes: Partial = {}; + + if (current.status !== updated.status) { + changes.status = updated.status; + } + + if (current.reviewed_at !== updated.reviewed_at) { + changes.reviewed_at = updated.reviewed_at; + } + + if (current.reviewer_notes !== updated.reviewer_notes) { + changes.reviewer_notes = updated.reviewer_notes; + } + + if (current.assigned_to !== updated.assigned_to) { + changes.assigned_to = updated.assigned_to; + } + + if (current.locked_until !== updated.locked_until) { + changes.locked_until = updated.locked_until; + } + + if (current.escalated !== updated.escalated) { + changes.escalated = updated.escalated; + } + + // Check content changes + if (current.content !== updated.content) { + changes.content = updated.content; + } + + // Check submission_items + if (updated.submission_items) { + changes.submission_items = updated.submission_items; + } + + return changes; +} + +/** + * Build a full ModerationItem from submission data + */ +export function buildModerationItem( + submission: any, + profile?: any, + entityName?: string, + parkName?: string +): ModerationItem { + return { + id: submission.id, + type: 'content_submission', + content: submission.content, + created_at: submission.created_at, + user_id: submission.user_id, + status: submission.status, + submission_type: submission.submission_type, + user_profile: profile ? { + username: profile.username, + display_name: profile.display_name, + avatar_url: profile.avatar_url, + } : undefined, + entity_name: entityName || (submission.content as any)?.name || 'Unknown', + park_name: parkName, + reviewed_at: submission.reviewed_at || undefined, + reviewer_notes: submission.reviewer_notes || undefined, + escalated: submission.escalated || false, + assigned_to: submission.assigned_to || undefined, + locked_until: submission.locked_until || undefined, + submission_items: submission.submission_items || undefined, + }; +}