Add comment and reply functionality with preview and notification templates

This commit is contained in:
pacnpal
2025-02-07 13:13:49 -05:00
parent c083f54afb
commit a07c882d42
30 changed files with 5153 additions and 383 deletions

View File

@@ -5,6 +5,8 @@ from django.utils import timezone
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import AbstractUser
from collections import Counter
import json
from .models import VersionBranch, VersionTag, ChangeSet
UserModel = TypeVar('UserModel', bound=AbstractUser)
@@ -73,6 +75,142 @@ class BranchManager:
queryset = queryset.filter(is_active=True)
return list(queryset)
@transaction.atomic
def acquire_lock(
self,
branch: VersionBranch,
user: UserModel,
duration: int = 48,
reason: str = ""
) -> bool:
"""
Acquire a lock on a branch
Args:
branch: The branch to lock
user: User acquiring the lock
duration: Lock duration in hours (default 48)
reason: Reason for locking
Returns:
bool: True if lock acquired, False if already locked
"""
# Check if branch is already locked
if branch.lock_status:
expires = timezone.datetime.fromisoformat(branch.lock_status['expires'])
if timezone.now() < expires:
return False
# Set lock
expiry = timezone.now() + timezone.timedelta(hours=duration)
branch.lock_status = {
'user': user.id,
'expires': expiry.isoformat(),
'reason': reason
}
# Record in history
branch.lock_history.append({
'user': user.id,
'action': 'lock',
'timestamp': timezone.now().isoformat(),
'reason': reason,
'expires': expiry.isoformat()
})
branch.save()
return True
@transaction.atomic
def release_lock(
self,
branch: VersionBranch,
user: UserModel,
force: bool = False
) -> bool:
"""
Release a lock on a branch
Args:
branch: The branch to unlock
user: User releasing the lock
force: Whether to force unlock (requires permissions)
Returns:
bool: True if lock released, False if not locked or unauthorized
"""
if not branch.lock_status:
return False
locked_by = branch.lock_status.get('user')
if not locked_by:
return False
# Check authorization
if not force and locked_by != user.id:
if not user.has_perm('history_tracking.force_unlock_branch'):
return False
# Record in history
branch.lock_history.append({
'user': user.id,
'action': 'unlock',
'timestamp': timezone.now().isoformat(),
'forced': force
})
# Clear lock
branch.lock_status = {}
branch.save()
return True
def check_lock(self, branch: VersionBranch) -> Dict[str, Any]:
"""
Check the lock status of a branch
Args:
branch: The branch to check
Returns:
dict: Lock status information
"""
if not branch.lock_status:
return {'locked': False}
expires = timezone.datetime.fromisoformat(branch.lock_status['expires'])
if timezone.now() >= expires:
# Lock has expired
branch.lock_status = {}
branch.save()
return {'locked': False}
return {
'locked': True,
'user': User.objects.get(id=branch.lock_status['user']),
'expires': expires,
'reason': branch.lock_status.get('reason', '')
}
def get_lock_history(
self,
branch: VersionBranch,
limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Get the lock history for a branch
Args:
branch: The branch to get history for
limit: Optional limit on number of entries
Returns:
list: Lock history entries
"""
history = branch.lock_history
if limit:
history = history[-limit:]
# Enhance history with user objects
for entry in history:
try:
entry['user_obj'] = User.objects.get(id=entry['user'])
except User.DoesNotExist:
entry['user_obj'] = None
return history
class ChangeTracker:
"""Tracks and manages changes across the system"""
@@ -114,6 +252,189 @@ class ChangeTracker:
"""Get all changes in a branch ordered by creation time"""
return list(ChangeSet.objects.filter(branch=branch).order_by('created_at'))
def compute_enhanced_diff(
self,
version1: Any,
version2: Any,
syntax_detect: bool = True
) -> Dict[str, Any]:
"""
Return structured diff with syntax metadata
Args:
version1: First version to compare
version2: Second version to compare
syntax_detect: Whether to detect syntax types
Returns:
Dict containing structured diff with metadata
"""
if not hasattr(version1, 'history') or not hasattr(version2, 'history'):
raise ValueError("Both versions must be history-tracked models")
# Get historical records
v1_history = version1.history.first()
v2_history = version2.history.first()
if not (v1_history and v2_history):
raise ValueError("No historical records found")
changes = {}
# Compare fields and detect syntax
for field in v2_history._meta.fields:
field_name = field.name
if field_name in [
'history_id', 'history_date', 'history_type',
'history_user_id', 'history_change_reason'
]:
continue
old_value = getattr(v1_history, field_name)
new_value = getattr(v2_history, field_name)
if old_value != new_value:
field_type = field.get_internal_type()
syntax_type = self._detect_syntax(field_type, old_value) if syntax_detect else 'text'
changes[field_name] = {
'old': str(old_value),
'new': str(new_value),
'type': field_type,
'syntax': syntax_type,
'line_numbers': self._compute_line_numbers(old_value, new_value),
'metadata': {
'comment_anchor_id': f"{v2_history.history_id}_{field_name}",
'field_type': field_type,
'content_type': v2_history._meta.model_name
}
}
# Calculate impact metrics
impact_metrics = self._calculate_impact_metrics(changes)
return {
'changes': changes,
'metadata': {
'version1_id': v1_history.history_id,
'version2_id': v2_history.history_id,
'timestamp': timezone.now().isoformat(),
'impact_score': impact_metrics['impact_score'],
'stats': impact_metrics['stats'],
'performance': {
'syntax_detection': syntax_detect,
'computed_at': timezone.now().isoformat()
}
}
}
def _detect_syntax(self, field_type: str, value: Any) -> str:
"""
Detect syntax type for field content
Args:
field_type: Django field type
value: Field value
Returns:
Detected syntax type
"""
if field_type in ['TextField', 'CharField']:
# Try to detect if it's code
if isinstance(value, str):
if value.startswith('def ') or value.startswith('class '):
return 'python'
if value.startswith('{') or value.startswith('['):
try:
json.loads(value)
return 'json'
except:
pass
if value.startswith('<!DOCTYPE') or value.startswith('<html'):
return 'html'
if value.startswith('// ') or value.startswith('function '):
return 'javascript'
syntax_map = {
'JSONField': 'json',
'FileField': 'path',
'FilePathField': 'path',
'URLField': 'url',
'EmailField': 'email',
'TextField': 'text',
'CharField': 'text'
}
return syntax_map.get(field_type, 'text')
def _compute_line_numbers(self, old_value: Any, new_value: Any) -> Dict[str, List[int]]:
"""
Compute line numbers for diff navigation
Args:
old_value: Previous value
new_value: New value
Returns:
Dict with old and new line numbers
"""
def count_lines(value):
if not isinstance(value, str):
value = str(value)
return value.count('\n') + 1
old_lines = count_lines(old_value)
new_lines = count_lines(new_value)
return {
'old': list(range(1, old_lines + 1)),
'new': list(range(1, new_lines + 1))
}
def _calculate_impact_metrics(self, changes: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate impact metrics for changes
Args:
changes: Dict of changes
Returns:
Dict with impact metrics
"""
total_lines_changed = sum(
len(c['line_numbers']['old']) + len(c['line_numbers']['new'])
for c in changes.values()
)
field_types = Counter(c['type'] for c in changes.values())
syntax_types = Counter(c['syntax'] for c in changes.values())
# Calculate impact score (0-1)
impact_weights = {
'lines_changed': 0.4,
'fields_changed': 0.3,
'complexity': 0.3
}
# Normalize metrics
normalized_lines = min(1.0, total_lines_changed / 1000) # Cap at 1000 lines
normalized_fields = min(1.0, len(changes) / 20) # Cap at 20 fields
# Complexity based on field and syntax types
complexity_score = (
len(field_types) / 10 + # Variety of field types
len(syntax_types) / 5 + # Variety of syntax types
(field_types.get('JSONField', 0) * 0.2) + # Weight complex fields higher
(syntax_types.get('python', 0) * 0.2) # Weight code changes higher
) / 2 # Normalize to 0-1
impact_score = (
impact_weights['lines_changed'] * normalized_lines +
impact_weights['fields_changed'] * normalized_fields +
impact_weights['complexity'] * complexity_score
)
return {
'impact_score': impact_score,
'stats': {
'total_lines_changed': total_lines_changed,
'fields_changed': len(changes),
'field_types': dict(field_types),
'syntax_types': dict(syntax_types)
}
}
class MergeStrategy:
"""Handles merge operations and conflict resolution"""