"""
Optimized pagination service for large datasets with efficient counting.
"""

import base64
import hashlib
import logging
import time
from typing import Any, Dict, Optional, Tuple, Union

from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import EmptyResultSet, ValidationError
from django.core.paginator import Page, Paginator
from django.db.models import Count, QuerySet

logger = logging.getLogger("pagination_service")


class OptimizedPaginator(Paginator):
    """
    Paginator whose ``count`` is memoized per instance and shared via the cache.

    The count is looked up in this order: the per-instance value, the Django
    cache (keyed on a hash of the queryset SQL), and finally a real count
    whose result is written back to both.
    """

    def __init__(self, object_list, per_page, cache_timeout=300, **kwargs):
        """
        Args:
            object_list: QuerySet or other sequence to paginate.
            per_page: Number of items per page.
            cache_timeout: Seconds the computed count stays in the cache.
            **kwargs: Forwarded unchanged to ``Paginator.__init__``.
        """
        super().__init__(object_list, per_page, **kwargs)
        self.cache_timeout = cache_timeout
        self._cached_count = None
        self._count_cache_key = None

    def _get_count_cache_key(self) -> str:
        """Return (and memoize) the cache key under which the count is stored."""
        if self._count_cache_key:
            return self._count_cache_key

        if hasattr(self.object_list, 'query'):
            try:
                sql = str(self.object_list.query)
            except EmptyResultSet:
                # Rendering the SQL of a queryset that can never match
                # raises instead of returning text. Every such queryset
                # counts 0, so they can all share one key.
                self._count_cache_key = "paginator_count:empty"
            else:
                sql_hash = hashlib.md5(sql.encode('utf-8')).hexdigest()[:16]
                self._count_cache_key = f"paginator_count:{sql_hash}"
        else:
            # Fallback for non-queryset object lists
            self._count_cache_key = f"paginator_count:list:{len(self.object_list)}"

        return self._count_cache_key

    @property
    def count(self):
        """
        Total number of objects across all pages, with caching.

        A slow computation (more than 0.5s) is logged as a warning.
        """
        if self._cached_count is not None:
            return self._cached_count

        cache_key = self._get_count_cache_key()
        cached_count = cache.get(cache_key)

        if cached_count is not None:
            logger.debug(f"Cache hit for pagination count: {cache_key}")
            self._cached_count = cached_count
            return cached_count

        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        start_time = time.perf_counter()

        object_list = self.object_list
        if hasattr(object_list, 'query') and hasattr(object_list, 'count'):
            # QuerySet-like object: try the optimized count query.
            count = self._get_optimized_count()
        else:
            # Plain sequences also expose a ``count`` attribute
            # (``list.count(value)``), so calling it without arguments would
            # raise TypeError. The base Paginator already knows how to pick
            # between ``count()`` and ``len()``.
            count = super().count

        execution_time = time.perf_counter() - start_time

        # Cache the result
        cache.set(cache_key, count, self.cache_timeout)
        self._cached_count = count

        if execution_time > 0.5:  # Log slow count queries
            logger.warning(
                f"Slow pagination count query: {execution_time:.3f}s for {count} items",
                extra={'cache_key': cache_key, 'execution_time': execution_time}
            )

        return count

    def _get_optimized_count(self) -> int:
        """
        Count ``self.object_list``, using a pk-only query for complex querysets.

        If the pk-only count raises, the failure is logged and the plain
        ``count()`` is used instead.
        """
        queryset = self.object_list

        if not self._is_complex_query(queryset):
            return queryset.count()

        try:
            # NOTE(review): ``values('pk')`` replaces the selected columns,
            # which may change the result for querysets that already use
            # ``values()``/``distinct()`` on other columns - confirm callers
            # never pass such querysets.
            subquery = queryset.values('pk')
            return subquery.count()
        except Exception as e:
            logger.warning(f"Optimized count failed, falling back to standard count: {e}")
            return queryset.count()

    def _is_complex_query(self, queryset) -> bool:
        """
        Heuristically decide whether a queryset's SQL looks expensive to count.

        The check is a plain substring search on the upper-cased SQL: more
        than two ``JOIN`` occurrences, or any ``DISTINCT``, ``GROUP BY`` or
        ``HAVING``. Objects without a ``query`` attribute are never complex.
        """
        if not hasattr(queryset, 'query'):
            return False

        try:
            sql = str(queryset.query).upper()
        except EmptyResultSet:
            # No SQL can be rendered for a queryset that matches nothing;
            # there is nothing to optimize.
            return False

        # Consider complex if it has multiple joins or subqueries
        complexity_indicators = [
            sql.count('JOIN') > 2,
            'DISTINCT' in sql,
            'GROUP BY' in sql,
            'HAVING' in sql,
        ]

        return any(complexity_indicators)


