Files
thrilltrack-explorer/src-old/hooks/moderation/useRealtimeSubscriptions.ts

509 lines
15 KiB
TypeScript

/**
* Realtime Subscriptions Hook for Moderation Queue
*
* Manages all Supabase realtime subscriptions for the moderation queue system.
* Handles INSERT and UPDATE events with debouncing, filtering, and optimistic update protection.
*/
import { useEffect, useRef, useState, useCallback } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import { supabase } from '@/lib/supabaseClient';
import { logger } from '@/lib/logger';
import { getErrorMessage } from '@/lib/errorHandler';
import { MODERATION_CONSTANTS } from '@/lib/moderation/constants';
import type { RealtimeChannel, RealtimePostgresChangesPayload } from '@supabase/supabase-js';
import type { Json } from '@/integrations/supabase/types';
import type { ModerationItem, EntityFilter, StatusFilter } from '@/types/moderation';
import type { useEntityCache } from './useEntityCache';
import type { useProfileCache } from './useProfileCache';
import {
matchesEntityFilter,
matchesStatusFilter,
hasItemChanged,
buildModerationItem,
} from '@/lib/moderation/realtime';
/**
 * Loosely-typed view of the content payload attached to a submission that
 * arrives through a realtime event. Every field is optional, so consumers
 * must check for presence before use.
 */
interface SubmissionContent {
  action?: string;
  name?: string;
  entity_id?: string;
  entity_name?: string;
  entity_slug?: string;
  park_id?: string;
}
/** Whatever the `useEntityCache` hook returns; types the entity cache injected through the config. */
type EntityCacheReturn = ReturnType<typeof useEntityCache>;
/** Whatever the `useProfileCache` hook returns; types the profile cache injected through the config. */
type ProfileCacheReturn = ReturnType<typeof useProfileCache>;
/**
 * Options accepted by `useRealtimeSubscriptions`.
 */
export interface RealtimeSubscriptionConfig {
  /** Master switch: when false no realtime channels are opened. */
  enabled: boolean;

  /** Filters an incoming submission must satisfy to be treated as part of the queue. */
  filters: {
    entityFilter: EntityFilter;
    statusFilter: StatusFilter;
  };

  /** Invoked with the fully built item when a new matching submission is detected. */
  onNewItem: (item: ModerationItem) => void;

  /** Invoked when an existing item is updated; `shouldRemove` signals it should leave the queue. */
  onUpdateItem: (item: ModerationItem, shouldRemove: boolean) => void;

  /** Invoked with the submission id when an item no longer belongs in the queue. */
  onItemRemoved: (itemId: string) => void;

  /** When true (the default), realtime events are not processed while the tab is hidden. */
  pauseWhenHidden?: boolean;

  /** Debounce window for UPDATE events, in milliseconds. */
  debounceMs?: number;

  /** Cache used to resolve entity names. */
  entityCache: EntityCacheReturn;

  /** Cache used to resolve submitter profile information. */
  profileCache: ProfileCacheReturn;

  /** Ids removed optimistically a moment ago; realtime events for them are ignored. */
  recentlyRemovedIds: Set<string>;

  /** Ids the moderator is currently interacting with. */
  interactingWithIds: Set<string>;

  /** Optional ref to the items currently in the queue (for comparison); a ref is used to avoid reconnections. */
  currentItemsRef?: React.MutableRefObject<ModerationItem[]>;
}
/**
 * Value handed back to callers of `useRealtimeSubscriptions`.
 */
export interface UseRealtimeSubscriptionsReturn {
  /** Convenience flag: true exactly when `channelStatus` is `'connected'`. */
  isConnected: boolean;

  /** Current state of the realtime connection. */
  channelStatus: 'connected' | 'disconnected' | 'error';

  /** Forces the subscriptions to be torn down and re-established. */
  reconnect: () => void;
}
/** Connection state exposed by the hook. */
type ChannelStatus = 'connected' | 'disconnected' | 'error';

/** Identifies which of the two realtime channels a status report refers to. */
type ChannelKey = 'insert' | 'update';

/** Submission types whose entity lives in the `companies` table. */
const COMPANY_SUBMISSION_TYPES: readonly string[] = [
  'manufacturer',
  'operator',
  'designer',
  'property_owner',
];

