from typing import Optional, List, Dict, Any, Tuple, Type, TypeVar, cast from django.db import transaction from django.core.exceptions import ValidationError from django.utils import timezone from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType from django.contrib.auth.models import AbstractUser from collections import Counter import json from .models import VersionBranch, VersionTag, ChangeSet UserModel = TypeVar('UserModel', bound=AbstractUser) User = cast(Type[UserModel], get_user_model()) class BranchManager: """Manages version control branch operations""" @transaction.atomic def create_branch( self, name: str, parent: Optional[VersionBranch] = None, user: Optional[UserModel] = None ) -> VersionBranch: """Create a new version branch""" branch = VersionBranch.objects.create( name=name, parent=parent, created_by=user, metadata={ 'created_from': parent.name if parent else 'root', 'created_at': timezone.now().isoformat() } ) branch.full_clean() return branch @transaction.atomic def merge_branches( self, source: VersionBranch, target: VersionBranch, user: Optional[UserModel] = None ) -> Tuple[bool, List[Dict[str, Any]]]: """ Merge source branch into target branch Returns: (success, conflicts) """ if not source.is_active or not target.is_active: raise ValidationError("Cannot merge inactive branches") merger = MergeStrategy() success, conflicts = merger.auto_merge(source, target) if success: # Record successful merge ChangeSet.objects.create( branch=target, created_by=user, description=f"Merged branch '{source.name}' into '{target.name}'", metadata={ 'merge_source': source.name, 'merge_target': target.name, 'merged_at': timezone.now().isoformat() }, status='applied' ) return success, conflicts def list_branches(self, include_inactive: bool = False) -> List[VersionBranch]: """Get all branches with their relationships""" queryset = VersionBranch.objects.select_related('parent') if not include_inactive: queryset = queryset.filter(is_active=True) return list(queryset) @transaction.atomic def acquire_lock( self, branch: VersionBranch, user: UserModel, duration: int = 48, reason: str = "" ) -> bool: """ Acquire a lock on a branch Args: branch: The branch to lock user: User acquiring the lock duration: Lock duration in hours (default 48) reason: Reason for locking Returns: bool: True if lock acquired, False if already locked """ # Check if branch is already locked if branch.lock_status: expires = timezone.datetime.fromisoformat(branch.lock_status['expires']) if timezone.now() < expires: return False # Set lock expiry = timezone.now() + timezone.timedelta(hours=duration) branch.lock_status = { 'user': user.id, 'expires': expiry.isoformat(), 'reason': reason } # Record in history branch.lock_history.append({ 'user': user.id, 'action': 'lock', 'timestamp': timezone.now().isoformat(), 'reason': reason, 'expires': expiry.isoformat() }) branch.save() return True @transaction.atomic def release_lock( self, branch: VersionBranch, user: UserModel, force: bool = False ) -> bool: """ Release a lock on a branch Args: branch: The branch to unlock user: User releasing the lock force: Whether to force unlock (requires permissions) Returns: bool: True if lock released, False if not locked or unauthorized """ if not branch.lock_status: return False locked_by = branch.lock_status.get('user') if not locked_by: return False # Check authorization if not force and locked_by != user.id: if not user.has_perm('history_tracking.force_unlock_branch'): return False # Record in history branch.lock_history.append({ 'user': user.id, 'action': 'unlock', 'timestamp': timezone.now().isoformat(), 'forced': force }) # Clear lock branch.lock_status = {} branch.save() return True def check_lock(self, branch: VersionBranch) -> Dict[str, Any]: """ Check the lock status of a branch Args: branch: The branch to check Returns: dict: Lock status information """ if not branch.lock_status: return {'locked': False} expires = timezone.datetime.fromisoformat(branch.lock_status['expires']) if timezone.now() >= expires: # Lock has expired branch.lock_status = {} branch.save() return {'locked': False} return { 'locked': True, 'user': User.objects.get(id=branch.lock_status['user']), 'expires': expires, 'reason': branch.lock_status.get('reason', '') } def get_lock_history( self, branch: VersionBranch, limit: Optional[int] = None ) -> List[Dict[str, Any]]: """ Get the lock history for a branch Args: branch: The branch to get history for limit: Optional limit on number of entries Returns: list: Lock history entries """ history = branch.lock_history if limit: history = history[-limit:] # Enhance history with user objects for entry in history: try: entry['user_obj'] = User.objects.get(id=entry['user']) except User.DoesNotExist: entry['user_obj'] = None return history class ChangeTracker: """Tracks and manages changes across the system""" @transaction.atomic def record_change( self, instance: Any, change_type: str, branch: VersionBranch, user: Optional[UserModel] = None, metadata: Optional[Dict] = None ) -> ChangeSet: """Record a change in the system""" if not hasattr(instance, 'history'): raise ValueError("Instance must be a model with history tracking enabled") # Create historical record by saving the instance instance.save() historical_record = instance.history.first() if not historical_record: raise ValueError("Failed to create historical record") # Create changeset content_type = ContentType.objects.get_for_model(historical_record) changeset = ChangeSet.objects.create( branch=branch, created_by=user, description=f"{change_type} operation on {instance._meta.model_name}", metadata=metadata or {}, status='pending', content_type=content_type, object_id=historical_record.pk ) return changeset def get_changes(self, branch: VersionBranch) -> List[ChangeSet]: """Get all changes in a branch ordered by creation time""" return list(ChangeSet.objects.filter(branch=branch).order_by('created_at')) def compute_enhanced_diff( self, version1: Any, version2: Any, syntax_detect: bool = True ) -> Dict[str, Any]: """ Return structured diff with syntax metadata Args: version1: First version to compare version2: Second version to compare syntax_detect: Whether to detect syntax types Returns: Dict containing structured diff with metadata """ if not hasattr(version1, 'history') or not hasattr(version2, 'history'): raise ValueError("Both versions must be history-tracked models") # Get historical records v1_history = version1.history.first() v2_history = version2.history.first() if not (v1_history and v2_history): raise ValueError("No historical records found") changes = {} # Compare fields and detect syntax for field in v2_history._meta.fields: field_name = field.name if field_name in [ 'history_id', 'history_date', 'history_type', 'history_user_id', 'history_change_reason' ]: continue old_value = getattr(v1_history, field_name) new_value = getattr(v2_history, field_name) if old_value != new_value: field_type = field.get_internal_type() syntax_type = self._detect_syntax(field_type, old_value) if syntax_detect else 'text' changes[field_name] = { 'old': str(old_value), 'new': str(new_value), 'type': field_type, 'syntax': syntax_type, 'line_numbers': self._compute_line_numbers(old_value, new_value), 'metadata': { 'comment_anchor_id': f"{v2_history.history_id}_{field_name}", 'field_type': field_type, 'content_type': v2_history._meta.model_name } } # Calculate impact metrics impact_metrics = self._calculate_impact_metrics(changes) return { 'changes': changes, 'metadata': { 'version1_id': v1_history.history_id, 'version2_id': v2_history.history_id, 'timestamp': timezone.now().isoformat(), 'impact_score': impact_metrics['impact_score'], 'stats': impact_metrics['stats'], 'performance': { 'syntax_detection': syntax_detect, 'computed_at': timezone.now().isoformat() } } } def _detect_syntax(self, field_type: str, value: Any) -> str: """ Detect syntax type for field content Args: field_type: Django field type value: Field value Returns: Detected syntax type """ if field_type in ['TextField', 'CharField']: # Try to detect if it's code if isinstance(value, str): if value.startswith('def ') or value.startswith('class '): return 'python' if value.startswith('{') or value.startswith('['): try: json.loads(value) return 'json' except: pass if value.startswith(' Dict[str, List[int]]: """ Compute line numbers for diff navigation Args: old_value: Previous value new_value: New value Returns: Dict with old and new line numbers """ def count_lines(value): if not isinstance(value, str): value = str(value) return value.count('\n') + 1 old_lines = count_lines(old_value) new_lines = count_lines(new_value) return { 'old': list(range(1, old_lines + 1)), 'new': list(range(1, new_lines + 1)) } def _calculate_impact_metrics(self, changes: Dict[str, Any]) -> Dict[str, Any]: """ Calculate impact metrics for changes Args: changes: Dict of changes Returns: Dict with impact metrics """ total_lines_changed = sum( len(c['line_numbers']['old']) + len(c['line_numbers']['new']) for c in changes.values() ) field_types = Counter(c['type'] for c in changes.values()) syntax_types = Counter(c['syntax'] for c in changes.values()) # Calculate impact score (0-1) impact_weights = { 'lines_changed': 0.4, 'fields_changed': 0.3, 'complexity': 0.3 } # Normalize metrics normalized_lines = min(1.0, total_lines_changed / 1000) # Cap at 1000 lines normalized_fields = min(1.0, len(changes) / 20) # Cap at 20 fields # Complexity based on field and syntax types complexity_score = ( len(field_types) / 10 + # Variety of field types len(syntax_types) / 5 + # Variety of syntax types (field_types.get('JSONField', 0) * 0.2) + # Weight complex fields higher (syntax_types.get('python', 0) * 0.2) # Weight code changes higher ) / 2 # Normalize to 0-1 impact_score = ( impact_weights['lines_changed'] * normalized_lines + impact_weights['fields_changed'] * normalized_fields + impact_weights['complexity'] * complexity_score ) return { 'impact_score': impact_score, 'stats': { 'total_lines_changed': total_lines_changed, 'fields_changed': len(changes), 'field_types': dict(field_types), 'syntax_types': dict(syntax_types) } } class MergeStrategy: """Handles merge operations and conflict resolution""" def auto_merge( self, source: VersionBranch, target: VersionBranch ) -> Tuple[bool, List[Dict[str, Any]]]: """ Attempt automatic merge between branches Returns: (success, conflicts) """ conflicts = [] # Get all changes since branch creation source_changes = ChangeSet.objects.filter( branch=source, status='applied' ).order_by('created_at') target_changes = ChangeSet.objects.filter( branch=target, status='applied' ).order_by('created_at') # Detect conflicts for source_change in source_changes: for target_change in target_changes: if self._detect_conflict(source_change, target_change): conflicts.append({ 'source_change': source_change.pk, 'target_change': target_change.pk, 'type': 'content_conflict', 'description': 'Conflicting changes detected' }) if conflicts: return False, conflicts # No conflicts, apply source changes to target for change in source_changes: self._apply_change_to_branch(change, target) return True, [] def _detect_conflict(self, change1: ChangeSet, change2: ChangeSet) -> bool: """Check if two changes conflict with each other""" # Get historical instances instance1 = change1.historical_instance instance2 = change2.historical_instance if not (instance1 and instance2): return False # Same model and instance ID indicates potential conflict return ( instance1._meta.model == instance2._meta.model and instance1.id == instance2.id ) @transaction.atomic def _apply_change_to_branch( self, change: ChangeSet, target_branch: VersionBranch ) -> None: """Apply a change from one branch to another""" # Create new changeset in target branch new_changeset = ChangeSet.objects.create( branch=target_branch, description=f"Applied change from '{change.branch.name}'", metadata={ 'source_change': change.pk, 'source_branch': change.branch.name }, status='pending', content_type=change.content_type, object_id=change.object_id ) new_changeset.status = 'applied' new_changeset.save()