class CursorPaginator:
    """
    Cursor-based pagination for very large datasets.

    Instead of an offset, each page is selected by filtering on the ordering
    field relative to the value encoded in the cursor.

    NOTE(review): the filter is a strict ``__gt``/``__lt`` on a single field,
    so rows sharing the cursor's value would be skipped - presumably the
    ordering field is expected to be unique; confirm with callers.
    """

    def __init__(self, queryset: QuerySet, ordering_field: str = 'id', per_page: int = 20):
        """
        Args:
            queryset: QuerySet to paginate.
            ordering_field: Field to order by; a leading ``-`` means descending.
            per_page: Maximum number of items per page.
        """
        self.queryset = queryset
        self.ordering_field = ordering_field
        self.per_page = per_page
        self.reverse = ordering_field.startswith('-')
        self.field_name = ordering_field.lstrip('-')

    def get_page(self, cursor: Optional[str] = None) -> Dict[str, Any]:
        """
        Get a page of results using cursor-based pagination.

        Args:
            cursor: Base64 encoded cursor value from previous page. A missing,
                empty or undecodable cursor yields the first page.

        Returns:
            Dictionary with keys ``items``, ``has_next``, ``has_previous``,
            ``next_cursor``, ``previous_cursor`` and ``count`` (number of
            items on this page).
        """
        queryset = self.queryset.order_by(self.ordering_field)

        # Tracks whether the cursor actually narrowed the queryset; when it
        # did not, this is the first page and there is nothing before it.
        cursor_applied = False

        if cursor:
            try:
                cursor_value = self._decode_cursor(cursor)
            except (ValueError, TypeError, ValidationError):
                # Invalid cursor, start from beginning. ValidationError is
                # what the field's to_python() raises for a value it cannot
                # convert; it is not a ValueError subclass.
                logger.debug("Ignoring invalid pagination cursor")
            else:
                lookup = "lt" if self.reverse else "gt"
                queryset = queryset.filter(**{f"{self.field_name}__{lookup}": cursor_value})
                cursor_applied = True

        # Get one extra item to check if there's a next page
        items = list(queryset[:self.per_page + 1])

        has_next = len(items) > self.per_page
        if has_next:
            items = items[:-1]  # Remove the extra item

        # Generate cursors for navigation
        next_cursor = None
        previous_cursor = None

        if items and has_next:
            next_cursor = self._encode_cursor(getattr(items[-1], self.field_name))

        if items and cursor_applied:
            # NOTE(review): this encodes the first item of the current page,
            # but get_page() only ever filters forward from a cursor, so
            # passing it back does not return the preceding page - confirm
            # how callers use it.
            previous_cursor = self._encode_cursor(getattr(items[0], self.field_name))

        return {
            'items': items,
            'has_next': has_next,
            'has_previous': cursor_applied,
            'next_cursor': next_cursor,
            'previous_cursor': previous_cursor,
            'count': len(items)
        }

    def _encode_cursor(self, value) -> str:
        """Encode cursor value to base64 string (via its ``str()`` form)."""
        return base64.b64encode(str(value).encode()).decode()

    def _decode_cursor(self, cursor: str):
        """
        Decode cursor from base64 string.

        The decoded text is converted with the ordering field's
        ``to_python`` when the field provides one; otherwise the text itself
        is returned.

        Raises:
            ValueError: If the cursor is not valid base64 or not valid UTF-8.
            ValidationError: If the field rejects the decoded value.
        """
        decoded = base64.b64decode(cursor.encode()).decode()

        # Try to convert to appropriate type based on field
        field = self.queryset.model._meta.get_field(self.field_name)
        if hasattr(field, 'to_python'):
            return field.to_python(decoded)

        return decoded


