Add OWASP compliance mapping and security test case templates, and document version control implementation phases

This commit is contained in:
pacnpal
2025-02-07 10:51:11 -05:00
parent 2c82489691
commit c083f54afb
38 changed files with 5313 additions and 94 deletions

223
history_tracking/caching.py Normal file
View File

@@ -0,0 +1,223 @@
from django.core.cache import cache
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from typing import Optional, List, Dict, Any
import hashlib
import json
import logging
logger = logging.getLogger('version_control')
class VersionHistoryCache:
"""
Caching system for version control history data.
Implements a multi-level caching strategy with memory and persistent storage.
"""
# Cache key prefixes
BRANCH_PREFIX = 'vc_branch_'
CHANGE_PREFIX = 'vc_change_'
HISTORY_PREFIX = 'vc_history_'
# Cache durations (in seconds)
BRANCH_CACHE_DURATION = 3600 # 1 hour
CHANGE_CACHE_DURATION = 1800 # 30 minutes
HISTORY_CACHE_DURATION = 3600 * 24 # 24 hours
@classmethod
def get_branch_key(cls, branch_id: int) -> str:
"""Generate cache key for branch data"""
return f"{cls.BRANCH_PREFIX}{branch_id}"
@classmethod
def get_change_key(cls, change_id: int) -> str:
"""Generate cache key for change data"""
return f"{cls.CHANGE_PREFIX}{change_id}"
@classmethod
def get_history_key(cls, content_type_id: int, object_id: int) -> str:
"""Generate cache key for object history"""
return f"{cls.HISTORY_PREFIX}{content_type_id}_{object_id}"
@classmethod
def generate_cache_version(cls, data: Dict[str, Any]) -> str:
"""Generate version hash for cache invalidation"""
data_str = json.dumps(data, sort_keys=True)
return hashlib.md5(data_str.encode()).hexdigest()
@classmethod
def cache_branch(cls, branch_data: Dict[str, Any]) -> None:
"""Cache branch data with versioning"""
key = cls.get_branch_key(branch_data['id'])
version = cls.generate_cache_version(branch_data)
cache_data = {
'data': branch_data,
'version': version,
'timestamp': settings.VERSION_CONTROL_TIMESTAMP
}
try:
cache.set(key, cache_data, cls.BRANCH_CACHE_DURATION)
logger.debug(f"Cached branch data: {key}")
except Exception as e:
logger.error(f"Error caching branch data: {e}")
@classmethod
def get_cached_branch(cls, branch_id: int) -> Optional[Dict[str, Any]]:
"""Retrieve cached branch data if valid"""
key = cls.get_branch_key(branch_id)
cache_data = cache.get(key)
if cache_data:
# Validate cache version and timestamp
if (
cache_data.get('timestamp') == settings.VERSION_CONTROL_TIMESTAMP and
cls.generate_cache_version(cache_data['data']) == cache_data['version']
):
logger.debug(f"Cache hit for branch: {key}")
return cache_data['data']
# Invalid cache, delete it
cache.delete(key)
logger.debug(f"Invalidated branch cache: {key}")
return None
@classmethod
def cache_change(cls, change_data: Dict[str, Any]) -> None:
"""Cache change data"""
key = cls.get_change_key(change_data['id'])
version = cls.generate_cache_version(change_data)
cache_data = {
'data': change_data,
'version': version,
'timestamp': settings.VERSION_CONTROL_TIMESTAMP
}
try:
cache.set(key, cache_data, cls.CHANGE_CACHE_DURATION)
logger.debug(f"Cached change data: {key}")
except Exception as e:
logger.error(f"Error caching change data: {e}")
@classmethod
def get_cached_change(cls, change_id: int) -> Optional[Dict[str, Any]]:
"""Retrieve cached change data if valid"""
key = cls.get_change_key(change_id)
cache_data = cache.get(key)
if cache_data:
if (
cache_data.get('timestamp') == settings.VERSION_CONTROL_TIMESTAMP and
cls.generate_cache_version(cache_data['data']) == cache_data['version']
):
logger.debug(f"Cache hit for change: {key}")
return cache_data['data']
cache.delete(key)
logger.debug(f"Invalidated change cache: {key}")
return None
@classmethod
def cache_history(cls, content_type_id: int, object_id: int, history_data: List[Dict[str, Any]]) -> None:
"""Cache version history for an object"""
key = cls.get_history_key(content_type_id, object_id)
version = cls.generate_cache_version({'history': history_data})
cache_data = {
'data': history_data,
'version': version,
'timestamp': settings.VERSION_CONTROL_TIMESTAMP
}
try:
cache.set(key, cache_data, cls.HISTORY_CACHE_DURATION)
logger.debug(f"Cached history data: {key}")
except Exception as e:
logger.error(f"Error caching history data: {e}")
@classmethod
def get_cached_history(cls, content_type_id: int, object_id: int) -> Optional[List[Dict[str, Any]]]:
"""Retrieve cached history data if valid"""
key = cls.get_history_key(content_type_id, object_id)
cache_data = cache.get(key)
if cache_data:
if (
cache_data.get('timestamp') == settings.VERSION_CONTROL_TIMESTAMP and
cls.generate_cache_version({'history': cache_data['data']}) == cache_data['version']
):
logger.debug(f"Cache hit for history: {key}")
return cache_data['data']
cache.delete(key)
logger.debug(f"Invalidated history cache: {key}")
return None
@classmethod
def invalidate_branch(cls, branch_id: int) -> None:
"""Invalidate branch cache"""
key = cls.get_branch_key(branch_id)
cache.delete(key)
logger.debug(f"Manually invalidated branch cache: {key}")
@classmethod
def invalidate_change(cls, change_id: int) -> None:
"""Invalidate change cache"""
key = cls.get_change_key(change_id)
cache.delete(key)
logger.debug(f"Manually invalidated change cache: {key}")
@classmethod
def invalidate_history(cls, content_type_id: int, object_id: int) -> None:
"""Invalidate history cache"""
key = cls.get_history_key(content_type_id, object_id)
cache.delete(key)
logger.debug(f"Manually invalidated history cache: {key}")
@classmethod
def invalidate_all(cls) -> None:
"""Invalidate all version control caches"""
try:
# Get all keys with our prefixes
keys = []
for prefix in [cls.BRANCH_PREFIX, cls.CHANGE_PREFIX, cls.HISTORY_PREFIX]:
keys.extend(cache.keys(f"{prefix}*"))
# Delete all matching keys
cache.delete_many(keys)
logger.info(f"Invalidated {len(keys)} version control cache entries")
except Exception as e:
logger.error(f"Error invalidating all caches: {e}")
class CacheableVersionMixin:
"""Mixin to add caching capabilities to version control models"""
def cache_data(self) -> None:
"""Cache the object's data"""
if hasattr(self, 'to_dict'):
data = self.to_dict()
if hasattr(self, 'branch_id'):
VersionHistoryCache.cache_branch(data)
elif hasattr(self, 'change_id'):
VersionHistoryCache.cache_change(data)
def invalidate_cache(self) -> None:
"""Invalidate the object's cache"""
if hasattr(self, 'branch_id'):
VersionHistoryCache.invalidate_branch(self.branch_id)
elif hasattr(self, 'change_id'):
VersionHistoryCache.invalidate_change(self.change_id)
def invalidate_related_caches(self) -> None:
"""Invalidate related object caches"""
if hasattr(self, 'content_type_id') and hasattr(self, 'object_id'):
VersionHistoryCache.invalidate_history(
self.content_type_id,
self.object_id
)