Files
thrilltrack-explorer/django-backend/api/v1/services/history_service.py

630 lines
22 KiB
Python

"""
History service for pghistory Event models.
Provides business logic for history queries, comparisons, and rollbacks
using pghistory Event models (CompanyEvent, ParkEvent, RideEvent, etc.).
"""
from datetime import timedelta, date, datetime
from typing import Optional, List, Dict, Any, Tuple
from django.utils import timezone
from django.db.models import QuerySet, Q
from django.core.exceptions import PermissionDenied
class HistoryService:
"""
Service for managing entity history via pghistory Event models.
Provides:
- History queries with role-based access control
- Event comparisons and diffs
- Rollback functionality
- Field-specific history tracking
"""
# Mapping of entity types to their pghistory Event model paths
EVENT_MODELS = {
'park': ('apps.entities.models', 'ParkEvent'),
'ride': ('apps.entities.models', 'RideEvent'),
'company': ('apps.entities.models', 'CompanyEvent'),
'ridemodel': ('apps.entities.models', 'RideModelEvent'),
'review': ('apps.reviews.models', 'ReviewEvent'),
}
# Mapping of entity types to their main model paths
ENTITY_MODELS = {
'park': ('apps.entities.models', 'Park'),
'ride': ('apps.entities.models', 'Ride'),
'company': ('apps.entities.models', 'Company'),
'ridemodel': ('apps.entities.models', 'RideModel'),
'review': ('apps.reviews.models', 'Review'),
}
@classmethod
def get_event_model(cls, entity_type: str):
"""
Get the pghistory Event model class for an entity type.
Args:
entity_type: Type of entity ('park', 'ride', 'company', 'ridemodel', 'review')
Returns:
Event model class (e.g., ParkEvent)
Raises:
ValueError: If entity type is unknown
"""
entity_type_lower = entity_type.lower()
if entity_type_lower not in cls.EVENT_MODELS:
raise ValueError(f"Unknown entity type: {entity_type}")
module_path, class_name = cls.EVENT_MODELS[entity_type_lower]
module = __import__(module_path, fromlist=[class_name])
return getattr(module, class_name)
@classmethod
def get_entity_model(cls, entity_type: str):
"""Get the main entity model class for an entity type."""
entity_type_lower = entity_type.lower()
if entity_type_lower not in cls.ENTITY_MODELS:
raise ValueError(f"Unknown entity type: {entity_type}")
module_path, class_name = cls.ENTITY_MODELS[entity_type_lower]
module = __import__(module_path, fromlist=[class_name])
return getattr(module, class_name)
@classmethod
def get_history(
cls,
entity_type: str,
entity_id: str,
user=None,
operation: Optional[str] = None,
date_from: Optional[date] = None,
date_to: Optional[date] = None,
field_changed: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> Tuple[QuerySet, int]:
"""
Get history for an entity with filtering and access control.
Args:
entity_type: Type of entity
entity_id: UUID of the entity
user: User making the request (for access control)
operation: Filter by operation type ('INSERT' or 'UPDATE')
date_from: Filter events after this date
date_to: Filter events before this date
field_changed: Filter events that changed this field (requires comparison)
limit: Maximum number of events to return
offset: Number of events to skip (for pagination)
Returns:
Tuple of (queryset, total_count)
"""
EventModel = cls.get_event_model(entity_type)
# Base queryset for this entity
queryset = EventModel.objects.filter(
pgh_obj_id=entity_id
).order_by('-pgh_created_at')
# Get total count before access control for informational purposes
total_count = queryset.count()
# Apply access control (time-based filtering)
queryset = cls._apply_access_control(queryset, user)
accessible_count = queryset.count()
# Apply additional filters
if date_from:
queryset = queryset.filter(pgh_created_at__gte=date_from)
if date_to:
queryset = queryset.filter(pgh_created_at__lte=date_to)
# Note: field_changed filtering requires comparing consecutive events
# This is expensive and should be done in the API layer if needed
return queryset[offset:offset + limit], accessible_count
@classmethod
def _apply_access_control(cls, queryset: QuerySet, user) -> QuerySet:
"""
Apply time-based access control based on user role.
Access Rules:
- Unauthenticated: Last 30 days
- Authenticated: Last 1 year
- Moderators/Admins/Superusers: Unlimited
Args:
queryset: Base queryset to filter
user: User making the request
Returns:
Filtered queryset
"""
# Check for privileged users first
if user and user.is_authenticated:
# Superusers and staff get unlimited access
if user.is_superuser or user.is_staff:
return queryset
# Check for moderator/admin role if role system exists
if hasattr(user, 'role') and user.role in ['moderator', 'admin']:
return queryset
# Regular authenticated users: 1 year
cutoff = timezone.now() - timedelta(days=365)
return queryset.filter(pgh_created_at__gte=cutoff)
# Unauthenticated users: 30 days
cutoff = timezone.now() - timedelta(days=30)
return queryset.filter(pgh_created_at__gte=cutoff)
@classmethod
def get_access_reason(cls, user) -> str:
"""Get human-readable description of access level."""
if user and user.is_authenticated:
if user.is_superuser or user.is_staff:
return "Full access (administrator)"
if hasattr(user, 'role') and user.role in ['moderator', 'admin']:
return "Full access (moderator)"
return "Limited to last 1 year (authenticated user)"
return "Limited to last 30 days (public access)"
@classmethod
def is_access_limited(cls, user) -> bool:
"""Check if user has limited access."""
if not user or not user.is_authenticated:
return True
if user.is_superuser or user.is_staff:
return False
if hasattr(user, 'role') and user.role in ['moderator', 'admin']:
return False
return True
@classmethod
def get_event(
cls,
entity_type: str,
event_id: int,
user=None
) -> Optional[Any]:
"""
Get a specific event by ID with access control.
Args:
entity_type: Type of entity
event_id: ID of the event (pgh_id)
user: User making the request
Returns:
Event object or None if not found/not accessible
"""
EventModel = cls.get_event_model(entity_type)
try:
event = EventModel.objects.get(pgh_id=event_id)
# Check if user has access to this event based on timestamp
queryset = EventModel.objects.filter(pgh_id=event_id)
if not cls._apply_access_control(queryset, user).exists():
return None # User doesn't have access to this event
return event
except EventModel.DoesNotExist:
return None
@classmethod
def compare_events(
cls,
entity_type: str,
event_id1: int,
event_id2: int,
user=None
) -> Dict[str, Any]:
"""
Compare two historical events.
Args:
entity_type: Type of entity
event_id1: ID of first event
event_id2: ID of second event
user: User making the request
Returns:
Dictionary containing comparison results
Raises:
ValueError: If events not found or not accessible
"""
event1 = cls.get_event(entity_type, event_id1, user)
event2 = cls.get_event(entity_type, event_id2, user)
if not event1 or not event2:
raise ValueError("One or both events not found or not accessible")
# Ensure events are for the same entity
if event1.pgh_obj_id != event2.pgh_obj_id:
raise ValueError("Events must be for the same entity")
# Compute differences
differences = cls._compute_differences(event1, event2)
# Calculate time between events
time_delta = abs(event2.pgh_created_at - event1.pgh_created_at)
return {
'event1': event1,
'event2': event2,
'differences': differences,
'changed_field_count': len(differences),
'unchanged_field_count': cls._get_field_count(event1) - len(differences),
'time_between': cls._format_timedelta(time_delta)
}
@classmethod
def compare_with_current(
cls,
entity_type: str,
event_id: int,
entity,
user=None
) -> Dict[str, Any]:
"""
Compare historical event with current entity state.
Args:
entity_type: Type of entity
event_id: ID of historical event
entity: Current entity instance
user: User making the request
Returns:
Dictionary containing comparison results
Raises:
ValueError: If event not found or not accessible
"""
event = cls.get_event(entity_type, event_id, user)
if not event:
raise ValueError("Event not found or not accessible")
# Ensure event is for this entity
if str(event.pgh_obj_id) != str(entity.id):
raise ValueError("Event is not for the specified entity")
# Compute differences between historical and current
differences = {}
fields = cls._get_entity_fields(event)
for field in fields:
historical_val = getattr(event, field, None)
current_val = getattr(entity, field, None)
if historical_val != current_val:
differences[field] = {
'historical_value': cls._serialize_value(historical_val),
'current_value': cls._serialize_value(current_val),
'changed': True
}
# Calculate time since event
time_delta = timezone.now() - event.pgh_created_at
return {
'event': event,
'current_state': entity,
'differences': differences,
'changed_field_count': len(differences),
'time_since': cls._format_timedelta(time_delta)
}
@classmethod
def can_rollback(cls, user) -> bool:
"""Check if user has permission to perform rollbacks."""
if not user or not user.is_authenticated:
return False
if user.is_superuser or user.is_staff:
return True
if hasattr(user, 'role') and user.role in ['moderator', 'admin']:
return True
return False
@classmethod
def rollback_to_event(
cls,
entity,
entity_type: str,
event_id: int,
user,
fields: Optional[List[str]] = None,
comment: str = "",
create_backup: bool = True
) -> Dict[str, Any]:
"""
Rollback entity to a historical state.
IMPORTANT: This modifies the entity and saves it!
Args:
entity: Current entity instance
entity_type: Type of entity
event_id: ID of event to rollback to
user: User performing the rollback
fields: Optional list of specific fields to rollback (None = all fields)
comment: Optional comment explaining the rollback
create_backup: Whether to note the backup event ID
Returns:
Dictionary containing rollback results
Raises:
PermissionDenied: If user doesn't have rollback permission
ValueError: If event not found or invalid
"""
# Permission check
if not cls.can_rollback(user):
raise PermissionDenied("Only moderators and administrators can perform rollbacks")
event = cls.get_event(entity_type, event_id, user)
if not event:
raise ValueError("Event not found or not accessible")
# Ensure event is for this entity
if str(event.pgh_obj_id) != str(entity.id):
raise ValueError("Event is not for the specified entity")
# Track pre-rollback state for backup reference
backup_event_id = None
if create_backup:
# The current state will be captured automatically by pghistory
# when we save. We just need to note what the last event was.
EventModel = cls.get_event_model(entity_type)
last_event = EventModel.objects.filter(
pgh_obj_id=entity.id
).order_by('-pgh_created_at').first()
if last_event:
backup_event_id = last_event.pgh_id
# Determine which fields to rollback
if fields is None:
fields = cls._get_entity_fields(event)
# Track changes
changes = {}
for field in fields:
if hasattr(entity, field) and hasattr(event, field):
old_val = getattr(entity, field)
new_val = getattr(event, field)
if old_val != new_val:
setattr(entity, field, new_val)
changes[field] = {
'from': cls._serialize_value(old_val),
'to': cls._serialize_value(new_val)
}
# Save entity (pghistory will automatically create new event)
entity.save()
# Get the new event that was just created
EventModel = cls.get_event_model(entity_type)
new_event = EventModel.objects.filter(
pgh_obj_id=entity.id
).order_by('-pgh_created_at').first()
return {
'success': True,
'message': f'Successfully rolled back {len(changes)} field(s) to state from {event.pgh_created_at.strftime("%Y-%m-%d")}',
'entity_id': str(entity.id),
'rollback_event_id': event_id,
'new_event_id': new_event.pgh_id if new_event else None,
'fields_changed': changes,
'backup_event_id': backup_event_id
}
@classmethod
def get_field_history(
cls,
entity_type: str,
entity_id: str,
field_name: str,
user=None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Get history of changes to a specific field.
Args:
entity_type: Type of entity
entity_id: UUID of the entity
field_name: Name of the field to track
user: User making the request
limit: Maximum number of changes to return
Returns:
List of field changes
"""
events, _ = cls.get_history(entity_type, entity_id, user, limit=limit)
field_history = []
previous_value = None
first_value = None
# Iterate through events in reverse chronological order
for event in events:
if not hasattr(event, field_name):
continue
current_value = getattr(event, field_name, None)
# Track first (oldest) value
if first_value is None:
first_value = current_value
# Detect changes
if previous_value is not None and current_value != previous_value:
field_history.append({
'timestamp': event.pgh_created_at,
'event_id': event.pgh_id,
'old_value': cls._serialize_value(previous_value),
'new_value': cls._serialize_value(current_value),
'change_type': 'UPDATE'
})
elif previous_value is None:
# First event we're seeing (most recent)
field_history.append({
'timestamp': event.pgh_created_at,
'event_id': event.pgh_id,
'old_value': None,
'new_value': cls._serialize_value(current_value),
'change_type': 'INSERT' if len(list(events)) == 1 else 'UPDATE'
})
previous_value = current_value
return {
'history': field_history,
'total_changes': len(field_history),
'first_value': cls._serialize_value(first_value),
'current_value': cls._serialize_value(previous_value) if previous_value is not None else None
}
@classmethod
def get_activity_summary(
cls,
entity_type: str,
entity_id: str,
user=None
) -> Dict[str, Any]:
"""
Get activity summary for an entity.
Args:
entity_type: Type of entity
entity_id: UUID of the entity
user: User making the request
Returns:
Dictionary with activity statistics
"""
EventModel = cls.get_event_model(entity_type)
now = timezone.now()
# Get all events for this entity (respecting access control)
all_events = EventModel.objects.filter(pgh_obj_id=entity_id)
total_events = all_events.count()
accessible_events = cls._apply_access_control(all_events, user)
accessible_count = accessible_events.count()
# Time-based summaries
last_24h = accessible_events.filter(
pgh_created_at__gte=now - timedelta(days=1)
).count()
last_7d = accessible_events.filter(
pgh_created_at__gte=now - timedelta(days=7)
).count()
last_30d = accessible_events.filter(
pgh_created_at__gte=now - timedelta(days=30)
).count()
last_year = accessible_events.filter(
pgh_created_at__gte=now - timedelta(days=365)
).count()
# Get recent activity (last 10 events)
recent_activity = accessible_events.order_by('-pgh_created_at')[:10]
return {
'total_events': total_events,
'accessible_events': accessible_count,
'summary': {
'last_24_hours': last_24h,
'last_7_days': last_7d,
'last_30_days': last_30d,
'last_year': last_year
},
'recent_activity': [
{
'timestamp': event.pgh_created_at,
'event_id': event.pgh_id,
'operation': 'INSERT' if event == accessible_events.last() else 'UPDATE'
}
for event in recent_activity
]
}
# Helper methods
@classmethod
def _compute_differences(cls, event1, event2) -> Dict[str, Any]:
"""Compute differences between two events."""
differences = {}
fields = cls._get_entity_fields(event1)
for field in fields:
val1 = getattr(event1, field, None)
val2 = getattr(event2, field, None)
if val1 != val2:
differences[field] = {
'event1_value': cls._serialize_value(val1),
'event2_value': cls._serialize_value(val2)
}
return differences
@classmethod
def _get_entity_fields(cls, event) -> List[str]:
"""Get list of entity field names (excluding pghistory fields)."""
return [
f.name for f in event._meta.fields
if not f.name.startswith('pgh_') and f.name not in ['id']
]
@classmethod
def _get_field_count(cls, event) -> int:
"""Get count of entity fields."""
return len(cls._get_entity_fields(event))
@classmethod
def _serialize_value(cls, value) -> Any:
"""Serialize a value for JSON response."""
if value is None:
return None
if isinstance(value, (datetime, date)):
return value.isoformat()
if hasattr(value, 'id'): # Foreign key
return str(value.id)
return value
@classmethod
def _format_timedelta(cls, delta: timedelta) -> str:
"""Format a timedelta as human-readable string."""
days = delta.days
if days == 0:
hours = delta.seconds // 3600
if hours == 0:
minutes = delta.seconds // 60
return f"{minutes} minute{'s' if minutes != 1 else ''}"
return f"{hours} hour{'s' if hours != 1 else ''}"
elif days < 30:
return f"{days} day{'s' if days != 1 else ''}"
elif days < 365:
months = days // 30
return f"{months} month{'s' if months != 1 else ''}"
else:
years = days // 365
months = (days % 365) // 30
if months > 0:
return f"{years} year{'s' if years != 1 else ''}, {months} month{'s' if months != 1 else ''}"
return f"{years} year{'s' if years != 1 else ''}"