/**
 * Map a status string delivered to a Supabase `subscribe` callback onto the
 * hook's coarser status. Join failures (`CHANNEL_ERROR`, `TIMED_OUT`) are
 * errors; anything other than `SUBSCRIBED` (e.g. `CLOSED`) is disconnected.
 */
function toChannelStatus(status: string): ChannelStatus {
  switch (status) {
    case 'SUBSCRIBED':
      return 'connected';
    case 'CHANNEL_ERROR':
    case 'TIMED_OUT':
      return 'error';
    default:
      return 'disconnected';
  }
}

/**
 * Collapse the per-channel statuses into one: any error wins, `connected`
 * requires every channel to be connected, otherwise `disconnected`.
 */
function combineChannelStatuses(statuses: Record<ChannelKey, ChannelStatus>): ChannelStatus {
  const values = Object.values(statuses);
  if (values.includes('error')) return 'error';
  return values.every((value) => value === 'connected') ? 'connected' : 'disconnected';
}

/**
 * Hook to manage realtime subscriptions for the moderation queue.
 *
 * Opens one channel for INSERT events and one for UPDATE events on
 * `content_submissions`. Matching events invalidate the `['moderation-queue']`
 * query; INSERTs additionally build a full `ModerationItem` and pass it to
 * `onNewItem`, and UPDATEs that no longer match the filters trigger
 * `onItemRemoved`.
 *
 * The channels are (re)created only when `enabled` changes or `reconnect()` is
 * called; the latest event handlers are reached through refs, so changing
 * filters or callbacks does not tear down the subscriptions.
 *
 * @param config - Subscription options; see `RealtimeSubscriptionConfig`.
 *   `onUpdateItem` and `currentItemsRef` are accepted but not used by this hook.
 * @returns The combined connection status of both channels and a `reconnect`
 *   function. `isConnected` is true only when both channels are subscribed.
 */
