Add comprehensive tests for Parks API and models

- Implemented extensive test cases for the Parks API, covering endpoints for listing, retrieving, creating, updating, and deleting parks.
- Added tests for filtering, searching, and ordering parks in the API.
- Created tests for error handling in the API, including malformed JSON and unsupported methods.
- Developed model tests for Park, ParkArea, Company, and ParkReview models, ensuring validation and constraints are enforced.
- Introduced utility mixins for API and model testing to streamline assertions and enhance test readability.
- Included integration tests to validate complete workflows involving park creation, retrieval, updating, and deletion.
This commit is contained in:
pacnpal
2025-08-17 19:36:20 -04:00
parent 17228e9935
commit c26414ff74
210 changed files with 24155 additions and 833 deletions

View File

@@ -1,2 +1 @@
from .search import *
from .views import *
# Core views

256
core/views/health_views.py Normal file
View File

@@ -0,0 +1,256 @@
"""
Enhanced health check views for API monitoring.
"""
import time
from django.http import JsonResponse
from django.utils import timezone
from django.views import View
from django.conf import settings
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from health_check.views import MainView
from core.services.enhanced_cache_service import CacheMonitor
from core.utils.query_optimization import IndexAnalyzer
class HealthCheckAPIView(APIView):
"""
Enhanced API endpoint for health checks with detailed JSON response
"""
permission_classes = [AllowAny] # Public endpoint
def get(self, request):
"""Return comprehensive health check information"""
start_time = time.time()
# Get basic health check results
main_view = MainView()
main_view.request = request
plugins = main_view.plugins
errors = main_view.errors
# Collect additional performance metrics
cache_monitor = CacheMonitor()
cache_stats = cache_monitor.get_cache_stats()
# Build comprehensive health data
health_data = {
'status': 'healthy' if not errors else 'unhealthy',
'timestamp': timezone.now().isoformat(),
'version': getattr(settings, 'VERSION', '1.0.0'),
'environment': getattr(settings, 'ENVIRONMENT', 'development'),
'response_time_ms': 0, # Will be calculated at the end
'checks': {},
'metrics': {
'cache': cache_stats,
'database': self._get_database_metrics(),
'system': self._get_system_metrics(),
}
}
# Process individual health checks
for plugin in plugins:
plugin_name = plugin.identifier()
plugin_errors = errors.get(plugin.__class__.__name__, [])
health_data['checks'][plugin_name] = {
'status': 'healthy' if not plugin_errors else 'unhealthy',
'critical': getattr(plugin, 'critical_service', False),
'errors': [str(error) for error in plugin_errors],
'response_time_ms': getattr(plugin, '_response_time', None)
}
# Calculate total response time
health_data['response_time_ms'] = round((time.time() - start_time) * 1000, 2)
# Determine HTTP status code
status_code = 200
if errors:
# Check if any critical services are failing
critical_errors = any(
getattr(plugin, 'critical_service', False)
for plugin in plugins
if errors.get(plugin.__class__.__name__)
)
status_code = 503 if critical_errors else 200
return Response(health_data, status=status_code)
def _get_database_metrics(self):
"""Get database performance metrics"""
try:
from django.db import connection
# Get basic connection info
metrics = {
'vendor': connection.vendor,
'connection_status': 'connected',
}
# Test query performance
start_time = time.time()
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
query_time = (time.time() - start_time) * 1000
metrics['test_query_time_ms'] = round(query_time, 2)
# PostgreSQL specific metrics
if connection.vendor == 'postgresql':
try:
with connection.cursor() as cursor:
cursor.execute("""
SELECT
numbackends as active_connections,
xact_commit as transactions_committed,
xact_rollback as transactions_rolled_back,
blks_read as blocks_read,
blks_hit as blocks_hit
FROM pg_stat_database
WHERE datname = current_database()
""")
row = cursor.fetchone()
if row:
metrics.update({
'active_connections': row[0],
'transactions_committed': row[1],
'transactions_rolled_back': row[2],
'cache_hit_ratio': round((row[4] / (row[3] + row[4])) * 100, 2) if (row[3] + row[4]) > 0 else 0
})
except Exception:
pass # Skip advanced metrics if not available
return metrics
except Exception as e:
return {
'connection_status': 'error',
'error': str(e)
}
def _get_system_metrics(self):
"""Get system performance metrics"""
metrics = {
'debug_mode': settings.DEBUG,
'allowed_hosts': settings.ALLOWED_HOSTS if settings.DEBUG else ['hidden'],
}
try:
import psutil
# Memory metrics
memory = psutil.virtual_memory()
metrics['memory'] = {
'total_mb': round(memory.total / 1024 / 1024, 2),
'available_mb': round(memory.available / 1024 / 1024, 2),
'percent_used': memory.percent,
}
# CPU metrics
metrics['cpu'] = {
'percent_used': psutil.cpu_percent(interval=0.1),
'core_count': psutil.cpu_count(),
}
# Disk metrics
disk = psutil.disk_usage('/')
metrics['disk'] = {
'total_gb': round(disk.total / 1024 / 1024 / 1024, 2),
'free_gb': round(disk.free / 1024 / 1024 / 1024, 2),
'percent_used': round((disk.used / disk.total) * 100, 2),
}
except ImportError:
metrics['system_monitoring'] = 'psutil not available'
except Exception as e:
metrics['system_error'] = str(e)
return metrics
class PerformanceMetricsView(APIView):
"""
API view for performance metrics and database analysis
"""
permission_classes = [AllowAny] if settings.DEBUG else []
def get(self, request):
"""Return performance metrics and analysis"""
if not settings.DEBUG:
return Response({'error': 'Only available in debug mode'}, status=403)
metrics = {
'timestamp': timezone.now().isoformat(),
'database_analysis': self._get_database_analysis(),
'cache_performance': self._get_cache_performance(),
'recent_slow_queries': self._get_slow_queries(),
}
return Response(metrics)
def _get_database_analysis(self):
"""Analyze database performance"""
try:
from django.db import connection
analysis = {
'total_queries': len(connection.queries),
'query_analysis': IndexAnalyzer.analyze_slow_queries(0.05),
}
if connection.queries:
query_times = [float(q.get('time', 0)) for q in connection.queries]
analysis.update({
'total_query_time': sum(query_times),
'average_query_time': sum(query_times) / len(query_times),
'slowest_query_time': max(query_times),
'fastest_query_time': min(query_times),
})
return analysis
except Exception as e:
return {'error': str(e)}
def _get_cache_performance(self):
"""Get cache performance metrics"""
try:
cache_monitor = CacheMonitor()
return cache_monitor.get_cache_stats()
except Exception as e:
return {'error': str(e)}
def _get_slow_queries(self):
"""Get recent slow queries"""
try:
return IndexAnalyzer.analyze_slow_queries(0.1) # 100ms threshold
except Exception as e:
return {'error': str(e)}
class SimpleHealthView(View):
"""
Simple health check endpoint for load balancers
"""
def get(self, request):
"""Return simple OK status"""
try:
# Basic database connectivity test
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
return JsonResponse({'status': 'ok', 'timestamp': timezone.now().isoformat()})
except Exception as e:
return JsonResponse(
{'status': 'error', 'error': str(e), 'timestamp': timezone.now().isoformat()},
status=503
)