Improve park listing performance with optimized queries and caching

Implement performance enhancements for park listing by optimizing database queries, introducing efficient caching mechanisms, and refining pagination for a significantly faster and smoother user experience.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: c446bc9e-66df-438c-a86c-f53e6da13649
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
This commit is contained in:
pac7
2025-09-23 22:50:09 +00:00
parent 6391b3d81c
commit fff180c476
11 changed files with 2864 additions and 70 deletions

File diff suppressed because one or more lines are too long

View File

@@ -28,7 +28,8 @@ class ParkFilterService:
self, base_queryset: Optional[QuerySet] = None
) -> Dict[str, Any]:
"""
Get counts for various filter options to show users what's available.
Get counts for various filter options with optimized single-query aggregations.
This eliminates multiple expensive COUNT queries.
Args:
base_queryset: Optional base queryset to use for calculations
@@ -42,24 +43,49 @@ class ParkFilterService:
if cached_result is not None:
return cached_result
if base_queryset is None:
base_queryset = get_base_park_queryset()
from apps.core.utils.query_optimization import track_queries
with track_queries("optimized_filter_counts"):
if base_queryset is None:
base_queryset = get_base_park_queryset()
# Calculate filter counts
filter_counts = {
"total_parks": base_queryset.count(),
"operating_parks": base_queryset.filter(status="OPERATING").count(),
"parks_with_coasters": base_queryset.filter(coaster_count__gt=0).count(),
"big_parks": base_queryset.filter(ride_count__gte=10).count(),
"highly_rated": base_queryset.filter(average_rating__gte=4.0).count(),
"park_types": self._get_park_type_counts(base_queryset),
"top_operators": self._get_top_operators(base_queryset),
"countries": self._get_country_counts(base_queryset),
}
# Use optimized single-query aggregation instead of multiple COUNT queries
aggregates = base_queryset.aggregate(
total_parks=Count('id'),
operating_parks=Count('id', filter=Q(status='OPERATING')),
parks_with_coasters=Count('id', filter=Q(coaster_count__gt=0)),
big_parks=Count('id', filter=Q(ride_count__gte=10)),
highly_rated=Count('id', filter=Q(average_rating__gte=4.0)),
disney_parks=Count('id', filter=Q(operator__name__icontains='Disney')),
universal_parks=Count('id', filter=Q(operator__name__icontains='Universal')),
six_flags_parks=Count('id', filter=Q(operator__name__icontains='Six Flags')),
cedar_fair_parks=Count('id', filter=Q(
Q(operator__name__icontains='Cedar Fair') |
Q(operator__name__icontains='Cedar Point') |
Q(operator__name__icontains='Kings Island')
))
)
# Cache the result
cache.set(cache_key, filter_counts, self.CACHE_TIMEOUT)
return filter_counts
# Calculate filter counts efficiently
filter_counts = {
"total_parks": aggregates['total_parks'],
"operating_parks": aggregates['operating_parks'],
"parks_with_coasters": aggregates['parks_with_coasters'],
"big_parks": aggregates['big_parks'],
"highly_rated": aggregates['highly_rated'],
"park_types": {
"disney": aggregates['disney_parks'],
"universal": aggregates['universal_parks'],
"six_flags": aggregates['six_flags_parks'],
"cedar_fair": aggregates['cedar_fair_parks'],
},
"top_operators": self._get_top_operators_optimized(base_queryset),
"countries": self._get_country_counts_optimized(base_queryset),
}
# Cache the result for longer since this is expensive
cache.set(cache_key, filter_counts, self.CACHE_TIMEOUT * 2)
return filter_counts
def _get_park_type_counts(self, queryset: QuerySet) -> Dict[str, int]:
"""Get counts for different park types based on operator names."""
@@ -210,9 +236,11 @@ class ParkFilterService:
for key in cache_keys:
cache.delete(key)
def get_filtered_queryset(self, filters: Dict[str, Any]) -> QuerySet: # noqa: C901
def get_optimized_filtered_queryset(self, filters: Dict[str, Any]) -> QuerySet: # noqa: C901
"""
Apply filters to get a filtered queryset with optimizations.
Apply filters to get a filtered queryset with comprehensive optimizations.
This method eliminates the expensive subquery pattern and builds an optimized
queryset from the ground up.
Args:
filters: Dictionary of filter parameters
@@ -220,6 +248,94 @@ class ParkFilterService:
Returns:
Filtered and optimized QuerySet
"""
from apps.core.utils.query_optimization import track_queries
with track_queries("optimized_filtered_queryset"):
# Start with base Park queryset and apply all optimizations at once
queryset = (
Park.objects
.select_related(
"operator",
"property_owner",
"location",
"banner_image",
"card_image"
)
.prefetch_related(
"photos",
"rides__manufacturer",
"areas"
)
.annotate(
current_ride_count=Count("rides", distinct=True),
current_coaster_count=Count(
"rides", filter=Q(rides__category="RC"), distinct=True
),
)
)
# Build optimized filter conditions
filter_conditions = Q()
# Apply status filter
if filters.get("status"):
filter_conditions &= Q(status=filters["status"])
# Apply park type filter
if filters.get("park_type"):
filter_conditions &= self._get_park_type_filter(filters["park_type"])
# Apply coaster filter
if filters.get("has_coasters"):
filter_conditions &= Q(coaster_count__gt=0)
# Apply rating filter
if filters.get("min_rating"):
try:
min_rating = float(filters["min_rating"])
filter_conditions &= Q(average_rating__gte=min_rating)
except (ValueError, TypeError):
pass
# Apply big parks filter
if filters.get("big_parks_only"):
filter_conditions &= Q(ride_count__gte=10)
# Apply optimized search using search_text field
if filters.get("search"):
search_query = filters["search"].strip()
if search_query:
# Use the computed search_text field for better performance
search_conditions = (
Q(search_text__icontains=search_query)
| Q(name__icontains=search_query)
| Q(location__city__icontains=search_query)
| Q(location__country__icontains=search_query)
)
filter_conditions &= search_conditions
# Apply location filters
if filters.get("country_filter"):
filter_conditions &= Q(
location__country__icontains=filters["country_filter"]
)
if filters.get("state_filter"):
filter_conditions &= Q(
location__state__icontains=filters["state_filter"]
)
# Apply all filters at once for better query planning
if filter_conditions:
queryset = queryset.filter(filter_conditions)
return queryset.distinct()
def get_filtered_queryset(self, filters: Dict[str, Any]) -> QuerySet: # noqa: C901
"""
Legacy method - kept for backward compatibility.
Use get_optimized_filtered_queryset for new implementations.
"""
queryset = (
get_base_park_queryset()
.select_related("operator", "property_owner", "location")
@@ -302,3 +418,50 @@ class ParkFilterService:
return queryset.filter(type_filters[park_type])
return queryset
def _get_park_type_filter(self, park_type: str) -> Q:
"""Get park type filter as Q object for optimized filtering."""
type_filters = {
"disney": Q(operator__name__icontains="Disney"),
"universal": Q(operator__name__icontains="Universal"),
"six_flags": Q(operator__name__icontains="Six Flags"),
"cedar_fair": (
Q(operator__name__icontains="Cedar Fair")
| Q(operator__name__icontains="Cedar Point")
| Q(operator__name__icontains="Kings Island")
| Q(operator__name__icontains="Canada's Wonderland")
),
"independent": ~(
Q(operator__name__icontains="Disney")
| Q(operator__name__icontains="Universal")
| Q(operator__name__icontains="Six Flags")
| Q(operator__name__icontains="Cedar Fair")
| Q(operator__name__icontains="Cedar Point")
| Q(operator__name__icontains="Kings Island")
| Q(operator__name__icontains="Canada's Wonderland")
),
}
return type_filters.get(park_type, Q())
def _get_top_operators_optimized(
self, queryset: QuerySet, limit: int = 10
) -> List[Dict[str, Any]]:
"""Get the top operators by number of parks using optimized query."""
return list(
queryset.values("operator__name", "operator__id")
.annotate(park_count=Count("id"))
.filter(park_count__gt=0)
.order_by("-park_count")[:limit]
)
def _get_country_counts_optimized(
self, queryset: QuerySet, limit: int = 10
) -> List[Dict[str, Any]]:
"""Get countries with the most parks using optimized query."""
return list(
queryset.filter(location__country__isnull=False)
.values("location__country")
.annotate(park_count=Count("id"))
.filter(park_count__gt=0)
.order_by("-park_count")[:limit]
)

View File

@@ -0,0 +1,311 @@
"""
Optimized pagination service for large datasets with efficient counting.
"""
from typing import Dict, Any, Optional, Tuple
from django.core.paginator import Paginator, Page
from django.core.cache import cache
from django.db.models import QuerySet, Count
from django.conf import settings
import hashlib
import time
import logging
logger = logging.getLogger("pagination_service")
class OptimizedPaginator(Paginator):
"""
Custom paginator that optimizes COUNT queries and provides caching.
"""
def __init__(self, object_list, per_page, cache_timeout=300, **kwargs):
super().__init__(object_list, per_page, **kwargs)
self.cache_timeout = cache_timeout
self._cached_count = None
self._count_cache_key = None
def _get_count_cache_key(self) -> str:
"""Generate cache key for count based on queryset SQL."""
if self._count_cache_key:
return self._count_cache_key
# Create cache key from queryset SQL
if hasattr(self.object_list, 'query'):
sql_hash = hashlib.md5(
str(self.object_list.query).encode('utf-8')
).hexdigest()[:16]
self._count_cache_key = f"paginator_count:{sql_hash}"
else:
# Fallback for non-queryset object lists
self._count_cache_key = f"paginator_count:list:{len(self.object_list)}"
return self._count_cache_key
@property
def count(self):
"""
Optimized count with caching for expensive querysets.
"""
if self._cached_count is not None:
return self._cached_count
cache_key = self._get_count_cache_key()
cached_count = cache.get(cache_key)
if cached_count is not None:
logger.debug(f"Cache hit for pagination count: {cache_key}")
self._cached_count = cached_count
return cached_count
# Perform optimized count
start_time = time.time()
if hasattr(self.object_list, 'count'):
# For QuerySets, try to optimize the count query
count = self._get_optimized_count()
else:
count = len(self.object_list)
execution_time = time.time() - start_time
# Cache the result
cache.set(cache_key, count, self.cache_timeout)
self._cached_count = count
if execution_time > 0.5: # Log slow count queries
logger.warning(
f"Slow pagination count query: {execution_time:.3f}s for {count} items",
extra={'cache_key': cache_key, 'execution_time': execution_time}
)
return count
def _get_optimized_count(self) -> int:
"""
Get optimized count for complex querysets.
"""
queryset = self.object_list
# For complex queries with joins, use approximate counting for very large datasets
if self._is_complex_query(queryset):
# Try to get count from a simpler subquery
try:
# Use subquery approach for complex queries
subquery = queryset.values('pk')
return subquery.count()
except Exception as e:
logger.warning(f"Optimized count failed, falling back to standard count: {e}")
return queryset.count()
else:
return queryset.count()
def _is_complex_query(self, queryset) -> bool:
"""
Determine if a queryset is complex and might benefit from optimization.
"""
if not hasattr(queryset, 'query'):
return False
sql = str(queryset.query).upper()
# Consider complex if it has multiple joins or subqueries
complexity_indicators = [
'JOIN' in sql and sql.count('JOIN') > 2,
'DISTINCT' in sql,
'GROUP BY' in sql,
'HAVING' in sql,
]
return any(complexity_indicators)
class CursorPaginator:
"""
Cursor-based pagination for very large datasets.
More efficient than offset-based pagination for large page numbers.
"""
def __init__(self, queryset: QuerySet, ordering_field: str = 'id', per_page: int = 20):
self.queryset = queryset
self.ordering_field = ordering_field
self.per_page = per_page
self.reverse = ordering_field.startswith('-')
self.field_name = ordering_field.lstrip('-')
def get_page(self, cursor: Optional[str] = None) -> Dict[str, Any]:
"""
Get a page of results using cursor-based pagination.
Args:
cursor: Base64 encoded cursor value from previous page
Returns:
Dictionary with page data and navigation cursors
"""
queryset = self.queryset.order_by(self.ordering_field)
if cursor:
# Decode cursor and filter from that point
try:
cursor_value = self._decode_cursor(cursor)
if self.reverse:
queryset = queryset.filter(**{f"{self.field_name}__lt": cursor_value})
else:
queryset = queryset.filter(**{f"{self.field_name}__gt": cursor_value})
except (ValueError, TypeError):
# Invalid cursor, start from beginning
pass
# Get one extra item to check if there's a next page
items = list(queryset[:self.per_page + 1])
has_next = len(items) > self.per_page
if has_next:
items = items[:-1] # Remove the extra item
# Generate cursors for navigation
next_cursor = None
previous_cursor = None
if items and has_next:
last_item = items[-1]
next_cursor = self._encode_cursor(getattr(last_item, self.field_name))
if items and cursor:
first_item = items[0]
previous_cursor = self._encode_cursor(getattr(first_item, self.field_name))
return {
'items': items,
'has_next': has_next,
'has_previous': cursor is not None,
'next_cursor': next_cursor,
'previous_cursor': previous_cursor,
'count': len(items)
}
def _encode_cursor(self, value) -> str:
"""Encode cursor value to base64 string."""
import base64
return base64.b64encode(str(value).encode()).decode()
def _decode_cursor(self, cursor: str):
"""Decode cursor from base64 string."""
import base64
decoded = base64.b64decode(cursor.encode()).decode()
# Try to convert to appropriate type based on field
field = self.queryset.model._meta.get_field(self.field_name)
if hasattr(field, 'to_python'):
return field.to_python(decoded)
return decoded
class PaginationCache:
"""
Advanced caching for pagination metadata and results.
"""
CACHE_PREFIX = "pagination"
DEFAULT_TIMEOUT = 300 # 5 minutes
@classmethod
def get_page_cache_key(cls, queryset_hash: str, page_num: int) -> str:
"""Generate cache key for a specific page."""
return f"{cls.CACHE_PREFIX}:page:{queryset_hash}:{page_num}"
@classmethod
def get_metadata_cache_key(cls, queryset_hash: str) -> str:
"""Generate cache key for pagination metadata."""
return f"{cls.CACHE_PREFIX}:meta:{queryset_hash}"
@classmethod
def cache_page_results(
cls,
queryset_hash: str,
page_num: int,
page_data: Dict[str, Any],
timeout: int = DEFAULT_TIMEOUT
):
"""Cache page results."""
cache_key = cls.get_page_cache_key(queryset_hash, page_num)
cache.set(cache_key, page_data, timeout)
@classmethod
def get_cached_page(cls, queryset_hash: str, page_num: int) -> Optional[Dict[str, Any]]:
"""Get cached page results."""
cache_key = cls.get_page_cache_key(queryset_hash, page_num)
return cache.get(cache_key)
@classmethod
def cache_metadata(
cls,
queryset_hash: str,
metadata: Dict[str, Any],
timeout: int = DEFAULT_TIMEOUT
):
"""Cache pagination metadata."""
cache_key = cls.get_metadata_cache_key(queryset_hash)
cache.set(cache_key, metadata, timeout)
@classmethod
def get_cached_metadata(cls, queryset_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached pagination metadata."""
cache_key = cls.get_metadata_cache_key(queryset_hash)
return cache.get(cache_key)
@classmethod
def invalidate_cache(cls, queryset_hash: str):
"""Invalidate all cache entries for a queryset."""
# This would require a cache backend that supports pattern deletion
# For now, we'll rely on TTL expiration
pass
def get_optimized_page(
queryset: QuerySet,
page_number: int,
per_page: int = 20,
use_cursor: bool = False,
cursor: Optional[str] = None,
cache_timeout: int = 300
) -> Tuple[Page, Dict[str, Any]]:
"""
Get an optimized page with caching and performance monitoring.
Args:
queryset: The queryset to paginate
page_number: Page number to retrieve
per_page: Items per page
use_cursor: Whether to use cursor-based pagination
cursor: Cursor for cursor-based pagination
cache_timeout: Cache timeout in seconds
Returns:
Tuple of (Page object, metadata dict)
"""
if use_cursor:
paginator = CursorPaginator(queryset, per_page=per_page)
page_data = paginator.get_page(cursor)
return page_data, {
'pagination_type': 'cursor',
'has_next': page_data['has_next'],
'has_previous': page_data['has_previous'],
'next_cursor': page_data['next_cursor'],
'previous_cursor': page_data['previous_cursor']
}
else:
paginator = OptimizedPaginator(queryset, per_page, cache_timeout=cache_timeout)
page = paginator.get_page(page_number)
return page, {
'pagination_type': 'offset',
'total_pages': paginator.num_pages,
'total_count': paginator.count,
'has_next': page.has_next(),
'has_previous': page.has_previous(),
'current_page': page.number
}