export function useRealtimeSubscriptions(
  config: RealtimeSubscriptionConfig
): UseRealtimeSubscriptionsReturn {
  const queryClient = useQueryClient();
  const {
    enabled,
    filters,
    onNewItem,
    onItemRemoved,
    pauseWhenHidden = true,
    debounceMs = MODERATION_CONSTANTS.REALTIME_DEBOUNCE_MS,
    entityCache,
    profileCache,
    recentlyRemovedIds,
    interactingWithIds,
  } = config;

  // Pending debounce timers for UPDATE events, keyed by submission id.
  const updateDebounceMap = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());

  // Channel references
  const insertChannelRef = useRef<RealtimeChannel | null>(null);
  const updateChannelRef = useRef<RealtimeChannel | null>(null);

  // Status tracking: each channel reports separately so that one channel
  // subscribing successfully cannot mask an error on the other.
  const channelStatesRef = useRef<Record<ChannelKey, ChannelStatus>>({
    insert: 'disconnected',
    update: 'disconnected',
  });
  const [channelStatus, setChannelStatus] = useState<ChannelStatus>('disconnected');
  const [reconnectTrigger, setReconnectTrigger] = useState(0);

  /**
   * Record the status of one channel and publish the combined status.
   */
  const reportChannelStatus = useCallback((key: ChannelKey, status: ChannelStatus) => {
    channelStatesRef.current = { ...channelStatesRef.current, [key]: status };
    setChannelStatus(combineChannelStatuses(channelStatesRef.current));
  }, []);

  /**
   * Drop a pending debounced update for a submission, if there is one.
   */
  const cancelPendingUpdate = useCallback((submissionId: string) => {
    const timers = updateDebounceMap.current;
    const pending = timers.get(submissionId);
    if (pending !== undefined) {
      clearTimeout(pending);
      timers.delete(submissionId);
    }
  }, []);

  /**
   * Debounced update handler - waits for rapid changes to settle.
   * A later call for the same submission replaces the earlier pending one.
   * Failures of `updateFn` (sync or async) are logged instead of escaping
   * as unhandled rejections.
   */
  const debouncedUpdate = useCallback(
    (submissionId: string, updateFn: () => void | Promise<void>) => {
      const timers = updateDebounceMap.current;
      const pending = timers.get(submissionId);
      if (pending !== undefined) {
        clearTimeout(pending);
      }
      const timer = setTimeout(() => {
        // Remove the entry first so a throwing updateFn cannot leave it behind.
        timers.delete(submissionId);
        void (async () => {
          try {
            await updateFn();
          } catch (error: unknown) {
            logger.log('⚠️ Debounced realtime update failed:', error);
          }
        })();
      }, debounceMs);
      timers.set(submissionId, timer);
    },
    [debounceMs]
  );

  /**
   * Fetch full submission details with related data.
   * Returns null when the row cannot be loaded.
   */
  const fetchSubmissionDetails = useCallback(async (submissionId: string) => {
    const { data: submission, error } = await supabase
      .from('content_submissions')
      .select(`
id, submission_type, status, created_at, user_id,
reviewed_at, reviewer_id, reviewer_notes, escalated, assigned_to, locked_until,
submission_items (
id,
item_type,
item_data,
status
),
submission_metadata (
entity_id,
park_id,
ride_id
)
`)
      .eq('id', submissionId)
      .single();

    if (error || !submission) {
      // Non-fatal: the caller simply skips the notification for this event.
      if (error) {
        logger.log('⚠️ Failed to fetch submission details:', error);
      }
      return null;
    }
    return submission;
  }, []);

  /**
   * Resolve entity names for a submission.
   * Looks in the entity cache first and falls back to a database query,
   * caching whatever it finds. `entityName` stays 'Unknown' when nothing
   * can be resolved; `parkName` is only filled in for rides.
   */
  const resolveEntityNames = useCallback(
    async (submission: { submission_type: string; submission_metadata?: any[] }) => {
      // Only the first metadata row is consulted.
      const metadata =
        Array.isArray(submission.submission_metadata) && submission.submission_metadata.length > 0
          ? submission.submission_metadata[0]
          : undefined;

      let entityName = 'Unknown';
      let parkName: string | undefined;

      const entityId = metadata?.entity_id;
      if (!entityId) {
        return { entityName, parkName };
      }

      if (submission.submission_type === 'ride') {
        const cachedRide = entityCache.getCached('rides', entityId);
        if (cachedRide) {
          entityName = cachedRide.name;
          if (cachedRide.park_id) {
            // On a ride cache hit the park is only taken from the cache.
            const cachedPark = entityCache.getCached('parks', cachedRide.park_id);
            if (cachedPark) parkName = cachedPark.name;
          }
        } else {
          const { data: ride } = await supabase
            .from('rides')
            .select('id, name, park_id')
            .eq('id', entityId)
            .maybeSingle();
          if (ride) {
            entityName = ride.name;
            entityCache.setCached('rides', entityId, ride);
            if (ride.park_id) {
              const { data: park } = await supabase
                .from('parks')
                .select('id, name')
                .eq('id', ride.park_id)
                .maybeSingle();
              if (park) {
                parkName = park.name;
                entityCache.setCached('parks', ride.park_id, park);
              }
            }
          }
        }
      } else if (submission.submission_type === 'park') {
        const cachedPark = entityCache.getCached('parks', entityId);
        if (cachedPark) {
          entityName = cachedPark.name;
        } else {
          const { data: park } = await supabase
            .from('parks')
            .select('id, name')
            .eq('id', entityId)
            .maybeSingle();
          if (park) {
            entityName = park.name;
            entityCache.setCached('parks', entityId, park);
          }
        }
      } else if (COMPANY_SUBMISSION_TYPES.includes(submission.submission_type)) {
        const cachedCompany = entityCache.getCached('companies', entityId);
        if (cachedCompany) {
          entityName = cachedCompany.name;
        } else {
          const { data: company } = await supabase
            .from('companies')
            .select('id, name')
            .eq('id', entityId)
            .maybeSingle();
          if (company) {
            entityName = company.name;
            entityCache.setCached('companies', entityId, company);
          }
        }
      }

      return { entityName, parkName };
    },
    [entityCache]
  );

  /**
   * Handle new submission INSERT event.
   */
  const handleInsert = useCallback(
    async (payload: RealtimePostgresChangesPayload<any>) => {
      const newSubmission = payload.new;
      logger.log('🆕 Realtime INSERT:', newSubmission.id);

      // While the tab is hidden the event is skipped entirely.
      // NOTE(review): despite the log text nothing is queued here; confirm the
      // queue is refetched when the tab becomes visible again.
      if (pauseWhenHidden && document.hidden) {
        logger.log('📴 Realtime event received while hidden - queuing for later');
        return;
      }

      // Ignore if recently removed (optimistic update)
      if (recentlyRemovedIds.has(newSubmission.id)) {
        logger.log('⏭️ Ignoring INSERT for recently removed submission:', newSubmission.id);
        return;
      }

      // Only process pending/partially_approved submissions
      if (!['pending', 'partially_approved'].includes(newSubmission.status)) {
        return;
      }

      // Apply filters
      if (!matchesEntityFilter(newSubmission, filters.entityFilter)) {
        return;
      }
      if (!matchesStatusFilter(newSubmission, filters.statusFilter)) {
        return;
      }

      logger.log('✅ NEW submission matches filters, invalidating query:', newSubmission.id);

      // Invalidate the query to trigger background refetch
      await queryClient.invalidateQueries({ queryKey: ['moderation-queue'] });

      // Call legacy callback for new item notification
      // (This maintains compatibility with NewItemsAlert component)
      try {
        const submission = await fetchSubmissionDetails(newSubmission.id);
        if (!submission) return;

        const profile = await profileCache.bulkFetch([submission.user_id]);
        const userProfile = profile[0];
        const { entityName, parkName } = await resolveEntityNames(submission);

        const fullItem = buildModerationItem(submission, userProfile, entityName, parkName);
        onNewItem(fullItem);
      } catch (error: unknown) {
        // Notifications are non-critical, so the failure is logged rather than rethrown.
        logger.log('⚠️ Failed to build new-item notification:', error);
      }
    },
    [
      filters,
      pauseWhenHidden,
      recentlyRemovedIds,
      queryClient,
      fetchSubmissionDetails,
      profileCache,
      resolveEntityNames,
      onNewItem,
    ]
  );

  /**
   * Handle submission UPDATE event.
   */
  const handleUpdate = useCallback(
    async (payload: RealtimePostgresChangesPayload<any>) => {
      const updatedSubmission = payload.new;
      const oldSubmission = payload.old;
      logger.log('🔄 Realtime UPDATE:', updatedSubmission.id);

      // While the tab is hidden the event is skipped entirely.
      // NOTE(review): despite the log text nothing is queued here; confirm the
      // queue is refetched when the tab becomes visible again.
      if (pauseWhenHidden && document.hidden) {
        logger.log('📴 Realtime UPDATE received while hidden - queuing for later');
        return;
      }

      // Ignore if recently removed (optimistic update in progress)
      if (recentlyRemovedIds.has(updatedSubmission.id)) {
        logger.log('⏭️ Ignoring UPDATE for recently removed submission:', updatedSubmission.id);
        return;
      }

      // Ignore if currently being interacted with
      if (interactingWithIds.has(updatedSubmission.id)) {
        logger.log('⏭️ Ignoring UPDATE for interacting submission:', updatedSubmission.id);
        return;
      }

      // Legacy callback for compatibility: tell the caller when the updated
      // row no longer satisfies the active filters.
      const notifyIfFilteredOut = () => {
        const matchesEntity = matchesEntityFilter(updatedSubmission, filters.entityFilter);
        const matchesStatus = matchesStatusFilter(updatedSubmission, filters.statusFilter);
        if (!(matchesEntity && matchesStatus)) {
          onItemRemoved(updatedSubmission.id);
        }
      };

      // Skip debounce for status changes (critical updates). The old row only
      // carries `status` when the realtime payload includes it.
      const isStatusChange =
        oldSubmission &&
        'status' in oldSubmission &&
        oldSubmission.status !== updatedSubmission?.status;

      if (isStatusChange) {
        logger.log('⚡ Status change detected, invalidating immediately');
        // A pending debounced update holds an older payload; this event supersedes it.
        cancelPendingUpdate(updatedSubmission.id);
        await queryClient.invalidateQueries({ queryKey: ['moderation-queue'] });
        notifyIfFilteredOut();
        return; // Skip debounced update
      }

      // Use debounce for non-critical updates
      debouncedUpdate(updatedSubmission.id, async () => {
        logger.log('🔄 Invalidating query due to UPDATE:', updatedSubmission.id);
        // Simply invalidate the query - TanStack Query handles the rest
        await queryClient.invalidateQueries({ queryKey: ['moderation-queue'] });
        notifyIfFilteredOut();
      });
    },
    [
      filters,
      pauseWhenHidden,
      recentlyRemovedIds,
      interactingWithIds,
      debouncedUpdate,
      cancelPendingUpdate,
      queryClient,
      onItemRemoved,
    ]
  );

  // Keep the newest handlers reachable without making the subscription
  // effects depend on them; otherwise every filter/callback change would
  // tear down and re-create the channels.
  const handleInsertRef = useRef(handleInsert);
  const handleUpdateRef = useRef(handleUpdate);
  useEffect(() => {
    handleInsertRef.current = handleInsert;
    handleUpdateRef.current = handleUpdate;
  }, [handleInsert, handleUpdate]);

  /**
   * Setup INSERT subscription
   */
  useEffect(() => {
    if (!enabled) {
      reportChannelStatus('insert', 'disconnected');
      return;
    }

    logger.log('📡 Setting up INSERT subscription');
    // Guards against status callbacks that arrive after this channel was cleaned up.
    let active = true;

    const channel = supabase
      .channel('moderation-new-submissions')
      .on(
        'postgres_changes',
        {
          event: 'INSERT',
          schema: 'public',
          table: 'content_submissions',
        },
        (payload: RealtimePostgresChangesPayload<any>) => {
          handleInsertRef.current(payload).catch((error: unknown) => {
            logger.log('⚠️ Realtime INSERT handler failed:', error);
          });
        }
      )
      .subscribe((status) => {
        logger.log('INSERT subscription status:', status);
        if (active) {
          reportChannelStatus('insert', toChannelStatus(status));
        }
      });
    insertChannelRef.current = channel;

    return () => {
      logger.log('🛑 Cleaning up INSERT subscription');
      active = false;
      void supabase.removeChannel(channel);
      insertChannelRef.current = null;
      reportChannelStatus('insert', 'disconnected');
    };
  }, [enabled, reconnectTrigger, reportChannelStatus]);

  /**
   * Setup UPDATE subscription
   */
  useEffect(() => {
    if (!enabled) {
      reportChannelStatus('update', 'disconnected');
      return;
    }

    logger.log('📡 Setting up UPDATE subscription');
    // Guards against status callbacks that arrive after this channel was cleaned up.
    let active = true;

    const channel = supabase
      .channel('moderation-updated-submissions')
      .on(
        'postgres_changes',
        {
          event: 'UPDATE',
          schema: 'public',
          table: 'content_submissions',
        },
        (payload: RealtimePostgresChangesPayload<any>) => {
          handleUpdateRef.current(payload).catch((error: unknown) => {
            logger.log('⚠️ Realtime UPDATE handler failed:', error);
          });
        }
      )
      .subscribe((status) => {
        logger.log('UPDATE subscription status:', status);
        if (active) {
          reportChannelStatus('update', toChannelStatus(status));
        }
      });
    updateChannelRef.current = channel;

    return () => {
      logger.log('🛑 Cleaning up UPDATE subscription');
      active = false;
      void supabase.removeChannel(channel);
      updateChannelRef.current = null;
      reportChannelStatus('update', 'disconnected');
    };
  }, [enabled, reconnectTrigger, reportChannelStatus]);

  /**
   * Cleanup debounce timers on unmount
   */
  useEffect(() => {
    // The Map instance is stable for the component's lifetime, so capture it once.
    const timers = updateDebounceMap.current;
    return () => {
      timers.forEach((timeout) => clearTimeout(timeout));
      timers.clear();
    };
  }, []);

  /**
   * Manual reconnect function: bumping the trigger re-runs both subscription effects.
   */
  const reconnect = useCallback(() => {
    logger.log('🔄 Manually reconnecting subscriptions...');
    setReconnectTrigger((prev) => prev + 1);
  }, []);

  return {
    isConnected: channelStatus === 'connected',
    channelStatus,
    reconnect,
  };
}