from typing import Optional, List, Dict, Any, Tuple from django.db import transaction from django.core.exceptions import ValidationError from django.utils import timezone from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType from .models import VersionBranch, VersionTag, ChangeSet User = get_user_model() class BranchManager: """Manages version control branch operations""" @transaction.atomic def create_branch(self, name: str, parent: Optional[VersionBranch] = None, user: Optional[User] = None) -> VersionBranch: """Create a new version branch""" branch = VersionBranch.objects.create( name=name, parent=parent, created_by=user, metadata={ 'created_from': parent.name if parent else 'root', 'created_at': timezone.now().isoformat() } ) branch.full_clean() return branch @transaction.atomic def merge_branches(self, source: VersionBranch, target: VersionBranch, user: Optional[User] = None) -> Tuple[bool, List[Dict[str, Any]]]: """ Merge source branch into target branch Returns: (success, conflicts) """ if not source.is_active or not target.is_active: raise ValidationError("Cannot merge inactive branches") merger = MergeStrategy() success, conflicts = merger.auto_merge(source, target) if success: # Record successful merge ChangeSet.objects.create( branch=target, created_by=user, description=f"Merged branch '{source.name}' into '{target.name}'", metadata={ 'merge_source': source.name, 'merge_target': target.name, 'merged_at': timezone.now().isoformat() }, status='applied' ) return success, conflicts def list_branches(self, include_inactive: bool = False) -> List[VersionBranch]: """Get all branches with their relationships""" queryset = VersionBranch.objects.select_related('parent') if not include_inactive: queryset = queryset.filter(is_active=True) return list(queryset) class ChangeTracker: """Tracks and manages changes across the system""" @transaction.atomic def record_change(self, instance: Any, change_type: str, branch: VersionBranch, user: Optional[User] = None, metadata: Optional[Dict] = None) -> ChangeSet: """Record a change in the system""" if not hasattr(instance, 'history'): raise ValueError("Instance must be a model with history tracking enabled") # Create historical record by saving the instance instance.save() historical_record = instance.history.first() if not historical_record: raise ValueError("Failed to create historical record") # Create changeset content_type = ContentType.objects.get_for_model(historical_record) changeset = ChangeSet.objects.create( branch=branch, created_by=user, description=f"{change_type} operation on {instance._meta.model_name}", metadata=metadata or {}, status='pending', content_type=content_type, object_id=historical_record.pk ) return changeset def get_changes(self, branch: VersionBranch) -> List[ChangeSet]: """Get all changes in a branch ordered by creation time""" return list(ChangeSet.objects.filter(branch=branch).order_by('created_at')) class MergeStrategy: """Handles merge operations and conflict resolution""" def auto_merge(self, source: VersionBranch, target: VersionBranch) -> Tuple[bool, List[Dict[str, Any]]]: """ Attempt automatic merge between branches Returns: (success, conflicts) """ conflicts = [] # Get all changes since branch creation source_changes = ChangeSet.objects.filter( branch=source, status='applied' ).order_by('created_at') target_changes = ChangeSet.objects.filter( branch=target, status='applied' ).order_by('created_at') # Detect conflicts for source_change in source_changes: for target_change in target_changes: if self._detect_conflict(source_change, target_change): conflicts.append({ 'source_change': source_change.pk, 'target_change': target_change.pk, 'type': 'content_conflict', 'description': 'Conflicting changes detected' }) if conflicts: return False, conflicts # No conflicts, apply source changes to target for change in source_changes: self._apply_change_to_branch(change, target) return True, [] def _detect_conflict(self, change1: ChangeSet, change2: ChangeSet) -> bool: """Check if two changes conflict with each other""" # Get historical instances instance1 = change1.historical_instance instance2 = change2.historical_instance if not (instance1 and instance2): return False # Same model and instance ID indicates potential conflict return ( instance1._meta.model == instance2._meta.model and instance1.id == instance2.id ) @transaction.atomic def _apply_change_to_branch(self, change: ChangeSet, target_branch: VersionBranch) -> None: """Apply a change from one branch to another""" # Create new changeset in target branch new_changeset = ChangeSet.objects.create( branch=target_branch, description=f"Applied change from '{change.branch.name}'", metadata={ 'source_change': change.pk, 'source_branch': change.branch.name }, status='pending', content_type=change.content_type, object_id=change.object_id ) new_changeset.status = 'applied' new_changeset.save()