mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 20:51:09 -05:00
Add OWASP compliance mapping and security test case templates, and document version control implementation phases
This commit is contained in:
320
history_tracking/views_monitoring.py
Normal file
320
history_tracking/views_monitoring.py
Normal file
@@ -0,0 +1,320 @@
|
||||
from django.views.generic import TemplateView
|
||||
from django.contrib.admin.views.decorators import staff_member_required
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.utils import timezone
|
||||
from datetime import timedelta
|
||||
|
||||
from .models import VersionBranch, ChangeSet
|
||||
from .monitoring import VersionControlMetrics
|
||||
|
||||
@method_decorator(staff_member_required, name='dispatch')
|
||||
class MonitoringDashboardView(TemplateView):
|
||||
template_name = 'history_tracking/monitoring_dashboard.html'
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super().get_context_data(**kwargs)
|
||||
metrics = self._collect_metrics()
|
||||
context['metrics'] = metrics
|
||||
return context
|
||||
|
||||
def _collect_metrics(self):
|
||||
"""Collect all monitoring metrics"""
|
||||
# Collect basic statistics
|
||||
total_branches = VersionBranch.objects.count()
|
||||
active_branches = VersionBranch.objects.filter(is_active=True).count()
|
||||
total_changes = ChangeSet.objects.count()
|
||||
pending_changes = ChangeSet.objects.filter(status='pending').count()
|
||||
|
||||
# Calculate merge success rate
|
||||
last_week = timezone.now() - timedelta(days=7)
|
||||
total_merges = ChangeSet.objects.filter(
|
||||
created_at__gte=last_week,
|
||||
status__in=['applied', 'conflict']
|
||||
).count()
|
||||
successful_merges = ChangeSet.objects.filter(
|
||||
created_at__gte=last_week,
|
||||
status='applied'
|
||||
).count()
|
||||
merge_success_rate = round(
|
||||
(successful_merges / total_merges * 100) if total_merges > 0 else 100
|
||||
)
|
||||
|
||||
# Get performance metrics
|
||||
VersionControlMetrics.collect_performance_metrics()
|
||||
perf_metrics = self._get_performance_metrics()
|
||||
|
||||
# Get error tracking data
|
||||
errors = self._get_error_tracking()
|
||||
|
||||
# Get user activity
|
||||
user_activity = self._get_user_activity()
|
||||
|
||||
return {
|
||||
# System Overview
|
||||
'total_branches': total_branches,
|
||||
'active_branches': active_branches,
|
||||
'total_changes': total_changes,
|
||||
'pending_changes': pending_changes,
|
||||
'merge_success_rate': merge_success_rate,
|
||||
'conflicted_merges': ChangeSet.objects.filter(
|
||||
status='conflict'
|
||||
).count(),
|
||||
'system_health': self._calculate_system_health(),
|
||||
'health_checks': 5, # Number of health checks performed
|
||||
|
||||
# Performance Metrics
|
||||
'timing': perf_metrics['timing'],
|
||||
'database': perf_metrics['database'],
|
||||
'cache': perf_metrics['cache'],
|
||||
|
||||
# Error Tracking
|
||||
'errors': errors,
|
||||
|
||||
# User Activity
|
||||
'current_operations': user_activity['current'],
|
||||
'recent_activity': user_activity['recent']
|
||||
}
|
||||
|
||||
def _get_performance_metrics(self):
|
||||
"""Get detailed performance metrics"""
|
||||
from django.db import connection
|
||||
from django.core.cache import cache
|
||||
|
||||
# Calculate average operation timings
|
||||
operation_times = {
|
||||
'branch_creation': [],
|
||||
'branch_switch': [],
|
||||
'merge': []
|
||||
}
|
||||
|
||||
for log in self._get_operation_logs():
|
||||
if log['operation'] in operation_times:
|
||||
operation_times[log['operation']].append(log['duration'])
|
||||
|
||||
timing = {
|
||||
op: round(sum(times) / len(times), 2) if times else 0
|
||||
for op, times in operation_times.items()
|
||||
}
|
||||
|
||||
return {
|
||||
'timing': timing,
|
||||
'database': {
|
||||
'query_count': len(connection.queries),
|
||||
'query_time': round(
|
||||
sum(float(q['time']) for q in connection.queries),
|
||||
3
|
||||
),
|
||||
'pool_size': connection.pool_size if hasattr(connection, 'pool_size') else 'N/A',
|
||||
'max_pool': connection.max_pool if hasattr(connection, 'max_pool') else 'N/A'
|
||||
},
|
||||
'cache': {
|
||||
'hit_rate': round(
|
||||
cache.get('version_control_cache_hits', 0) /
|
||||
(cache.get('version_control_cache_hits', 0) +
|
||||
cache.get('version_control_cache_misses', 1)) * 100,
|
||||
1
|
||||
),
|
||||
'miss_rate': round(
|
||||
cache.get('version_control_cache_misses', 0) /
|
||||
(cache.get('version_control_cache_hits', 0) +
|
||||
cache.get('version_control_cache_misses', 1)) * 100,
|
||||
1
|
||||
),
|
||||
'memory_usage': round(
|
||||
cache.get('version_control_memory_usage', 0) / 1024 / 1024,
|
||||
2
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def _get_error_tracking(self):
|
||||
"""Get recent error tracking data"""
|
||||
from django.conf import settings
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger('version_control')
|
||||
errors = []
|
||||
|
||||
# Get last 10 error logs
|
||||
if hasattr(logger, 'handlers'):
|
||||
for handler in logger.handlers:
|
||||
if isinstance(handler, logging.FileHandler):
|
||||
try:
|
||||
with open(handler.baseFilename, 'r') as f:
|
||||
for line in f.readlines()[-10:]:
|
||||
if '[ERROR]' in line:
|
||||
errors.append(self._parse_error_log(line))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
return errors
|
||||
|
||||
def _parse_error_log(self, log_line):
|
||||
"""Parse error log line into structured data"""
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[ERROR\] (.*)'
|
||||
match = re.match(pattern, log_line)
|
||||
|
||||
if match:
|
||||
timestamp_str, message = match.groups()
|
||||
return {
|
||||
'timestamp': datetime.strptime(
|
||||
timestamp_str,
|
||||
'%Y-%m-%d %H:%M:%S,%f'
|
||||
),
|
||||
'type': 'Error',
|
||||
'operation': self._extract_operation(message),
|
||||
'message': message,
|
||||
'resolved': False
|
||||
}
|
||||
return None
|
||||
|
||||
def _extract_operation(self, message):
|
||||
"""Extract operation type from error message"""
|
||||
if 'branch' in message.lower():
|
||||
return 'Branch Operation'
|
||||
elif 'merge' in message.lower():
|
||||
return 'Merge Operation'
|
||||
elif 'changeset' in message.lower():
|
||||
return 'Change Operation'
|
||||
return 'Unknown Operation'
|
||||
|
||||
def _get_user_activity(self):
|
||||
"""Get current and recent user activity"""
|
||||
from django.contrib.auth import get_user_model
|
||||
User = get_user_model()
|
||||
|
||||
# Get active sessions
|
||||
from django.contrib.sessions.models import Session
|
||||
current_sessions = Session.objects.filter(
|
||||
expire_date__gte=timezone.now()
|
||||
)
|
||||
|
||||
current_operations = []
|
||||
for session in current_sessions:
|
||||
try:
|
||||
uid = session.get_decoded().get('_auth_user_id')
|
||||
if uid:
|
||||
user = User.objects.get(pk=uid)
|
||||
current_operations.append({
|
||||
'user': user.username,
|
||||
'action': self._get_user_current_action(user)
|
||||
})
|
||||
except (User.DoesNotExist, KeyError):
|
||||
continue
|
||||
|
||||
# Get recent activity
|
||||
recent = ChangeSet.objects.select_related('user').order_by(
|
||||
'-created_at'
|
||||
)[:10]
|
||||
recent_activity = [
|
||||
{
|
||||
'user': change.user.username if change.user else 'System',
|
||||
'action': self._get_change_action(change),
|
||||
'timestamp': change.created_at
|
||||
}
|
||||
for change in recent
|
||||
]
|
||||
|
||||
return {
|
||||
'current': current_operations,
|
||||
'recent': recent_activity
|
||||
}
|
||||
|
||||
def _get_user_current_action(self, user):
|
||||
"""Get user's current action based on recent activity"""
|
||||
last_change = ChangeSet.objects.filter(
|
||||
user=user
|
||||
).order_by('-created_at').first()
|
||||
|
||||
if last_change:
|
||||
if (timezone.now() - last_change.created_at).seconds < 300: # 5 minutes
|
||||
return self._get_change_action(last_change)
|
||||
return 'Viewing'
|
||||
|
||||
def _get_change_action(self, change):
|
||||
"""Get human-readable action from change"""
|
||||
if change.status == 'applied':
|
||||
return f'Applied changes to {change.content_object}'
|
||||
elif change.status == 'pending':
|
||||
return f'Started editing {change.content_object}'
|
||||
elif change.status == 'conflict':
|
||||
return f'Resolving conflicts on {change.content_object}'
|
||||
return 'Unknown action'
|
||||
|
||||
def _calculate_system_health(self):
|
||||
"""Calculate overall system health percentage"""
|
||||
factors = {
|
||||
'merge_success': self._get_merge_success_health(),
|
||||
'performance': self._get_performance_health(),
|
||||
'error_rate': self._get_error_rate_health()
|
||||
}
|
||||
return round(sum(factors.values()) / len(factors))
|
||||
|
||||
def _get_merge_success_health(self):
|
||||
"""Calculate health based on merge success rate"""
|
||||
last_week = timezone.now() - timedelta(days=7)
|
||||
total_merges = ChangeSet.objects.filter(
|
||||
created_at__gte=last_week,
|
||||
status__in=['applied', 'conflict']
|
||||
).count()
|
||||
successful_merges = ChangeSet.objects.filter(
|
||||
created_at__gte=last_week,
|
||||
status='applied'
|
||||
).count()
|
||||
|
||||
if total_merges == 0:
|
||||
return 100
|
||||
return round((successful_merges / total_merges) * 100)
|
||||
|
||||
def _get_performance_health(self):
|
||||
"""Calculate health based on performance metrics"""
|
||||
metrics = self._get_performance_metrics()
|
||||
|
||||
factors = [
|
||||
100 if metrics['timing']['merge'] < 1000 else 50, # Under 1 second is healthy
|
||||
100 if metrics['cache']['hit_rate'] > 80 else 50, # Over 80% cache hit rate is healthy
|
||||
100 if metrics['database']['query_time'] < 0.5 else 50 # Under 0.5s query time is healthy
|
||||
]
|
||||
|
||||
return round(sum(factors) / len(factors))
|
||||
|
||||
def _get_error_rate_health(self):
|
||||
"""Calculate health based on error rate"""
|
||||
last_day = timezone.now() - timedelta(days=1)
|
||||
total_operations = ChangeSet.objects.filter(
|
||||
created_at__gte=last_day
|
||||
).count()
|
||||
error_count = len([
|
||||
e for e in self._get_error_tracking()
|
||||
if e['timestamp'] >= last_day
|
||||
])
|
||||
|
||||
if total_operations == 0:
|
||||
return 100
|
||||
error_rate = (error_count / total_operations) * 100
|
||||
return round(100 - error_rate)
|
||||
|
||||
def _get_operation_logs(self):
|
||||
"""Get operation timing logs"""
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
log_file = Path('logs/version_control_timing.log')
|
||||
if not log_file.exists():
|
||||
return []
|
||||
|
||||
logs = []
|
||||
try:
|
||||
with open(log_file, 'r') as f:
|
||||
for line in f:
|
||||
try:
|
||||
logs.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
return logs
|
||||
Reference in New Issue
Block a user