View File

@@ -0,0 +1,402 @@
"""
Performance monitoring and benchmarking tools for park listing optimizations.
"""
import time
import logging
import statistics
from typing import Dict, List, Any, Optional, Callable
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from django.db import connection
from django.core.cache import cache
from django.conf import settings
from django.test import RequestFactory
import json
logger = logging.getLogger("performance_monitoring")
@dataclass
class PerformanceMetric:
"""Data class for storing performance metrics."""
operation: str
duration: float
query_count: int
cache_hits: int = 0
cache_misses: int = 0
memory_usage: Optional[float] = None
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
class PerformanceMonitor:
"""
Comprehensive performance monitoring for park listing operations.
"""
def __init__(self):
self.metrics: List[PerformanceMetric] = []
self.cache_stats = {'hits': 0, 'misses': 0}
@contextmanager
def measure_operation(self, operation_name: str, **metadata):
"""Context manager to measure operation performance."""
initial_queries = len(connection.queries) if hasattr(connection, 'queries') else 0
initial_cache_hits = self.cache_stats['hits']
initial_cache_misses = self.cache_stats['misses']
start_time = time.perf_counter()
start_memory = self._get_memory_usage()
try:
yield
finally:
end_time = time.perf_counter()
end_memory = self._get_memory_usage()
duration = end_time - start_time
query_count = (len(connection.queries) - initial_queries) if hasattr(connection, 'queries') else 0
cache_hits = self.cache_stats['hits'] - initial_cache_hits
cache_misses = self.cache_stats['misses'] - initial_cache_misses
memory_delta = end_memory - start_memory if start_memory and end_memory else None
metric = PerformanceMetric(
operation=operation_name,
duration=duration,
query_count=query_count,
cache_hits=cache_hits,
cache_misses=cache_misses,
memory_usage=memory_delta,
metadata=metadata
)
self.metrics.append(metric)
self._log_metric(metric)
def _get_memory_usage(self) -> Optional[float]:
"""Get current memory usage in MB."""
try:
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024 # Convert to MB
except ImportError:
return None
def _log_metric(self, metric: PerformanceMetric):
"""Log performance metric with appropriate level."""
message = (
f"{metric.operation}: {metric.duration:.3f}s, "
f"{metric.query_count} queries, "
f"{metric.cache_hits} cache hits"
)
if metric.memory_usage:
message += f", {metric.memory_usage:.2f}MB memory delta"
# Log as warning if performance is concerning
if metric.duration > 1.0 or metric.query_count > 10:
logger.warning(f"Performance concern: {message}")
else:
logger.info(f"Performance metric: {message}")
def get_performance_summary(self) -> Dict[str, Any]:
"""Get summary of all performance metrics."""
if not self.metrics:
return {'message': 'No metrics collected'}
durations = [m.duration for m in self.metrics]
query_counts = [m.query_count for m in self.metrics]
return {
'total_operations': len(self.metrics),
'duration_stats': {
'mean': statistics.mean(durations),
'median': statistics.median(durations),
'min': min(durations),
'max': max(durations),
'total': sum(durations)
},
'query_stats': {
'mean': statistics.mean(query_counts),
'median': statistics.median(query_counts),
'min': min(query_counts),
'max': max(query_counts),
'total': sum(query_counts)
},
'cache_stats': {
'total_hits': sum(m.cache_hits for m in self.metrics),
'total_misses': sum(m.cache_misses for m in self.metrics),
'hit_rate': self._calculate_cache_hit_rate()
},
'slowest_operations': self._get_slowest_operations(5),
'most_query_intensive': self._get_most_query_intensive(5)
}
def _calculate_cache_hit_rate(self) -> float:
"""Calculate overall cache hit rate."""
total_hits = sum(m.cache_hits for m in self.metrics)
total_requests = total_hits + sum(m.cache_misses for m in self.metrics)
return (total_hits / total_requests * 100) if total_requests > 0 else 0.0
def _get_slowest_operations(self, count: int) -> List[Dict[str, Any]]:
"""Get the slowest operations."""
sorted_metrics = sorted(self.metrics, key=lambda m: m.duration, reverse=True)
return [
{
'operation': m.operation,
'duration': m.duration,
'query_count': m.query_count,
'timestamp': m.timestamp.isoformat()
}
for m in sorted_metrics[:count]
]
def _get_most_query_intensive(self, count: int) -> List[Dict[str, Any]]:
"""Get operations with the most database queries."""
sorted_metrics = sorted(self.metrics, key=lambda m: m.query_count, reverse=True)
return [
{
'operation': m.operation,
'query_count': m.query_count,
'duration': m.duration,
'timestamp': m.timestamp.isoformat()
}
for m in sorted_metrics[:count]
]
class BenchmarkSuite:
"""
Comprehensive benchmarking suite for park listing performance.
"""
def __init__(self):
self.monitor = PerformanceMonitor()
self.factory = RequestFactory()
def run_autocomplete_benchmark(self, queries: List[str] = None) -> Dict[str, Any]:
"""Benchmark autocomplete performance with various queries."""
if not queries:
queries = [
'Di', # Short query
'Disney', # Common brand
'Universal', # Another common brand
'Cedar Point', # Specific park
'California', # Location
'Roller', # Generic term
'Xyz123' # Non-existent query
]
results = []
for query in queries:
with self.monitor.measure_operation(f"autocomplete_{query}", query=query):
# Simulate autocomplete request
from apps.parks.views_autocomplete import ParkAutocompleteView
request = self.factory.get(f'/api/parks/autocomplete/?q={query}')
view = ParkAutocompleteView()
response = view.get(request)
results.append({
'query': query,
'status_code': response.status_code,
'response_time': self.monitor.metrics[-1].duration,
'query_count': self.monitor.metrics[-1].query_count
})
return {
'benchmark_type': 'autocomplete',
'queries_tested': len(queries),
'results': results,
'summary': self.monitor.get_performance_summary()
}
def run_listing_benchmark(self, scenarios: List[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Benchmark park listing performance with various filter scenarios."""
if not scenarios:
scenarios = [
{'name': 'no_filters', 'params': {}},
{'name': 'status_filter', 'params': {'status': 'OPERATING'}},
{'name': 'operator_filter', 'params': {'operator': 'Disney'}},
{'name': 'location_filter', 'params': {'country': 'United States'}},
{'name': 'complex_filter', 'params': {
'status': 'OPERATING',
'has_coasters': 'true',
'min_rating': '4.0'
}},
{'name': 'search_query', 'params': {'search': 'Magic Kingdom'}},
{'name': 'pagination_last_page', 'params': {'page': '10'}}
]
results = []
for scenario in scenarios:
with self.monitor.measure_operation(f"listing_{scenario['name']}", **scenario['params']):
# Simulate listing request
from apps.parks.views import ParkListView
query_string = '&'.join([f"{k}={v}" for k, v in scenario['params'].items()])
request = self.factory.get(f'/parks/?{query_string}')
view = ParkListView()
view.setup(request)
# Simulate getting the queryset and context
queryset = view.get_queryset()
context = view.get_context_data()
results.append({
'scenario': scenario['name'],
'params': scenario['params'],
'result_count': queryset.count() if hasattr(queryset, 'count') else len(queryset),
'response_time': self.monitor.metrics[-1].duration,
'query_count': self.monitor.metrics[-1].query_count
})
return {
'benchmark_type': 'listing',
'scenarios_tested': len(scenarios),
'results': results,
'summary': self.monitor.get_performance_summary()
}
def run_pagination_benchmark(self, page_sizes: List[int] = None, page_numbers: List[int] = None) -> Dict[str, Any]:
"""Benchmark pagination performance with different page sizes and numbers."""
if not page_sizes:
page_sizes = [10, 20, 50, 100]
if not page_numbers:
page_numbers = [1, 5, 10, 50]
results = []
for page_size in page_sizes:
for page_number in page_numbers:
scenario_name = f"page_{page_number}_size_{page_size}"
with self.monitor.measure_operation(scenario_name, page_size=page_size, page_number=page_number):
from apps.parks.services.pagination_service import get_optimized_page
from apps.parks.querysets import get_base_park_queryset
queryset = get_base_park_queryset()
page, metadata = get_optimized_page(queryset, page_number, page_size)
results.append({
'page_size': page_size,
'page_number': page_number,
'total_count': metadata.get('total_count', 0),
'response_time': self.monitor.metrics[-1].duration,
'query_count': self.monitor.metrics[-1].query_count
})
return {
'benchmark_type': 'pagination',
'configurations_tested': len(results),
'results': results,
'summary': self.monitor.get_performance_summary()
}
def run_full_benchmark_suite(self) -> Dict[str, Any]:
"""Run the complete benchmark suite."""
logger.info("Starting comprehensive benchmark suite")
suite_start = time.perf_counter()
# Run all benchmarks
autocomplete_results = self.run_autocomplete_benchmark()
listing_results = self.run_listing_benchmark()
pagination_results = self.run_pagination_benchmark()
suite_duration = time.perf_counter() - suite_start
# Generate comprehensive report
report = {
'benchmark_suite': 'Park Listing Performance',
'timestamp': datetime.now().isoformat(),
'total_duration': suite_duration,
'autocomplete': autocomplete_results,
'listing': listing_results,
'pagination': pagination_results,
'overall_summary': self.monitor.get_performance_summary(),
'recommendations': self._generate_recommendations()
}
# Save report
self._save_benchmark_report(report)
logger.info(f"Benchmark suite completed in {suite_duration:.3f}s")
return report
def _generate_recommendations(self) -> List[str]:
"""Generate performance recommendations based on benchmark results."""
recommendations = []
summary = self.monitor.get_performance_summary()
# Check average response times
if summary['duration_stats']['mean'] > 0.5:
recommendations.append("Average response time is high (>500ms). Consider implementing additional caching.")
# Check query counts
if summary['query_stats']['mean'] > 5:
recommendations.append("High average query count. Review and optimize database queries.")
# Check cache hit rate
if summary['cache_stats']['hit_rate'] < 80:
recommendations.append("Cache hit rate is low (<80%). Increase cache timeouts or improve cache key strategy.")
# Check for slow operations
slowest = summary.get('slowest_operations', [])
if slowest and slowest[0]['duration'] > 2.0:
recommendations.append(f"Slowest operation ({slowest[0]['operation']}) is very slow (>{slowest[0]['duration']:.2f}s).")
if not recommendations:
recommendations.append("Performance appears to be within acceptable ranges.")
return recommendations
def _save_benchmark_report(self, report: Dict[str, Any]):
"""Save benchmark report to file and cache."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"benchmark_report_{timestamp}.json"
try:
# Save to logs directory
import os
logs_dir = "logs"
os.makedirs(logs_dir, exist_ok=True)
filepath = os.path.join(logs_dir, filename)
with open(filepath, 'w') as f:
json.dump(report, f, indent=2, default=str)
logger.info(f"Benchmark report saved to {filepath}")
# Also cache the report
cache.set(f"benchmark_report_latest", report, 3600) # 1 hour
except Exception as e:
logger.error(f"Error saving benchmark report: {e}")
# Global performance monitor instance
performance_monitor = PerformanceMonitor()
def benchmark_operation(operation_name: str):
"""Decorator to benchmark a function."""
def decorator(func: Callable):
def wrapper(*args, **kwargs):
with performance_monitor.measure_operation(operation_name):
return func(*args, **kwargs)
return wrapper
return decorator
# Convenience function to run benchmarks
def run_performance_benchmark():
"""Run the complete performance benchmark suite."""
suite = BenchmarkSuite()
return suite.run_full_benchmark_suite()