mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 10:11:09 -05:00
Implement performance enhancements for park listing by optimizing database queries, introducing efficient caching mechanisms, and refining pagination for a significantly faster and smoother user experience.
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: c446bc9e-66df-438c-a86c-f53e6da13649
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
45 lines
12 KiB
Python
"""
|
|
Strategic caching service for park listings and performance optimization.
|
|
"""
|
|
|
|
import hashlib
import logging
from typing import Any, Dict, List, Optional

from django.conf import settings
from django.core.cache import cache
from django.utils import timezone

logger = logging.getLogger("cache_service")


class CacheService:
    """
    Centralized caching service for park-related data with intelligent invalidation.
    """

    # Cache prefixes for different data types
    FILTER_COUNTS = "park_filter_counts"
    AUTOCOMPLETE = "park_autocomplete"
    SEARCH_RESULTS = "park_search"
    CLOUDFLARE_IMAGES = "cf_images"
    PARK_STATS = "park_stats"
    PAGINATED_RESULTS = "park_paginated"

    # Default cache timeouts (in seconds)
    TIMEOUTS = {
        FILTER_COUNTS: 900,  # 15 minutes - changes rarely
        AUTOCOMPLETE: 300,  # 5 minutes - relatively stable
        SEARCH_RESULTS: 600,  # 10 minutes - moderate frequency
        CLOUDFLARE_IMAGES: 3600,  # 1 hour - very stable
        PARK_STATS: 1800,  # 30 minutes - updated periodically
        PAGINATED_RESULTS: 300,  # 5 minutes - user-specific
    }

    @classmethod
    def _generate_cache_key(cls, prefix: str, *args, **kwargs) -> str:
        """Generate a consistent cache key from parameters."""
        # Create a string representation of all positional arguments
        key_parts = [prefix]
        key_parts.extend(str(arg) for arg in args)

        # Hash sorted kwargs so the same parameters always produce the same
        # key, e.g. "park_search:disney:1a2b3c4d"
        if kwargs:
            sorted_kwargs = sorted(kwargs.items())
            key_parts.append(
                hashlib.md5(str(sorted_kwargs).encode("utf-8")).hexdigest()[:8]
            )

        return ":".join(key_parts)

    @classmethod
    def get_cached_filter_counts(
        cls, filters: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """Get cached filter counts."""
        # Pass filters as kwargs so they are hashed into a key-safe digest
        # instead of embedding a raw dict repr in the cache key
        cache_key = cls._generate_cache_key(cls.FILTER_COUNTS, **(filters or {}))
        result = cache.get(cache_key)

        if result is not None:
            logger.debug(f"Cache hit for filter counts: {cache_key}")

        return result

    @classmethod
    def cache_filter_counts(
        cls, counts: Dict[str, Any], filters: Optional[Dict[str, Any]] = None
    ) -> None:
        """Cache filter counts with automatic timeout."""
        cache_key = cls._generate_cache_key(cls.FILTER_COUNTS, **(filters or {}))
        timeout = cls.TIMEOUTS[cls.FILTER_COUNTS]

        cache.set(cache_key, counts, timeout)
        logger.debug(f"Cached filter counts: {cache_key} for {timeout}s")

    @classmethod
    def get_cached_autocomplete(cls, query: str) -> Optional[List[Dict[str, Any]]]:
        """Get cached autocomplete suggestions."""
        cache_key = cls._generate_cache_key(cls.AUTOCOMPLETE, query.lower().strip())
        result = cache.get(cache_key)

        if result is not None:
            logger.debug(f"Cache hit for autocomplete: {cache_key}")

        return result

    @classmethod
    def cache_autocomplete(cls, query: str, suggestions: List[Dict[str, Any]]) -> None:
        """Cache autocomplete suggestions."""
        cache_key = cls._generate_cache_key(cls.AUTOCOMPLETE, query.lower().strip())
        timeout = cls.TIMEOUTS[cls.AUTOCOMPLETE]

        cache.set(cache_key, suggestions, timeout)
        logger.debug(f"Cached autocomplete: {cache_key} for {timeout}s")

    @classmethod
    def get_cached_search_results(
        cls, query: str, filters: Dict[str, Any]
    ) -> Optional[List[int]]:
        """Get cached search result IDs."""
        cache_key = cls._generate_cache_key(cls.SEARCH_RESULTS, query, **filters)
        result = cache.get(cache_key)

        if result is not None:
            logger.debug(f"Cache hit for search results: {cache_key}")

        return result

    @classmethod
    def cache_search_results(
        cls, query: str, filters: Dict[str, Any], result_ids: List[int]
    ) -> None:
        """Cache search result IDs."""
        cache_key = cls._generate_cache_key(cls.SEARCH_RESULTS, query, **filters)
        timeout = cls.TIMEOUTS[cls.SEARCH_RESULTS]

        cache.set(cache_key, result_ids, timeout)
        logger.debug(f"Cached search results: {cache_key} for {timeout}s")

    @classmethod
    def get_cached_cloudflare_image(
        cls, image_id: str, variant: str = "public"
    ) -> Optional[str]:
        """Get cached CloudFlare image URL."""
        cache_key = cls._generate_cache_key(cls.CLOUDFLARE_IMAGES, image_id, variant)
        result = cache.get(cache_key)

        if result is not None:
            logger.debug(f"Cache hit for CloudFlare image: {cache_key}")

        return result

    @classmethod
    def cache_cloudflare_image(cls, image_id: str, variant: str, url: str) -> None:
        """Cache CloudFlare image URL."""
        cache_key = cls._generate_cache_key(cls.CLOUDFLARE_IMAGES, image_id, variant)
        timeout = cls.TIMEOUTS[cls.CLOUDFLARE_IMAGES]

        cache.set(cache_key, url, timeout)
        logger.debug(f"Cached CloudFlare image: {cache_key} for {timeout}s")

    @classmethod
    def get_cached_park_stats(cls, stat_type: str) -> Optional[Dict[str, Any]]:
        """Get cached park statistics (returned wrapped with caching metadata)."""
        cache_key = cls._generate_cache_key(cls.PARK_STATS, stat_type)
        result = cache.get(cache_key)

        if result is not None:
            logger.debug(f"Cache hit for park stats: {cache_key}")

        return result

    @classmethod
    def cache_park_stats(cls, stat_type: str, stats: Dict[str, Any]) -> None:
        """Cache park statistics."""
        cache_key = cls._generate_cache_key(cls.PARK_STATS, stat_type)
        timeout = cls.TIMEOUTS[cls.PARK_STATS]

        # Add timestamp for data freshness tracking
        stats_with_meta = {
            "data": stats,
            "cached_at": timezone.now().isoformat(),
            "ttl": timeout,
        }

        cache.set(cache_key, stats_with_meta, timeout)
        logger.debug(f"Cached park stats: {cache_key} for {timeout}s")

    @classmethod
    def invalidate_related_caches(
        cls, model_name: str, instance_id: Optional[int] = None
    ) -> None:
        """Invalidate caches related to a model change."""
        invalidation_map = {
            "park": [cls.FILTER_COUNTS, cls.SEARCH_RESULTS, cls.PARK_STATS, cls.AUTOCOMPLETE],
            "company": [cls.FILTER_COUNTS, cls.AUTOCOMPLETE],
            "parklocation": [cls.SEARCH_RESULTS, cls.FILTER_COUNTS],
            "parkphoto": [cls.CLOUDFLARE_IMAGES],
        }

        prefixes_to_invalidate = invalidation_map.get(model_name.lower(), [])

        for prefix in prefixes_to_invalidate:
            cls._invalidate_by_prefix(prefix)
            logger.info(f"Invalidated cache prefix: {prefix} due to {model_name} change")

    @classmethod
    def _invalidate_by_prefix(cls, prefix: str) -> None:
        """Invalidate all cache keys with a given prefix."""
        # This is a simplified implementation.
        # In production, you'd want to use Redis SCAN or similar for efficiency.
        try:
            if hasattr(cache, "delete_pattern"):
                # Redis backend (e.g. django-redis) with pattern support
                deleted_count = cache.delete_pattern(f"{prefix}:*")
                logger.debug(f"Deleted {deleted_count} cache keys with prefix {prefix}")
            else:
                # Fallback: let TTL handle expiration
                logger.warning(
                    f"Cache backend doesn't support pattern deletion for prefix {prefix}"
                )
        except Exception as e:
            logger.error(f"Error invalidating cache prefix {prefix}: {e}")

    @classmethod
    def warm_cache(cls) -> None:
        """Pre-warm frequently accessed caches."""
        logger.info("Starting cache warm-up process")

        try:
            # Warm up filter counts
            from apps.parks.services.filter_service import ParkFilterService

            filter_service = ParkFilterService()
            filter_counts = filter_service.get_filter_counts()
            cls.cache_filter_counts(filter_counts)

            # Warm up common autocomplete queries
            common_queries = ["Disney", "Universal", "Six Flags", "Cedar"]
            for query in common_queries:
                # Placeholder: calling the autocomplete service here would
                # populate the autocomplete cache for each query
                pass

            # Warm up park statistics
            park_stats = {
                "total_parks": filter_counts.get("total_parks", 0),
                "operating_parks": filter_counts.get("operating_parks", 0),
                "last_updated": timezone.now().isoformat(),
            }
            cls.cache_park_stats("overview", park_stats)

            logger.info("Cache warm-up completed successfully")

        except Exception as e:
            logger.error(f"Error during cache warm-up: {e}")

    @classmethod
    def get_cache_stats(cls) -> Dict[str, Any]:
        """Get cache performance statistics."""
        stats = {
            "cache_backend": cache.__class__.__name__,
            "configured_timeouts": cls.TIMEOUTS,
            "current_time": timezone.now().isoformat(),
        }

        # Try to get backend-specific stats if available
        try:
            if hasattr(cache, "_cache") and hasattr(cache._cache, "info"):
                # Redis backend
                redis_info = cache._cache.info()
                stats["redis_info"] = {
                    "used_memory": redis_info.get("used_memory_human"),
                    "connected_clients": redis_info.get("connected_clients"),
                    "total_commands_processed": redis_info.get("total_commands_processed"),
                }
        except Exception as e:
            logger.debug(f"Could not get cache backend stats: {e}")

        return stats
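
# Example of wiring invalidation into the app (a sketch, not part of this
# module): invalidate_related_caches() only helps if model changes actually
# call it. With Django signals that could look like the block below; the
# apps.parks.models.Park import path is an assumption about the project:
#
#   from django.db.models.signals import post_delete, post_save
#   from django.dispatch import receiver
#   from apps.parks.models import Park
#
#   @receiver([post_save, post_delete], sender=Park)
#   def _invalidate_park_caches(sender, instance, **kwargs):
#       CacheService.invalidate_related_caches("park", instance.pk)

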
class CloudFlareImageCache:
    """Specialized caching for CloudFlare Images with URL management."""

    @classmethod
    def get_optimized_image_url(
        cls, image_id: str, variant: str = "public", width: Optional[int] = None
    ) -> str:
        """Get optimized CloudFlare image URL with caching."""
        # Fold the optional width into the cached variant name so each
        # size gets its own cache entry
        cache_variant = f"{variant}_{width}" if width else variant

        cached_url = CacheService.get_cached_cloudflare_image(image_id, cache_variant)
        if cached_url:
            return cached_url

        # Build URL (this would integrate with your CloudFlare Images setup)
        base_url = getattr(settings, "CLOUDFLARE_IMAGES_BASE_URL", "")

        if width:
            # Use CloudFlare's resizing capabilities
            url = f"{base_url}/{image_id}/w={width}"
        else:
            url = f"{base_url}/{image_id}/{variant}"

        # Cache the URL
        CacheService.cache_cloudflare_image(image_id, cache_variant, url)

        return url

    @classmethod
    def preload_image_urls(
        cls, image_ids: List[str], variants: Optional[List[str]] = None
    ) -> None:
        """Preload image URLs to warm the cache."""
        if not variants:
            variants = ["public", "thumbnail"]

        for image_id in image_ids:
            for variant in variants:
                cls.get_optimized_image_url(image_id, variant)

        logger.info(
            f"Preloaded {len(image_ids)} image URLs with {len(variants)} variants"
        )
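
# Usage sketch (illustrative; "abc123" and "def456" are placeholder image
# IDs, not real records):
#
#   thumb_url = CloudFlareImageCache.get_optimized_image_url("abc123", width=320)
#   full_url = CloudFlareImageCache.get_optimized_image_url("abc123", variant="public")
#   CloudFlareImageCache.preload_image_urls(["abc123", "def456"])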