class PaginationCache:
    """
    Advanced caching for pagination metadata and results.

    All keys share ``CACHE_PREFIX`` and are derived from a caller-supplied
    ``queryset_hash`` string.
    """

    CACHE_PREFIX = "pagination"
    DEFAULT_TIMEOUT = 300  # 5 minutes

    @classmethod
    def get_page_cache_key(cls, queryset_hash: str, page_num: int) -> str:
        """Generate cache key for a specific page."""
        return f"{cls.CACHE_PREFIX}:page:{queryset_hash}:{page_num}"

    @classmethod
    def get_metadata_cache_key(cls, queryset_hash: str) -> str:
        """Generate cache key for pagination metadata."""
        return f"{cls.CACHE_PREFIX}:meta:{queryset_hash}"

    @classmethod
    def cache_page_results(
        cls,
        queryset_hash: str,
        page_num: int,
        page_data: Dict[str, Any],
        timeout: int = DEFAULT_TIMEOUT
    ):
        """Store ``page_data`` for the given page for ``timeout`` seconds."""
        cache_key = cls.get_page_cache_key(queryset_hash, page_num)
        cache.set(cache_key, page_data, timeout)

    @classmethod
    def get_cached_page(cls, queryset_hash: str, page_num: int) -> Optional[Dict[str, Any]]:
        """Return the cached data for a page, or None when nothing is cached."""
        cache_key = cls.get_page_cache_key(queryset_hash, page_num)
        return cache.get(cache_key)

    @classmethod
    def cache_metadata(
        cls,
        queryset_hash: str,
        metadata: Dict[str, Any],
        timeout: int = DEFAULT_TIMEOUT
    ):
        """Store pagination ``metadata`` for ``timeout`` seconds."""
        cache_key = cls.get_metadata_cache_key(queryset_hash)
        cache.set(cache_key, metadata, timeout)

    @classmethod
    def get_cached_metadata(cls, queryset_hash: str) -> Optional[Dict[str, Any]]:
        """Return the cached metadata, or None when nothing is cached."""
        cache_key = cls.get_metadata_cache_key(queryset_hash)
        return cache.get(cache_key)

    @classmethod
    def invalidate_cache(cls, queryset_hash: str):
        """
        Invalidate all cache entries for a queryset.

        Currently a no-op: entries are left to expire through their timeout.
        """
        # This would require a cache backend that supports pattern deletion
        # For now, we'll rely on TTL expiration
        pass


def get_optimized_page(
    queryset: QuerySet,
    page_number: int,
    per_page: int = 20,
    use_cursor: bool = False,
    cursor: Optional[str] = None,
    cache_timeout: int = 300
) -> Tuple[Union[Page, Dict[str, Any]], Dict[str, Any]]:
    """
    Get an optimized page with caching and performance monitoring.

    Args:
        queryset: The queryset to paginate
        page_number: Page number to retrieve (ignored when ``use_cursor`` is set)
        per_page: Items per page
        use_cursor: Whether to use cursor-based pagination
        cursor: Cursor for cursor-based pagination
        cache_timeout: Cache timeout in seconds for the offset paginator's count

    Returns:
        Tuple of (page, metadata dict). In offset mode the page is a ``Page``
        object; in cursor mode it is the dictionary produced by
        ``CursorPaginator.get_page``.
    """
    if use_cursor:
        # Uses CursorPaginator's default ordering field.
        cursor_paginator = CursorPaginator(queryset, per_page=per_page)
        page_data = cursor_paginator.get_page(cursor)

        return page_data, {
            'pagination_type': 'cursor',
            'has_next': page_data['has_next'],
            'has_previous': page_data['has_previous'],
            'next_cursor': page_data['next_cursor'],
            'previous_cursor': page_data['previous_cursor']
        }

    paginator = OptimizedPaginator(queryset, per_page, cache_timeout=cache_timeout)
    page = paginator.get_page(page_number)

    return page, {
        'pagination_type': 'offset',
        'total_pages': paginator.num_pages,
        'total_count': paginator.count,
        'has_next': page.has_next(),
        'has_previous': page.has_previous(),
        'current_page': page.number
    }