Files
thrillwiki_django_no_react/history_tracking/managers.py

519 lines
17 KiB
Python

import json
from collections import Counter
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple, Type, TypeVar, cast

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractUser
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import transaction
from django.utils import timezone

from .models import VersionBranch, VersionTag, ChangeSet
# Type variable used to annotate user parameters throughout this module.
UserModel = TypeVar('UserModel', bound=AbstractUser)
# Active user model class, used for lookups by id. The cast targets
# Type[AbstractUser] rather than the TypeVar because a TypeVar is unbound
# outside a generic function/class and type checkers reject it there; the
# runtime value is whatever get_user_model() returns.
User = cast(Type[AbstractUser], get_user_model())
class BranchManager:
    """Manage version-control branch operations: creation, merging and locking.

    Lock state is stored on the branch itself. ``lock_status`` is a dict with
    ``user`` (user id), ``expires`` (ISO-8601 string) and ``reason`` keys, or
    an empty dict when the branch is unlocked. ``lock_history`` is a list of
    audit entries appended on every lock/unlock.
    """

    # Permission required to release a lock held by another user.
    FORCE_UNLOCK_PERMISSION = 'history_tracking.force_unlock_branch'

    @transaction.atomic
    def create_branch(
        self,
        name: str,
        parent: Optional[VersionBranch] = None,
        user: Optional[UserModel] = None
    ) -> VersionBranch:
        """Create a new version branch.

        Args:
            name: Name of the new branch.
            parent: Branch this one is created from, or ``None`` for a root
                branch (recorded as ``'root'`` in the metadata).
            user: User recorded as the branch creator.

        Returns:
            The newly created branch.

        Raises:
            ValidationError: If the branch fails model validation. Validation
                runs after the row is created; the surrounding transaction
                rolls the insert back when it raises.
        """
        branch = VersionBranch.objects.create(
            name=name,
            parent=parent,
            created_by=user,
            metadata={
                'created_from': parent.name if parent else 'root',
                'created_at': timezone.now().isoformat()
            }
        )
        branch.full_clean()
        return branch

    @transaction.atomic
    def merge_branches(
        self,
        source: VersionBranch,
        target: VersionBranch,
        user: Optional[UserModel] = None
    ) -> Tuple[bool, List[Dict[str, Any]]]:
        """Merge ``source`` into ``target``.

        Args:
            source: Branch whose applied changes are merged.
            target: Branch receiving the changes.
            user: User recorded on the merge changeset.

        Returns:
            ``(success, conflicts)``. On success ``conflicts`` is empty and a
            changeset describing the merge is recorded on ``target``.

        Raises:
            ValidationError: If either branch is inactive.
        """
        if not source.is_active or not target.is_active:
            raise ValidationError("Cannot merge inactive branches")

        merger = MergeStrategy()
        success, conflicts = merger.auto_merge(source, target)

        if success:
            # Record successful merge
            ChangeSet.objects.create(
                branch=target,
                created_by=user,
                description=f"Merged branch '{source.name}' into '{target.name}'",
                metadata={
                    'merge_source': source.name,
                    'merge_target': target.name,
                    'merged_at': timezone.now().isoformat()
                },
                status='applied'
            )
        return success, conflicts

    def list_branches(self, include_inactive: bool = False) -> List[VersionBranch]:
        """Return branches with their parent preloaded.

        Args:
            include_inactive: When false (default) only active branches are
                returned.
        """
        queryset = VersionBranch.objects.select_related('parent')
        if not include_inactive:
            queryset = queryset.filter(is_active=True)
        return list(queryset)

    @staticmethod
    def _lock_expiry(branch: VersionBranch) -> datetime:
        """Parse the expiry timestamp stored in the branch's current lock."""
        return datetime.fromisoformat(branch.lock_status['expires'])

    @transaction.atomic
    def acquire_lock(
        self,
        branch: VersionBranch,
        user: UserModel,
        duration: int = 48,
        reason: str = ""
    ) -> bool:
        """Acquire a lock on a branch.

        Args:
            branch: The branch to lock.
            user: User acquiring the lock.
            duration: Lock duration in hours (default 48).
            reason: Reason for locking.

        Returns:
            True if the lock was acquired, False if an unexpired lock is
            already in place. An expired lock is silently replaced.
        """
        # An existing lock only blocks us while it has not expired.
        if branch.lock_status and timezone.now() < self._lock_expiry(branch):
            return False

        expiry = timezone.now() + timedelta(hours=duration)
        branch.lock_status = {
            'user': user.id,
            'expires': expiry.isoformat(),
            'reason': reason
        }
        # Record in history
        branch.lock_history.append({
            'user': user.id,
            'action': 'lock',
            'timestamp': timezone.now().isoformat(),
            'reason': reason,
            'expires': expiry.isoformat()
        })
        branch.save()
        return True

    @transaction.atomic
    def release_lock(
        self,
        branch: VersionBranch,
        user: UserModel,
        force: bool = False
    ) -> bool:
        """Release a lock on a branch.

        Args:
            branch: The branch to unlock.
            user: User releasing the lock.
            force: Marks the release as forced in the lock history.

        Returns:
            True if the lock was released; False if the branch is not locked
            or ``user`` is not allowed to release it.
        """
        if not branch.lock_status:
            return False

        locked_by = branch.lock_status.get('user')
        if not locked_by:
            return False

        # The holder may always release their own lock. Anyone else needs the
        # force-unlock permission, whether or not ``force`` is passed, so the
        # flag alone can never bypass authorization.
        if locked_by != user.id and not user.has_perm(self.FORCE_UNLOCK_PERMISSION):
            return False

        # Record in history
        branch.lock_history.append({
            'user': user.id,
            'action': 'unlock',
            'timestamp': timezone.now().isoformat(),
            'forced': force
        })
        # Clear lock
        branch.lock_status = {}
        branch.save()
        return True

    def check_lock(self, branch: VersionBranch) -> Dict[str, Any]:
        """Report the lock status of a branch.

        Side effect: an expired lock is cleared and the branch is saved.

        Args:
            branch: The branch to check.

        Returns:
            ``{'locked': False}`` when there is no live lock; otherwise a dict
            with ``locked``, ``user`` (the user object, or ``None`` if that
            user no longer exists), ``expires`` and ``reason``.
        """
        if not branch.lock_status:
            return {'locked': False}

        expires = self._lock_expiry(branch)
        if timezone.now() >= expires:
            # Lock has expired
            branch.lock_status = {}
            branch.save()
            return {'locked': False}

        return {
            'locked': True,
            # filter().first() instead of get(): a deleted lock holder must
            # not make a status check raise.
            'user': User.objects.filter(id=branch.lock_status['user']).first(),
            'expires': expires,
            'reason': branch.lock_status.get('reason', '')
        }

    def get_lock_history(
        self,
        branch: VersionBranch,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Return the lock history of a branch, enriched with user objects.

        Args:
            branch: The branch to get history for.
            limit: Return only the last ``limit`` entries; ``None`` or ``0``
                returns everything.

        Returns:
            Copies of the history entries, each with an extra ``user_obj`` key
            holding the user object (``None`` when the user no longer exists).
            The entries stored on ``branch`` are not modified, so the branch
            can still be saved afterwards.
        """
        history = branch.lock_history
        if limit:
            history = history[-limit:]

        # One query for all referenced users instead of one per entry.
        user_ids = {entry.get('user') for entry in history} - {None}
        users_by_id = User.objects.in_bulk(list(user_ids)) if user_ids else {}

        return [
            {**entry, 'user_obj': users_by_id.get(entry.get('user'))}
            for entry in history
        ]
class ChangeTracker:
    """Track and describe changes across history-tracked models."""

    # Bookkeeping fields of a historical record that never count as content
    # changes. Both the FK name ('history_user') and its column attribute
    # ('history_user_id') are listed because ``field.name`` yields the former.
    _HISTORY_META_FIELDS = frozenset({
        'history_id', 'history_date', 'history_type',
        'history_user', 'history_user_id', 'history_change_reason'
    })

    @transaction.atomic
    def record_change(
        self,
        instance: Any,
        change_type: str,
        branch: VersionBranch,
        user: Optional[UserModel] = None,
        metadata: Optional[Dict] = None
    ) -> ChangeSet:
        """Save ``instance`` and record the resulting change as a changeset.

        Args:
            instance: Model instance exposing a ``history`` attribute.
            change_type: Label used in the changeset description.
            branch: Branch the changeset belongs to.
            user: User recorded as the changeset author.
            metadata: Extra metadata stored on the changeset (default ``{}``).

        Returns:
            The new changeset, created with status ``'pending'`` and pointing
            at the historical record produced by the save.

        Raises:
            ValueError: If ``instance`` has no ``history`` attribute or no
                historical record exists after saving.
        """
        if not hasattr(instance, 'history'):
            raise ValueError("Instance must be a model with history tracking enabled")

        # Create historical record by saving the instance
        instance.save()
        historical_record = instance.history.first()
        if not historical_record:
            raise ValueError("Failed to create historical record")

        # Create changeset
        content_type = ContentType.objects.get_for_model(historical_record)
        changeset = ChangeSet.objects.create(
            branch=branch,
            created_by=user,
            description=f"{change_type} operation on {instance._meta.model_name}",
            metadata=metadata or {},
            status='pending',
            content_type=content_type,
            object_id=historical_record.pk
        )
        return changeset

    def get_changes(self, branch: VersionBranch) -> List[ChangeSet]:
        """Return all changesets of ``branch`` ordered by creation time."""
        return list(ChangeSet.objects.filter(branch=branch).order_by('created_at'))

    def compute_enhanced_diff(
        self,
        version1: Any,
        version2: Any,
        syntax_detect: bool = True
    ) -> Dict[str, Any]:
        """Return a structured field-by-field diff with syntax metadata.

        The first historical record of each version (``history.first()``) is
        compared over the fields of ``version2``'s historical model.

        Args:
            version1: First version to compare.
            version2: Second version to compare.
            syntax_detect: Whether to detect syntax types; when false every
                changed field is reported with syntax ``'text'``.

        Returns:
            Dict with ``changes`` (per-field old/new values and metadata) and
            ``metadata`` (record ids, timestamps, impact score and stats).

        Raises:
            ValueError: If either version lacks a ``history`` attribute or has
                no historical record.
        """
        if not hasattr(version1, 'history') or not hasattr(version2, 'history'):
            raise ValueError("Both versions must be history-tracked models")

        # Get historical records
        v1_history = version1.history.first()
        v2_history = version2.history.first()
        if not (v1_history and v2_history):
            raise ValueError("No historical records found")

        changes = {}
        # Compare fields and detect syntax
        for field in v2_history._meta.fields:
            field_name = field.name
            if field_name in self._HISTORY_META_FIELDS:
                continue

            old_value = getattr(v1_history, field_name)
            new_value = getattr(v2_history, field_name)
            if old_value == new_value:
                continue

            field_type = field.get_internal_type()
            syntax_type = self._detect_syntax(field_type, old_value) if syntax_detect else 'text'
            changes[field_name] = {
                'old': str(old_value),
                'new': str(new_value),
                'type': field_type,
                'syntax': syntax_type,
                'line_numbers': self._compute_line_numbers(old_value, new_value),
                'metadata': {
                    'comment_anchor_id': f"{v2_history.history_id}_{field_name}",
                    'field_type': field_type,
                    'content_type': v2_history._meta.model_name
                }
            }

        # Calculate impact metrics
        impact_metrics = self._calculate_impact_metrics(changes)
        computed_at = timezone.now().isoformat()
        return {
            'changes': changes,
            'metadata': {
                'version1_id': v1_history.history_id,
                'version2_id': v2_history.history_id,
                'timestamp': computed_at,
                'impact_score': impact_metrics['impact_score'],
                'stats': impact_metrics['stats'],
                'performance': {
                    'syntax_detection': syntax_detect,
                    'computed_at': computed_at
                }
            }
        }

    def _detect_syntax(self, field_type: str, value: Any) -> str:
        """Guess the syntax type of a field's content.

        Text fields are sniffed by their leading characters (Python, JSON,
        HTML, JavaScript); anything else falls back to a per-field-type map.

        Args:
            field_type: Django internal field type name.
            value: Field value.

        Returns:
            Detected syntax type; ``'text'`` when nothing more specific fits.
        """
        if field_type in ('TextField', 'CharField') and isinstance(value, str):
            if value.startswith(('def ', 'class ')):
                return 'python'
            if value.startswith(('{', '[')):
                try:
                    json.loads(value)
                except (ValueError, RecursionError):
                    # Looks like JSON but does not parse; keep sniffing.
                    pass
                else:
                    return 'json'
            if value.startswith(('<!DOCTYPE', '<html')):
                return 'html'
            if value.startswith(('// ', 'function ')):
                return 'javascript'

        syntax_map = {
            'JSONField': 'json',
            'FileField': 'path',
            'FilePathField': 'path',
            'URLField': 'url',
            'EmailField': 'email',
            'TextField': 'text',
            'CharField': 'text'
        }
        return syntax_map.get(field_type, 'text')

    def _compute_line_numbers(self, old_value: Any, new_value: Any) -> Dict[str, List[int]]:
        """Compute 1-based line numbers of both values for diff navigation.

        Args:
            old_value: Previous value (stringified if not a ``str``).
            new_value: New value (stringified if not a ``str``).

        Returns:
            Dict with ``old`` and ``new`` lists of line numbers.
        """
        def count_lines(value: Any) -> int:
            if not isinstance(value, str):
                value = str(value)
            return value.count('\n') + 1

        return {
            'old': list(range(1, count_lines(old_value) + 1)),
            'new': list(range(1, count_lines(new_value) + 1))
        }

    def _calculate_impact_metrics(self, changes: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate impact metrics for a set of field changes.

        Args:
            changes: Mapping of field name to change dict carrying ``type``,
                ``syntax`` and ``line_numbers`` keys.

        Returns:
            Dict with ``impact_score`` (0-1) and ``stats`` (line/field counts
            and per-type tallies).
        """
        total_lines_changed = sum(
            len(c['line_numbers']['old']) + len(c['line_numbers']['new'])
            for c in changes.values()
        )
        field_types = Counter(c['type'] for c in changes.values())
        syntax_types = Counter(c['syntax'] for c in changes.values())

        # Calculate impact score (0-1)
        impact_weights = {
            'lines_changed': 0.4,
            'fields_changed': 0.3,
            'complexity': 0.3
        }

        # Normalize metrics
        normalized_lines = min(1.0, total_lines_changed / 1000)  # Cap at 1000 lines
        normalized_fields = min(1.0, len(changes) / 20)  # Cap at 20 fields

        # Complexity based on field and syntax types. Clamped so that many
        # JSON/python changes cannot push the overall score above 1.
        complexity_score = min(1.0, (
            len(field_types) / 10 +  # Variety of field types
            len(syntax_types) / 5 +  # Variety of syntax types
            (field_types.get('JSONField', 0) * 0.2) +  # Weight complex fields higher
            (syntax_types.get('python', 0) * 0.2)  # Weight code changes higher
        ) / 2)

        impact_score = (
            impact_weights['lines_changed'] * normalized_lines +
            impact_weights['fields_changed'] * normalized_fields +
            impact_weights['complexity'] * complexity_score
        )
        return {
            'impact_score': impact_score,
            'stats': {
                'total_lines_changed': total_lines_changed,
                'fields_changed': len(changes),
                'field_types': dict(field_types),
                'syntax_types': dict(syntax_types)
            }
        }
class MergeStrategy:
    """Handle merge operations and conflict detection between branches."""

    def auto_merge(
        self,
        source: VersionBranch,
        target: VersionBranch
    ) -> Tuple[bool, List[Dict[str, Any]]]:
        """Attempt an automatic merge of ``source`` into ``target``.

        Two applied changes conflict when their historical instances belong to
        the same model and carry the same ``id``. If any conflict exists
        nothing is applied.

        Args:
            source: Branch whose applied changes are merged.
            target: Branch receiving the changes.

        Returns:
            ``(True, [])`` when every source change was applied to ``target``;
            otherwise ``(False, conflicts)`` with one entry per conflicting
            (source change, target change) pair.
        """
        # Get all applied changes of both branches, oldest first.
        source_changes = list(
            ChangeSet.objects.filter(
                branch=source,
                status='applied'
            ).order_by('created_at')
        )
        target_changes = ChangeSet.objects.filter(
            branch=target,
            status='applied'
        ).order_by('created_at')

        # Index target changes by conflict key once, so each change's
        # historical instance is resolved a single time instead of once per
        # (source, target) pair.
        target_pks_by_key: Dict[Any, List[Any]] = {}
        if source_changes:
            for target_change in target_changes:
                key = self._conflict_key(target_change)
                if key is not None:
                    target_pks_by_key.setdefault(key, []).append(target_change.pk)

        # Detect conflicts (source order outer, target order inner).
        conflicts = []
        for source_change in source_changes:
            key = self._conflict_key(source_change)
            if key is None:
                continue
            for target_pk in target_pks_by_key.get(key, ()):
                conflicts.append({
                    'source_change': source_change.pk,
                    'target_change': target_pk,
                    'type': 'content_conflict',
                    'description': 'Conflicting changes detected'
                })

        if conflicts:
            return False, conflicts

        # No conflicts, apply source changes to target
        for change in source_changes:
            self._apply_change_to_branch(change, target)
        return True, []

    @staticmethod
    def _conflict_key(change: ChangeSet) -> Optional[Tuple[Any, Any]]:
        """Return ``(model, id)`` of the change's historical instance, or None.

        ``None`` means the change has no (truthy) historical instance and can
        therefore never take part in a conflict.
        """
        instance = change.historical_instance
        if not instance:
            return None
        return (instance._meta.model, instance.id)

    def _detect_conflict(self, change1: ChangeSet, change2: ChangeSet) -> bool:
        """Check whether two changes conflict with each other.

        Same model and same instance id indicates a potential conflict; a
        change without a historical instance never conflicts.
        """
        key1 = self._conflict_key(change1)
        if key1 is None:
            return False
        return key1 == self._conflict_key(change2)

    @transaction.atomic
    def _apply_change_to_branch(
        self,
        change: ChangeSet,
        target_branch: VersionBranch
    ) -> None:
        """Copy ``change`` onto ``target_branch`` as a new applied changeset.

        The new changeset references the same content type and object id as
        the original and records the originating change and branch in its
        metadata.
        """
        # Create new changeset in target branch
        new_changeset = ChangeSet.objects.create(
            branch=target_branch,
            description=f"Applied change from '{change.branch.name}'",
            metadata={
                'source_change': change.pk,
                'source_branch': change.branch.name
            },
            status='pending',
            content_type=change.content_type,
            object_id=change.object_id
        )
        # NOTE(review): created as 'pending' and then saved as 'applied' in a
        # second write — presumably so save-time hooks see the transition;
        # confirm before collapsing into a single create().
        new_changeset.status = 'applied'
        new_changeset.save()