mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 16:11:08 -05:00
198 lines
6.7 KiB
Python
198 lines
6.7 KiB
Python
from typing import Optional, List, Dict, Any, Tuple, Type, TypeVar, cast
|
|
from django.db import transaction
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils import timezone
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.auth.models import AbstractUser
|
|
from .models import VersionBranch, VersionTag, ChangeSet
|
|
|
|
UserModel = TypeVar('UserModel', bound=AbstractUser)
|
|
User = cast(Type[UserModel], get_user_model())
|
|
|
|
class BranchManager:
|
|
"""Manages version control branch operations"""
|
|
|
|
@transaction.atomic
|
|
def create_branch(
|
|
self,
|
|
name: str,
|
|
parent: Optional[VersionBranch] = None,
|
|
user: Optional[UserModel] = None
|
|
) -> VersionBranch:
|
|
"""Create a new version branch"""
|
|
branch = VersionBranch.objects.create(
|
|
name=name,
|
|
parent=parent,
|
|
created_by=user,
|
|
metadata={
|
|
'created_from': parent.name if parent else 'root',
|
|
'created_at': timezone.now().isoformat()
|
|
}
|
|
)
|
|
branch.full_clean()
|
|
return branch
|
|
|
|
@transaction.atomic
|
|
def merge_branches(
|
|
self,
|
|
source: VersionBranch,
|
|
target: VersionBranch,
|
|
user: Optional[UserModel] = None
|
|
) -> Tuple[bool, List[Dict[str, Any]]]:
|
|
"""
|
|
Merge source branch into target branch
|
|
Returns: (success, conflicts)
|
|
"""
|
|
if not source.is_active or not target.is_active:
|
|
raise ValidationError("Cannot merge inactive branches")
|
|
|
|
merger = MergeStrategy()
|
|
success, conflicts = merger.auto_merge(source, target)
|
|
|
|
if success:
|
|
# Record successful merge
|
|
ChangeSet.objects.create(
|
|
branch=target,
|
|
created_by=user,
|
|
description=f"Merged branch '{source.name}' into '{target.name}'",
|
|
metadata={
|
|
'merge_source': source.name,
|
|
'merge_target': target.name,
|
|
'merged_at': timezone.now().isoformat()
|
|
},
|
|
status='applied'
|
|
)
|
|
|
|
return success, conflicts
|
|
|
|
def list_branches(self, include_inactive: bool = False) -> List[VersionBranch]:
|
|
"""Get all branches with their relationships"""
|
|
queryset = VersionBranch.objects.select_related('parent')
|
|
if not include_inactive:
|
|
queryset = queryset.filter(is_active=True)
|
|
return list(queryset)
|
|
|
|
class ChangeTracker:
|
|
"""Tracks and manages changes across the system"""
|
|
|
|
@transaction.atomic
|
|
def record_change(
|
|
self,
|
|
instance: Any,
|
|
change_type: str,
|
|
branch: VersionBranch,
|
|
user: Optional[UserModel] = None,
|
|
metadata: Optional[Dict] = None
|
|
) -> ChangeSet:
|
|
"""Record a change in the system"""
|
|
if not hasattr(instance, 'history'):
|
|
raise ValueError("Instance must be a model with history tracking enabled")
|
|
|
|
# Create historical record by saving the instance
|
|
instance.save()
|
|
historical_record = instance.history.first()
|
|
|
|
if not historical_record:
|
|
raise ValueError("Failed to create historical record")
|
|
|
|
# Create changeset
|
|
content_type = ContentType.objects.get_for_model(historical_record)
|
|
changeset = ChangeSet.objects.create(
|
|
branch=branch,
|
|
created_by=user,
|
|
description=f"{change_type} operation on {instance._meta.model_name}",
|
|
metadata=metadata or {},
|
|
status='pending',
|
|
content_type=content_type,
|
|
object_id=historical_record.pk
|
|
)
|
|
|
|
return changeset
|
|
|
|
def get_changes(self, branch: VersionBranch) -> List[ChangeSet]:
|
|
"""Get all changes in a branch ordered by creation time"""
|
|
return list(ChangeSet.objects.filter(branch=branch).order_by('created_at'))
|
|
|
|
class MergeStrategy:
|
|
"""Handles merge operations and conflict resolution"""
|
|
|
|
def auto_merge(
|
|
self,
|
|
source: VersionBranch,
|
|
target: VersionBranch
|
|
) -> Tuple[bool, List[Dict[str, Any]]]:
|
|
"""
|
|
Attempt automatic merge between branches
|
|
Returns: (success, conflicts)
|
|
"""
|
|
conflicts = []
|
|
|
|
# Get all changes since branch creation
|
|
source_changes = ChangeSet.objects.filter(
|
|
branch=source,
|
|
status='applied'
|
|
).order_by('created_at')
|
|
|
|
target_changes = ChangeSet.objects.filter(
|
|
branch=target,
|
|
status='applied'
|
|
).order_by('created_at')
|
|
|
|
# Detect conflicts
|
|
for source_change in source_changes:
|
|
for target_change in target_changes:
|
|
if self._detect_conflict(source_change, target_change):
|
|
conflicts.append({
|
|
'source_change': source_change.pk,
|
|
'target_change': target_change.pk,
|
|
'type': 'content_conflict',
|
|
'description': 'Conflicting changes detected'
|
|
})
|
|
|
|
if conflicts:
|
|
return False, conflicts
|
|
|
|
# No conflicts, apply source changes to target
|
|
for change in source_changes:
|
|
self._apply_change_to_branch(change, target)
|
|
|
|
return True, []
|
|
|
|
def _detect_conflict(self, change1: ChangeSet, change2: ChangeSet) -> bool:
|
|
"""Check if two changes conflict with each other"""
|
|
# Get historical instances
|
|
instance1 = change1.historical_instance
|
|
instance2 = change2.historical_instance
|
|
|
|
if not (instance1 and instance2):
|
|
return False
|
|
|
|
# Same model and instance ID indicates potential conflict
|
|
return (
|
|
instance1._meta.model == instance2._meta.model and
|
|
instance1.id == instance2.id
|
|
)
|
|
|
|
@transaction.atomic
|
|
def _apply_change_to_branch(
|
|
self,
|
|
change: ChangeSet,
|
|
target_branch: VersionBranch
|
|
) -> None:
|
|
"""Apply a change from one branch to another"""
|
|
# Create new changeset in target branch
|
|
new_changeset = ChangeSet.objects.create(
|
|
branch=target_branch,
|
|
description=f"Applied change from '{change.branch.name}'",
|
|
metadata={
|
|
'source_change': change.pk,
|
|
'source_branch': change.branch.name
|
|
},
|
|
status='pending',
|
|
content_type=change.content_type,
|
|
object_id=change.object_id
|
|
)
|
|
|
|
new_changeset.status = 'applied'
|
|
new_changeset.save() |