from django.views.generic import TemplateView from django.contrib.admin.views.decorators import staff_member_required from django.utils.decorators import method_decorator from django.utils import timezone from datetime import timedelta from .models import VersionBranch, ChangeSet from .monitoring import VersionControlMetrics @method_decorator(staff_member_required, name='dispatch') class MonitoringDashboardView(TemplateView): template_name = 'history_tracking/monitoring_dashboard.html' def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) metrics = self._collect_metrics() context['metrics'] = metrics return context def _collect_metrics(self): """Collect all monitoring metrics""" # Collect basic statistics total_branches = VersionBranch.objects.count() active_branches = VersionBranch.objects.filter(is_active=True).count() total_changes = ChangeSet.objects.count() pending_changes = ChangeSet.objects.filter(status='pending').count() # Calculate merge success rate last_week = timezone.now() - timedelta(days=7) total_merges = ChangeSet.objects.filter( created_at__gte=last_week, status__in=['applied', 'conflict'] ).count() successful_merges = ChangeSet.objects.filter( created_at__gte=last_week, status='applied' ).count() merge_success_rate = round( (successful_merges / total_merges * 100) if total_merges > 0 else 100 ) # Get performance metrics VersionControlMetrics.collect_performance_metrics() perf_metrics = self._get_performance_metrics() # Get error tracking data errors = self._get_error_tracking() # Get user activity user_activity = self._get_user_activity() return { # System Overview 'total_branches': total_branches, 'active_branches': active_branches, 'total_changes': total_changes, 'pending_changes': pending_changes, 'merge_success_rate': merge_success_rate, 'conflicted_merges': ChangeSet.objects.filter( status='conflict' ).count(), 'system_health': self._calculate_system_health(), 'health_checks': 5, # Number of health checks performed # Performance Metrics 'timing': perf_metrics['timing'], 'database': perf_metrics['database'], 'cache': perf_metrics['cache'], # Error Tracking 'errors': errors, # User Activity 'current_operations': user_activity['current'], 'recent_activity': user_activity['recent'] } def _get_performance_metrics(self): """Get detailed performance metrics""" from django.db import connection from django.core.cache import cache # Calculate average operation timings operation_times = { 'branch_creation': [], 'branch_switch': [], 'merge': [] } for log in self._get_operation_logs(): if log['operation'] in operation_times: operation_times[log['operation']].append(log['duration']) timing = { op: round(sum(times) / len(times), 2) if times else 0 for op, times in operation_times.items() } return { 'timing': timing, 'database': { 'query_count': len(connection.queries), 'query_time': round( sum(float(q['time']) for q in connection.queries), 3 ), 'pool_size': connection.pool_size if hasattr(connection, 'pool_size') else 'N/A', 'max_pool': connection.max_pool if hasattr(connection, 'max_pool') else 'N/A' }, 'cache': { 'hit_rate': round( cache.get('version_control_cache_hits', 0) / (cache.get('version_control_cache_hits', 0) + cache.get('version_control_cache_misses', 1)) * 100, 1 ), 'miss_rate': round( cache.get('version_control_cache_misses', 0) / (cache.get('version_control_cache_hits', 0) + cache.get('version_control_cache_misses', 1)) * 100, 1 ), 'memory_usage': round( cache.get('version_control_memory_usage', 0) / 1024 / 1024, 2 ) } } def _get_error_tracking(self): """Get recent error tracking data""" from django.conf import settings import logging logger = logging.getLogger('version_control') errors = [] # Get last 10 error logs if hasattr(logger, 'handlers'): for handler in logger.handlers: if isinstance(handler, logging.FileHandler): try: with open(handler.baseFilename, 'r') as f: for line in f.readlines()[-10:]: if '[ERROR]' in line: errors.append(self._parse_error_log(line)) except FileNotFoundError: pass return errors def _parse_error_log(self, log_line): """Parse error log line into structured data""" import re from datetime import datetime pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[ERROR\] (.*)' match = re.match(pattern, log_line) if match: timestamp_str, message = match.groups() return { 'timestamp': datetime.strptime( timestamp_str, '%Y-%m-%d %H:%M:%S,%f' ), 'type': 'Error', 'operation': self._extract_operation(message), 'message': message, 'resolved': False } return None def _extract_operation(self, message): """Extract operation type from error message""" if 'branch' in message.lower(): return 'Branch Operation' elif 'merge' in message.lower(): return 'Merge Operation' elif 'changeset' in message.lower(): return 'Change Operation' return 'Unknown Operation' def _get_user_activity(self): """Get current and recent user activity""" from django.contrib.auth import get_user_model User = get_user_model() # Get active sessions from django.contrib.sessions.models import Session current_sessions = Session.objects.filter( expire_date__gte=timezone.now() ) current_operations = [] for session in current_sessions: try: uid = session.get_decoded().get('_auth_user_id') if uid: user = User.objects.get(pk=uid) current_operations.append({ 'user': user.username, 'action': self._get_user_current_action(user) }) except (User.DoesNotExist, KeyError): continue # Get recent activity recent = ChangeSet.objects.select_related('user').order_by( '-created_at' )[:10] recent_activity = [ { 'user': change.user.username if change.user else 'System', 'action': self._get_change_action(change), 'timestamp': change.created_at } for change in recent ] return { 'current': current_operations, 'recent': recent_activity } def _get_user_current_action(self, user): """Get user's current action based on recent activity""" last_change = ChangeSet.objects.filter( user=user ).order_by('-created_at').first() if last_change: if (timezone.now() - last_change.created_at).seconds < 300: # 5 minutes return self._get_change_action(last_change) return 'Viewing' def _get_change_action(self, change): """Get human-readable action from change""" if change.status == 'applied': return f'Applied changes to {change.content_object}' elif change.status == 'pending': return f'Started editing {change.content_object}' elif change.status == 'conflict': return f'Resolving conflicts on {change.content_object}' return 'Unknown action' def _calculate_system_health(self): """Calculate overall system health percentage""" factors = { 'merge_success': self._get_merge_success_health(), 'performance': self._get_performance_health(), 'error_rate': self._get_error_rate_health() } return round(sum(factors.values()) / len(factors)) def _get_merge_success_health(self): """Calculate health based on merge success rate""" last_week = timezone.now() - timedelta(days=7) total_merges = ChangeSet.objects.filter( created_at__gte=last_week, status__in=['applied', 'conflict'] ).count() successful_merges = ChangeSet.objects.filter( created_at__gte=last_week, status='applied' ).count() if total_merges == 0: return 100 return round((successful_merges / total_merges) * 100) def _get_performance_health(self): """Calculate health based on performance metrics""" metrics = self._get_performance_metrics() factors = [ 100 if metrics['timing']['merge'] < 1000 else 50, # Under 1 second is healthy 100 if metrics['cache']['hit_rate'] > 80 else 50, # Over 80% cache hit rate is healthy 100 if metrics['database']['query_time'] < 0.5 else 50 # Under 0.5s query time is healthy ] return round(sum(factors) / len(factors)) def _get_error_rate_health(self): """Calculate health based on error rate""" last_day = timezone.now() - timedelta(days=1) total_operations = ChangeSet.objects.filter( created_at__gte=last_day ).count() error_count = len([ e for e in self._get_error_tracking() if e['timestamp'] >= last_day ]) if total_operations == 0: return 100 error_rate = (error_count / total_operations) * 100 return round(100 - error_rate) def _get_operation_logs(self): """Get operation timing logs""" import json from pathlib import Path log_file = Path('logs/version_control_timing.log') if not log_file.exists(): return [] logs = [] try: with open(log_file, 'r') as f: for line in f: try: logs.append(json.loads(line)) except json.JSONDecodeError: continue except Exception: return [] return logs