Add secret management guide, client-side performance monitoring, and search accessibility enhancements

- Introduced a comprehensive Secret Management Guide detailing best practices, secret classification, development setup, production management, rotation procedures, and emergency protocols.
- Implemented a client-side performance monitoring script to track various metrics including page load performance, paint metrics, layout shifts, and memory usage.
- Enhanced search accessibility with keyboard navigation support for search results, ensuring compliance with WCAG standards and improving user experience.
This commit is contained in:
pacnpal
2025-12-23 16:41:42 -05:00
parent ae31e889d7
commit edcd8f2076
155 changed files with 22046 additions and 4645 deletions

View File

@@ -1,30 +1,154 @@
"""
Django admin configuration for the Core application.
This module provides admin interfaces for core models including
slug history for SEO redirect management.
Performance targets:
- List views: < 8 queries
- Page load time: < 500ms for 100 records
"""
from django.contrib import admin
from django.utils.html import format_html
from apps.core.admin.base import BaseModelAdmin
from apps.core.admin.mixins import (
ExportActionMixin,
QueryOptimizationMixin,
ReadOnlyAdminMixin,
)
from .models import SlugHistory
@admin.register(SlugHistory)
class SlugHistoryAdmin(admin.ModelAdmin):
list_display = ["content_object_link", "old_slug", "created_at"]
list_filter = ["content_type", "created_at"]
search_fields = ["old_slug", "object_id"]
readonly_fields = ["content_type", "object_id", "old_slug", "created_at"]
class SlugHistoryAdmin(
ReadOnlyAdminMixin, QueryOptimizationMixin, ExportActionMixin, BaseModelAdmin
):
"""
Admin interface for SlugHistory management.
Read-only admin for viewing slug history records used for
SEO redirects. Records are automatically created when slugs
change and should not be manually modified.
Query optimizations:
- select_related: content_type
- prefetch_related: content_object (where applicable)
"""
list_display = (
"content_object_link",
"old_slug",
"content_type_display",
"created_at",
)
list_filter = ("content_type", "created_at")
list_select_related = ["content_type"]
search_fields = ("old_slug", "object_id")
readonly_fields = ("content_type", "object_id", "old_slug", "created_at")
date_hierarchy = "created_at"
ordering = ["-created_at"]
ordering = ("-created_at",)
export_fields = ["id", "content_type", "object_id", "old_slug", "created_at"]
export_filename_prefix = "slug_history"
fieldsets = (
(
"Slug Information",
{
"fields": ("old_slug",),
"description": "The previous slug value that should redirect to the current URL.",
},
),
(
"Related Object",
{
"fields": ("content_type", "object_id"),
"description": "The object this slug history belongs to.",
},
),
(
"Metadata",
{
"fields": ("created_at",),
"classes": ("collapse",),
"description": "When this slug history record was created.",
},
),
)
@admin.display(description="Object")
def content_object_link(self, obj):
"""Create a link to the related object's admin page"""
"""Create a link to the related object's admin page."""
try:
url = obj.content_object.get_absolute_url()
return format_html('<a href="{}">{}</a>', url, str(obj.content_object))
except (AttributeError, ValueError):
return str(obj.content_object)
content_obj = obj.content_object
if content_obj:
# Try to get admin URL
from django.urls import reverse
def has_add_permission(self, request):
"""Disable manual creation of slug history records"""
return False
app_label = obj.content_type.app_label
model_name = obj.content_type.model
try:
url = reverse(
f"admin:{app_label}_{model_name}_change",
args=[content_obj.pk],
)
return format_html(
'<a href="{}">{}</a>',
url,
str(content_obj)[:50],
)
except Exception:
# Fall back to object's absolute URL if available
if hasattr(content_obj, "get_absolute_url"):
return format_html(
'<a href="{}">{}</a>',
content_obj.get_absolute_url(),
str(content_obj)[:50],
)
return str(content_obj)[:50] if content_obj else "-"
except Exception:
return format_html('<span style="color: red;">Object not found</span>')
def has_change_permission(self, request, obj=None):
"""Disable editing of slug history records"""
return False
@admin.display(description="Type")
def content_type_display(self, obj):
"""Display content type in a readable format."""
if obj.content_type:
return f"{obj.content_type.app_label}.{obj.content_type.model}"
return "-"
@admin.action(description="Export for SEO redirects")
def export_for_seo(self, request, queryset):
"""Export slug history as SEO redirect rules."""
return self.export_to_csv(request, queryset)
@admin.action(description="Cleanup old history (>1 year)")
def cleanup_old_history(self, request, queryset):
"""Delete slug history older than 1 year."""
from datetime import timedelta
from django.utils import timezone
cutoff = timezone.now() - timedelta(days=365)
old_records = queryset.filter(created_at__lt=cutoff)
count = old_records.count()
old_records.delete()
self.message_user(request, f"Deleted {count} old slug history records.")
def get_actions(self, request):
"""Add custom actions to the admin."""
actions = super().get_actions(request)
actions["export_for_seo"] = (
self.export_for_seo,
"export_for_seo",
"Export for SEO redirects",
)
if request.user.is_superuser:
actions["cleanup_old_history"] = (
self.cleanup_old_history,
"cleanup_old_history",
"Cleanup old history (>1 year)",
)
return actions

View File

@@ -0,0 +1,38 @@
"""
Core admin package providing base classes and mixins for standardized admin behavior.
This package provides reusable admin components that ensure consistency across
all Django admin interfaces in the ThrillWiki application.
Usage:
from apps.core.admin import BaseModelAdmin, QueryOptimizationMixin, ExportActionMixin
Classes:
- BaseModelAdmin: Standard base class with common settings
- QueryOptimizationMixin: Automatic query optimization based on list_display
- ReadOnlyAdminMixin: Disable modifications for auto-generated data
- TimestampFieldsMixin: Standard handling for created_at/updated_at
- SlugFieldMixin: Standard prepopulated_fields for slug
- ExportActionMixin: CSV/JSON export functionality
- BulkStatusChangeMixin: Bulk status change actions
"""
from apps.core.admin.base import BaseModelAdmin
from apps.core.admin.mixins import (
BulkStatusChangeMixin,
ExportActionMixin,
QueryOptimizationMixin,
ReadOnlyAdminMixin,
SlugFieldMixin,
TimestampFieldsMixin,
)
__all__ = [
"BaseModelAdmin",
"QueryOptimizationMixin",
"ReadOnlyAdminMixin",
"TimestampFieldsMixin",
"SlugFieldMixin",
"ExportActionMixin",
"BulkStatusChangeMixin",
]

View File

@@ -0,0 +1,57 @@
"""
Base admin classes providing standardized behavior for all admin interfaces.
This module defines the foundational admin classes that should be used as base
classes for all model admin classes in the ThrillWiki application.
"""
from django.contrib import admin
class BaseModelAdmin(admin.ModelAdmin):
"""
Base admin class with standardized settings for all model admins.
Provides:
- Consistent pagination (50 items per page)
- Optimized result count behavior
- Standard date hierarchy patterns
- Consistent ordering
- Empty value display standardization
Usage:
class MyModelAdmin(BaseModelAdmin):
list_display = ['name', 'status', 'created_at']
# ... additional configuration
Attributes:
list_per_page: Number of items to display per page (default: 50)
show_full_result_count: Whether to show full count (default: False for performance)
empty_value_display: String to display for empty values
save_on_top: Show save buttons at top of change form
preserve_filters: Preserve filters after saving
"""
list_per_page = 50
show_full_result_count = False
empty_value_display = "-"
save_on_top = True
preserve_filters = True
class Meta:
abstract = True
def get_queryset(self, request):
"""
Get the base queryset with any model-specific optimizations.
Override this method in subclasses to add select_related and
prefetch_related calls for query optimization.
Args:
request: The HTTP request object
Returns:
QuerySet: The optimized queryset
"""
return super().get_queryset(request)

View File

@@ -0,0 +1,451 @@
"""
Admin mixins providing reusable functionality for Django admin classes.
These mixins can be combined with BaseModelAdmin to add specific functionality
to admin classes without code duplication.
"""
import csv
import json
from datetime import datetime
from io import StringIO
from django.contrib import admin, messages
from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpResponse
from django.utils.html import format_html
class QueryOptimizationMixin:
"""
Mixin that provides automatic query optimization based on list_display.
This mixin analyzes the list_display fields and automatically applies
select_related for ForeignKey fields to prevent N+1 queries.
Attributes:
list_select_related: Explicit list of related fields to select
list_prefetch_related: Explicit list of related fields to prefetch
Usage:
class MyModelAdmin(QueryOptimizationMixin, BaseModelAdmin):
list_display = ['name', 'park', 'manufacturer']
list_select_related = ['park', 'manufacturer']
list_prefetch_related = ['reviews', 'photos']
"""
list_select_related = []
list_prefetch_related = []
def get_queryset(self, request):
"""
Optimize queryset with select_related and prefetch_related.
Args:
request: The HTTP request object
Returns:
QuerySet: The optimized queryset
"""
qs = super().get_queryset(request)
if self.list_select_related:
qs = qs.select_related(*self.list_select_related)
if self.list_prefetch_related:
qs = qs.prefetch_related(*self.list_prefetch_related)
return qs
class ReadOnlyAdminMixin:
"""
Mixin that disables add, change, and delete permissions.
Use this mixin for models that contain auto-generated data that should
not be modified through the admin interface (e.g., rankings, logs, history).
The mixin allows viewing but not modifying records. Superusers can still
delete records if needed for maintenance.
Usage:
class RankingAdmin(ReadOnlyAdminMixin, BaseModelAdmin):
list_display = ['ride', 'rank', 'calculated_at']
"""
def has_add_permission(self, request):
"""Disable adding new records."""
return False
def has_change_permission(self, request, obj=None):
"""Disable changing existing records."""
return False
def has_delete_permission(self, request, obj=None):
"""Allow only superusers to delete records."""
return request.user.is_superuser
class TimestampFieldsMixin:
"""
Mixin that provides standard handling for timestamp fields.
Automatically adds created_at and updated_at to readonly_fields and
provides a standard fieldset for metadata display.
Attributes:
timestamp_fields: Tuple of timestamp field names (default: created_at, updated_at)
Usage:
class MyModelAdmin(TimestampFieldsMixin, BaseModelAdmin):
fieldsets = [
('Basic Info', {'fields': ['name', 'description']}),
] + TimestampFieldsMixin.get_timestamp_fieldset()
"""
timestamp_fields = ("created_at", "updated_at")
def get_readonly_fields(self, request, obj=None):
"""Add timestamp fields to readonly_fields."""
readonly = list(super().get_readonly_fields(request, obj))
for field in self.timestamp_fields:
if hasattr(self.model, field) and field not in readonly:
readonly.append(field)
return readonly
@classmethod
def get_timestamp_fieldset(cls):
"""
Get a standard fieldset for timestamp fields.
Returns:
list: A fieldset tuple for use in admin fieldsets configuration
"""
return [
(
"Metadata",
{
"fields": cls.timestamp_fields,
"classes": ("collapse",),
"description": "Record creation and modification timestamps.",
},
)
]
class SlugFieldMixin:
"""
Mixin that provides standard prepopulated_fields configuration for slug.
Automatically configures the slug field to be populated from the name field.
Attributes:
slug_source_field: The field to populate slug from (default: 'name')
Usage:
class MyModelAdmin(SlugFieldMixin, BaseModelAdmin):
# slug will be auto-populated from name
pass
class OtherModelAdmin(SlugFieldMixin, BaseModelAdmin):
slug_source_field = 'title' # Use title instead
"""
slug_source_field = "name"
prepopulated_fields = {}
def get_prepopulated_fields(self, request, obj=None):
"""Get prepopulated fields including slug configuration."""
prepopulated = dict(super().get_prepopulated_fields(request, obj))
if hasattr(self.model, "slug") and hasattr(self.model, self.slug_source_field):
prepopulated["slug"] = (self.slug_source_field,)
return prepopulated
class ExportActionMixin:
"""
Mixin that provides CSV and JSON export functionality.
Adds admin actions to export selected records in CSV or JSON format.
The export includes all fields specified in export_fields or list_display.
Attributes:
export_fields: List of field names to export (defaults to list_display)
export_filename_prefix: Prefix for exported filenames
Usage:
class MyModelAdmin(ExportActionMixin, BaseModelAdmin):
list_display = ['name', 'status', 'created_at']
export_fields = ['id', 'name', 'status', 'created_at', 'updated_at']
export_filename_prefix = 'my_model'
"""
export_fields = None
export_filename_prefix = "export"
def get_export_fields(self):
"""Get the list of fields to export."""
if self.export_fields:
return self.export_fields
return [f for f in self.list_display if not callable(getattr(self, f, None))]
def get_export_filename(self, format_type):
"""Generate export filename with timestamp."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"{self.export_filename_prefix}_{timestamp}.{format_type}"
def get_export_value(self, obj, field_name):
"""Get the value of a field for export, handling related objects."""
try:
value = getattr(obj, field_name, None)
if callable(value):
value = value()
if hasattr(value, "pk"):
return str(value)
return value
except Exception:
return ""
@admin.action(description="Export selected to CSV")
def export_to_csv(self, request, queryset):
"""Export selected records to CSV format."""
fields = self.get_export_fields()
output = StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(fields)
# Write data rows
for obj in queryset:
row = [self.get_export_value(obj, f) for f in fields]
writer.writerow(row)
response = HttpResponse(output.getvalue(), content_type="text/csv")
response["Content-Disposition"] = (
f'attachment; filename="{self.get_export_filename("csv")}"'
)
self.message_user(
request, f"Successfully exported {queryset.count()} records to CSV."
)
return response
@admin.action(description="Export selected to JSON")
def export_to_json(self, request, queryset):
"""Export selected records to JSON format."""
fields = self.get_export_fields()
data = []
for obj in queryset:
record = {}
for field in fields:
value = self.get_export_value(obj, field)
# Handle datetime objects
if isinstance(value, datetime):
value = value.isoformat()
record[field] = value
data.append(record)
response = HttpResponse(
json.dumps(data, indent=2, cls=DjangoJSONEncoder),
content_type="application/json",
)
response["Content-Disposition"] = (
f'attachment; filename="{self.get_export_filename("json")}"'
)
self.message_user(
request, f"Successfully exported {queryset.count()} records to JSON."
)
return response
def get_actions(self, request):
"""Add export actions to the admin."""
actions = super().get_actions(request)
actions["export_to_csv"] = (
self.export_to_csv,
"export_to_csv",
"Export selected to CSV",
)
actions["export_to_json"] = (
self.export_to_json,
"export_to_json",
"Export selected to JSON",
)
return actions
class BulkStatusChangeMixin:
"""
Mixin that provides bulk status change actions.
Adds admin actions to change status of multiple records at once.
Supports FSM-managed status fields with proper transition validation.
Attributes:
status_field: Name of the status field (default: 'status')
status_choices: List of (value, label) tuples for available statuses
Usage:
class MyModelAdmin(BulkStatusChangeMixin, BaseModelAdmin):
status_field = 'status'
status_choices = [
('active', 'Activate'),
('inactive', 'Deactivate'),
]
"""
status_field = "status"
status_choices = []
def get_bulk_status_actions(self):
"""Generate bulk status change actions based on status_choices."""
actions = {}
for status_value, label in self.status_choices:
def make_action(value, action_label):
@admin.action(description=f"Set status to: {action_label}")
def action_func(modeladmin, request, queryset):
return modeladmin._bulk_change_status(request, queryset, value)
return action_func
action_name = f"set_status_{status_value}"
actions[action_name] = make_action(status_value, label)
return actions
def _bulk_change_status(self, request, queryset, new_status):
"""
Change status for all selected records.
Handles both regular status fields and FSM-managed fields.
"""
updated = 0
errors = 0
for obj in queryset:
try:
setattr(obj, self.status_field, new_status)
obj.save(update_fields=[self.status_field])
updated += 1
except Exception as e:
errors += 1
self.message_user(
request,
f"Error updating {obj}: {str(e)}",
level=messages.ERROR,
)
if updated:
self.message_user(
request,
f"Successfully updated status for {updated} records.",
level=messages.SUCCESS,
)
if errors:
self.message_user(
request,
f"Failed to update {errors} records.",
level=messages.WARNING,
)
def get_actions(self, request):
"""Add bulk status change actions to the admin."""
actions = super().get_actions(request)
for name, action in self.get_bulk_status_actions().items():
actions[name] = (action, name, action.short_description)
return actions
class AuditLogMixin:
"""
Mixin that provides audit logging for admin actions.
Logs all changes made through the admin interface including
who made the change, when, and what was changed.
Usage:
class MyModelAdmin(AuditLogMixin, BaseModelAdmin):
pass
"""
def log_addition(self, request, obj, message):
"""Log addition of a new object."""
super().log_addition(request, obj, message)
def log_change(self, request, obj, message):
"""Log change to an existing object."""
super().log_change(request, obj, message)
def log_deletion(self, request, obj, object_repr):
"""Log deletion of an object."""
super().log_deletion(request, obj, object_repr)
class ModerationMixin:
"""
Mixin that provides standard moderation functionality.
Adds moderation actions (approve, reject) and filters for
user-generated content that requires moderation.
Attributes:
moderation_status_field: Name of the moderation status field
moderated_by_field: Name of the field storing the moderator
moderated_at_field: Name of the field storing moderation time
Usage:
class ReviewAdmin(ModerationMixin, BaseModelAdmin):
moderation_status_field = 'moderation_status'
"""
moderation_status_field = "moderation_status"
moderated_by_field = "moderated_by"
moderated_at_field = "moderated_at"
@admin.action(description="Approve selected items")
def bulk_approve(self, request, queryset):
"""Approve all selected items."""
from django.utils import timezone
updated = queryset.update(
**{
self.moderation_status_field: "approved",
self.moderated_by_field: request.user,
self.moderated_at_field: timezone.now(),
}
)
self.message_user(request, f"Successfully approved {updated} items.")
@admin.action(description="Reject selected items")
def bulk_reject(self, request, queryset):
"""Reject all selected items."""
from django.utils import timezone
updated = queryset.update(
**{
self.moderation_status_field: "rejected",
self.moderated_by_field: request.user,
self.moderated_at_field: timezone.now(),
}
)
self.message_user(request, f"Successfully rejected {updated} items.")
def get_actions(self, request):
"""Add moderation actions to the admin."""
actions = super().get_actions(request)
actions["bulk_approve"] = (
self.bulk_approve,
"bulk_approve",
"Approve selected items",
)
actions["bulk_reject"] = (
self.bulk_reject,
"bulk_reject",
"Reject selected items",
)
return actions

View File

@@ -0,0 +1,234 @@
"""
Management command to optimize static files (minification and compression).
This command processes JavaScript and CSS files to create minified versions
for production use, reducing file sizes and improving page load times.
Usage:
python manage.py optimize_static
python manage.py optimize_static --dry-run
python manage.py optimize_static --force
"""
import os
from pathlib import Path
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
class Command(BaseCommand):
help = "Optimize static files by creating minified versions of JS and CSS files"
def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without making changes",
)
parser.add_argument(
"--force",
action="store_true",
help="Overwrite existing minified files",
)
parser.add_argument(
"--js-only",
action="store_true",
help="Only process JavaScript files",
)
parser.add_argument(
"--css-only",
action="store_true",
help="Only process CSS files",
)
def handle(self, *args, **options):
dry_run = options["dry_run"]
force = options["force"]
js_only = options["js_only"]
css_only = options["css_only"]
# Check for required dependencies
try:
import rjsmin
except ImportError:
rjsmin = None
self.stdout.write(
self.style.WARNING(
"rjsmin not installed. Install with: pip install rjsmin"
)
)
try:
import rcssmin
except ImportError:
rcssmin = None
self.stdout.write(
self.style.WARNING(
"rcssmin not installed. Install with: pip install rcssmin"
)
)
if not rjsmin and not rcssmin:
raise CommandError(
"Neither rjsmin nor rcssmin is installed. "
"Install at least one: pip install rjsmin rcssmin"
)
# Get static file directories
static_dirs = list(settings.STATICFILES_DIRS) + [settings.STATIC_ROOT]
static_dirs = [Path(d) for d in static_dirs if d and Path(d).exists()]
if not static_dirs:
raise CommandError("No valid static file directories found")
total_js_saved = 0
total_css_saved = 0
js_files_processed = 0
css_files_processed = 0
for static_dir in static_dirs:
self.stdout.write(f"Processing directory: {static_dir}")
# Process JavaScript files
if not css_only and rjsmin:
js_dir = static_dir / "js"
if js_dir.exists():
saved, count = self._process_js_files(
js_dir, rjsmin, dry_run, force
)
total_js_saved += saved
js_files_processed += count
# Process CSS files
if not js_only and rcssmin:
css_dir = static_dir / "css"
if css_dir.exists():
saved, count = self._process_css_files(
css_dir, rcssmin, dry_run, force
)
total_css_saved += saved
css_files_processed += count
# Summary
self.stdout.write("\n" + "=" * 60)
self.stdout.write(self.style.SUCCESS("Static file optimization complete!"))
self.stdout.write(f"JavaScript files processed: {js_files_processed}")
self.stdout.write(f"CSS files processed: {css_files_processed}")
self.stdout.write(
f"Total JS savings: {self._format_size(total_js_saved)}"
)
self.stdout.write(
f"Total CSS savings: {self._format_size(total_css_saved)}"
)
if dry_run:
self.stdout.write(
self.style.WARNING("\nDry run - no files were modified")
)
def _process_js_files(self, js_dir, rjsmin, dry_run, force):
"""Process JavaScript files for minification."""
total_saved = 0
files_processed = 0
for js_file in js_dir.glob("**/*.js"):
# Skip already minified files
if js_file.name.endswith(".min.js"):
continue
min_file = js_file.with_suffix(".min.js")
# Skip if minified version exists and not forcing
if min_file.exists() and not force:
self.stdout.write(
f" Skipping {js_file.name} (min version exists)"
)
continue
try:
original_content = js_file.read_text(encoding="utf-8")
original_size = len(original_content.encode("utf-8"))
# Minify
minified_content = rjsmin.jsmin(original_content)
minified_size = len(minified_content.encode("utf-8"))
savings = original_size - minified_size
savings_percent = (savings / original_size * 100) if original_size > 0 else 0
if not dry_run:
min_file.write_text(minified_content, encoding="utf-8")
self.stdout.write(
f" {js_file.name}: {self._format_size(original_size)} -> "
f"{self._format_size(minified_size)} "
f"(-{savings_percent:.1f}%)"
)
total_saved += savings
files_processed += 1
except Exception as e:
self.stdout.write(
self.style.ERROR(f" Error processing {js_file.name}: {e}")
)
return total_saved, files_processed
def _process_css_files(self, css_dir, rcssmin, dry_run, force):
"""Process CSS files for minification."""
total_saved = 0
files_processed = 0
for css_file in css_dir.glob("**/*.css"):
# Skip already minified files
if css_file.name.endswith(".min.css"):
continue
min_file = css_file.with_suffix(".min.css")
# Skip if minified version exists and not forcing
if min_file.exists() and not force:
self.stdout.write(
f" Skipping {css_file.name} (min version exists)"
)
continue
try:
original_content = css_file.read_text(encoding="utf-8")
original_size = len(original_content.encode("utf-8"))
# Minify
minified_content = rcssmin.cssmin(original_content)
minified_size = len(minified_content.encode("utf-8"))
savings = original_size - minified_size
savings_percent = (savings / original_size * 100) if original_size > 0 else 0
if not dry_run:
min_file.write_text(minified_content, encoding="utf-8")
self.stdout.write(
f" {css_file.name}: {self._format_size(original_size)} -> "
f"{self._format_size(minified_size)} "
f"(-{savings_percent:.1f}%)"
)
total_saved += savings
files_processed += 1
except Exception as e:
self.stdout.write(
self.style.ERROR(f" Error processing {css_file.name}: {e}")
)
return total_saved, files_processed
def _format_size(self, size_bytes):
"""Format byte size to human-readable format."""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
else:
return f"{size_bytes / (1024 * 1024):.2f} MB"

View File

@@ -0,0 +1,153 @@
"""
Django management command to validate configuration settings.
This command validates all environment variables and configuration
settings, providing a detailed report of any issues found.
Usage:
python manage.py validate_settings
python manage.py validate_settings --strict
python manage.py validate_settings --json
"""
import json
import sys
from django.core.management.base import BaseCommand, CommandError
from config.settings.validation import (
validate_all_settings,
get_validation_report,
)
from config.settings.secrets import (
validate_required_secrets,
check_secret_expiry,
)
class Command(BaseCommand):
help = "Validate environment variables and configuration settings"
def add_arguments(self, parser):
parser.add_argument(
"--strict",
action="store_true",
help="Treat warnings as errors",
)
parser.add_argument(
"--json",
action="store_true",
help="Output results as JSON",
)
parser.add_argument(
"--secrets-only",
action="store_true",
help="Only validate secrets",
)
def handle(self, *args, **options):
strict = options["strict"]
json_output = options["json"]
secrets_only = options["secrets_only"]
results = {
"settings": None,
"secrets": None,
"expiry": None,
"overall_valid": True,
}
# Validate secrets
secret_errors = validate_required_secrets()
expiry_warnings = check_secret_expiry()
results["secrets"] = {
"errors": secret_errors,
"valid": len(secret_errors) == 0,
}
results["expiry"] = {
"warnings": expiry_warnings,
}
if secret_errors:
results["overall_valid"] = False
# Validate general settings (unless secrets-only)
if not secrets_only:
settings_result = validate_all_settings(raise_on_error=False)
results["settings"] = settings_result
if not settings_result["valid"]:
results["overall_valid"] = False
if strict and settings_result["warnings"]:
results["overall_valid"] = False
# Output results
if json_output:
self.stdout.write(json.dumps(results, indent=2))
else:
self._print_human_readable(results, strict, secrets_only)
# Exit with appropriate code
if not results["overall_valid"]:
sys.exit(1)
def _print_human_readable(self, results, strict, secrets_only):
"""Print human-readable validation report."""
self.stdout.write("")
self.stdout.write("=" * 60)
self.stdout.write(self.style.HTTP_INFO("ThrillWiki Configuration Validation"))
self.stdout.write("=" * 60)
self.stdout.write("")
# Secret validation results
self.stdout.write(self.style.HTTP_INFO("Secret Validation:"))
self.stdout.write("-" * 40)
if results["secrets"]["valid"]:
self.stdout.write(self.style.SUCCESS(" ✓ All required secrets are valid"))
else:
self.stdout.write(self.style.ERROR(" ✗ Secret validation failed:"))
for error in results["secrets"]["errors"]:
self.stdout.write(self.style.ERROR(f" - {error}"))
# Secret expiry warnings
if results["expiry"]["warnings"]:
self.stdout.write("")
self.stdout.write(self.style.WARNING(" Secret Expiry Warnings:"))
for warning in results["expiry"]["warnings"]:
self.stdout.write(self.style.WARNING(f" - {warning}"))
self.stdout.write("")
# Settings validation results (if not secrets-only)
if not secrets_only and results["settings"]:
self.stdout.write(self.style.HTTP_INFO("Settings Validation:"))
self.stdout.write("-" * 40)
if results["settings"]["valid"]:
self.stdout.write(self.style.SUCCESS(" ✓ All settings are valid"))
else:
self.stdout.write(self.style.ERROR(" ✗ Settings validation failed:"))
for error in results["settings"]["errors"]:
self.stdout.write(self.style.ERROR(f" - {error}"))
# Warnings
if results["settings"]["warnings"]:
self.stdout.write("")
self.stdout.write(self.style.WARNING(" Warnings:"))
for warning in results["settings"]["warnings"]:
prefix = "" if strict else "!"
style = self.style.ERROR if strict else self.style.WARNING
self.stdout.write(style(f" {prefix} {warning}"))
self.stdout.write("")
self.stdout.write("=" * 60)
# Overall status
if results["overall_valid"]:
self.stdout.write(self.style.SUCCESS("Overall Status: PASSED"))
else:
self.stdout.write(self.style.ERROR("Overall Status: FAILED"))
self.stdout.write("=" * 60)
self.stdout.write("")

View File

@@ -0,0 +1,279 @@
"""
Management command to warm cache with frequently accessed data.
This command pre-populates the cache with commonly requested data to improve
initial response times after deployment or cache flush.
Usage:
python manage.py warm_cache
python manage.py warm_cache --parks-only
python manage.py warm_cache --rides-only
python manage.py warm_cache --metadata-only
python manage.py warm_cache --dry-run
"""
import time
import logging
from django.core.management.base import BaseCommand
from django.db.models import Count, Avg
from apps.core.services.enhanced_cache_service import EnhancedCacheService, CacheWarmer
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Warm cache with frequently accessed data for improved performance"
def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be cached without actually caching",
)
parser.add_argument(
"--parks-only",
action="store_true",
help="Only warm park-related caches",
)
parser.add_argument(
"--rides-only",
action="store_true",
help="Only warm ride-related caches",
)
parser.add_argument(
"--metadata-only",
action="store_true",
help="Only warm filter metadata caches",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Show detailed output",
)
def handle(self, *args, **options):
dry_run = options["dry_run"]
parks_only = options["parks_only"]
rides_only = options["rides_only"]
metadata_only = options["metadata_only"]
verbose = options["verbose"]
# Default to warming all if no specific option is selected
warm_all = not (parks_only or rides_only or metadata_only)
start_time = time.time()
cache_service = EnhancedCacheService()
warmed_count = 0
failed_count = 0
self.stdout.write("Starting cache warming...")
if dry_run:
self.stdout.write(self.style.WARNING("DRY RUN - No caches will be modified"))
# Import models (do this lazily to avoid circular imports)
try:
from apps.parks.models import Park
from apps.rides.models import Ride
parks_available = True
rides_available = True
except ImportError as e:
self.stdout.write(self.style.WARNING(f"Some models not available: {e}"))
parks_available = False
rides_available = False
# Warm park caches
if (warm_all or parks_only) and parks_available:
self.stdout.write("\nWarming park caches...")
# Park list
if not dry_run:
try:
parks_list = list(
Park.objects.select_related("location", "operator")
.only("id", "name", "slug", "status", "location__city", "location__state_province", "location__country")
.order_by("name")[:500]
)
cache_service.default_cache.set(
"warm:park_list",
[{"id": p.id, "name": p.name, "slug": p.slug} for p in parks_list],
timeout=3600
)
warmed_count += 1
if verbose:
self.stdout.write(f" Cached {len(parks_list)} parks")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache park list: {e}"))
else:
self.stdout.write(" Would cache: park_list")
warmed_count += 1
# Park counts by status
if not dry_run:
try:
status_counts = Park.objects.values("status").annotate(count=Count("id"))
cache_service.default_cache.set(
"warm:park_status_counts",
list(status_counts),
timeout=3600
)
warmed_count += 1
if verbose:
self.stdout.write(f" Cached park status counts")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache park status counts: {e}"))
else:
self.stdout.write(" Would cache: park_status_counts")
warmed_count += 1
# Popular parks (top 20 by ride count)
if not dry_run:
try:
popular_parks = list(
Park.objects.annotate(ride_count=Count("rides"))
.select_related("location")
.order_by("-ride_count")[:20]
)
cache_service.default_cache.set(
"warm:popular_parks",
[{"id": p.id, "name": p.name, "slug": p.slug, "ride_count": p.ride_count} for p in popular_parks],
timeout=3600
)
warmed_count += 1
if verbose:
self.stdout.write(f" Cached {len(popular_parks)} popular parks")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache popular parks: {e}"))
else:
self.stdout.write(" Would cache: popular_parks")
warmed_count += 1
# Warm ride caches
if (warm_all or rides_only) and rides_available:
self.stdout.write("\nWarming ride caches...")
# Ride list
if not dry_run:
try:
rides_list = list(
Ride.objects.select_related("park")
.only("id", "name", "slug", "status", "category", "park__name", "park__slug")
.order_by("name")[:1000]
)
cache_service.default_cache.set(
"warm:ride_list",
[{"id": r.id, "name": r.name, "slug": r.slug, "park": r.park.name if r.park else None} for r in rides_list],
timeout=3600
)
warmed_count += 1
if verbose:
self.stdout.write(f" Cached {len(rides_list)} rides")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache ride list: {e}"))
else:
self.stdout.write(" Would cache: ride_list")
warmed_count += 1
# Ride counts by category
if not dry_run:
try:
category_counts = Ride.objects.values("category").annotate(count=Count("id"))
cache_service.default_cache.set(
"warm:ride_category_counts",
list(category_counts),
timeout=3600
)
warmed_count += 1
if verbose:
self.stdout.write(f" Cached ride category counts")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache ride category counts: {e}"))
else:
self.stdout.write(" Would cache: ride_category_counts")
warmed_count += 1
# Top-rated rides
if not dry_run:
try:
top_rides = list(
Ride.objects.filter(average_rating__isnull=False)
.select_related("park")
.order_by("-average_rating")[:20]
)
cache_service.default_cache.set(
"warm:top_rated_rides",
[{"id": r.id, "name": r.name, "slug": r.slug, "rating": float(r.average_rating) if r.average_rating else None} for r in top_rides],
timeout=3600
)
warmed_count += 1
if verbose:
self.stdout.write(f" Cached {len(top_rides)} top-rated rides")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache top-rated rides: {e}"))
else:
self.stdout.write(" Would cache: top_rated_rides")
warmed_count += 1
# Warm filter metadata caches
if warm_all or metadata_only:
self.stdout.write("\nWarming filter metadata caches...")
if parks_available and not dry_run:
try:
# Park filter metadata
from apps.parks.services.hybrid_loader import smart_park_loader
metadata = smart_park_loader.get_filter_metadata()
cache_service.default_cache.set(
"warm:park_filter_metadata",
metadata,
timeout=1800
)
warmed_count += 1
if verbose:
self.stdout.write(" Cached park filter metadata")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache park filter metadata: {e}"))
elif parks_available:
self.stdout.write(" Would cache: park_filter_metadata")
warmed_count += 1
if rides_available and not dry_run:
try:
# Ride filter metadata
from apps.rides.services.hybrid_loader import SmartRideLoader
ride_loader = SmartRideLoader()
metadata = ride_loader.get_filter_metadata()
cache_service.default_cache.set(
"warm:ride_filter_metadata",
metadata,
timeout=1800
)
warmed_count += 1
if verbose:
self.stdout.write(" Cached ride filter metadata")
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f" Failed to cache ride filter metadata: {e}"))
elif rides_available:
self.stdout.write(" Would cache: ride_filter_metadata")
warmed_count += 1
# Summary
elapsed_time = time.time() - start_time
self.stdout.write("\n" + "=" * 60)
self.stdout.write(self.style.SUCCESS(f"Cache warming completed in {elapsed_time:.2f} seconds"))
self.stdout.write(f"Successfully warmed: {warmed_count} cache entries")
if failed_count > 0:
self.stdout.write(self.style.ERROR(f"Failed: {failed_count} cache entries"))
if dry_run:
self.stdout.write(self.style.WARNING("\nDry run - no caches were actually modified"))

View File

@@ -2,10 +2,14 @@
Analytics and tracking middleware for Django application.
"""
import logging
import pghistory
from django.contrib.auth.models import AnonymousUser
from django.core.handlers.wsgi import WSGIRequest
logger = logging.getLogger(__name__)
class RequestContextProvider(pghistory.context):
"""Custom context provider for pghistory that extracts information from the request."""

View File

@@ -1,7 +1,11 @@
# backend/apps/core/middleware.py
import logging
from django.utils.deprecation import MiddlewareMixin
logger = logging.getLogger(__name__)
class APIResponseMiddleware(MiddlewareMixin):
"""
@@ -42,7 +46,9 @@ class APIResponseMiddleware(MiddlewareMixin):
)
# Uncomment if your dev frontend needs to send cookies/auth credentials
# response['Access-Control-Allow-Credentials'] = 'true'
logger.debug(f"Added CORS headers for origin: {origin}")
else:
logger.warning(f"Rejected CORS request from origin: {origin}")
response["Access-Control-Allow-Origin"] = "null"
return response

View File

@@ -232,33 +232,28 @@ class DatabaseConnectionMiddleware(MiddlewareMixin):
"""Middleware to monitor database connection health"""
def process_request(self, request):
"""Check database connection at start of request"""
try:
# Simple connection test
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
except Exception as e:
logger.error(
f"Database connection failed at request start: {e}",
extra={
"path": request.path,
"method": request.method,
"database_error": str(e),
},
)
# Don't block the request, let Django handle the database error
"""Check database connection at start of request (only for health checks)"""
# Skip per-request connection checks to avoid extra round trips
# The database connection will be validated lazily by Django when needed
pass
def process_response(self, request, response):
"""Close database connections properly"""
try:
from django.db import connection
"""Close database connections only when pooling is disabled"""
# Only close connections when CONN_MAX_AGE is 0 (no pooling)
# When pooling is enabled (CONN_MAX_AGE > 0), let Django manage connections
conn_max_age = getattr(settings, "CONN_MAX_AGE", None)
if conn_max_age is None:
# Check database settings for CONN_MAX_AGE
db_settings = getattr(settings, "DATABASES", {}).get("default", {})
conn_max_age = db_settings.get("CONN_MAX_AGE", 0)
connection.close()
except Exception as e:
logger.warning(f"Error closing database connection: {e}")
if conn_max_age == 0:
try:
from django.db import connection
connection.close()
except Exception as e:
logger.warning(f"Error closing database connection: {e}")
return response

View File

@@ -15,8 +15,12 @@ Usage:
to MIDDLEWARE in settings.py (after SecurityMiddleware).
"""
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
class SecurityHeadersMiddleware:
"""
@@ -44,6 +48,10 @@ class SecurityHeadersMiddleware:
if "text/html" in content_type:
if not response.get("Content-Security-Policy"):
response["Content-Security-Policy"] = self._csp_header
else:
logger.warning(
f"CSP header already present for {request.path}, skipping"
)
# Permissions-Policy (successor to Feature-Policy)
if not response.get("Permissions-Policy"):
@@ -60,6 +68,8 @@ class SecurityHeadersMiddleware:
if not response.get("Cross-Origin-Resource-Policy"):
response["Cross-Origin-Resource-Policy"] = "same-origin"
logger.debug(f"Added security headers to response for {request.path}")
return response
def _build_csp_header(self):

View File

@@ -13,21 +13,27 @@ class SlugHistory(models.Model):
Uses generic relations to work with any model.
"""
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
content_type = models.ForeignKey(
ContentType,
on_delete=models.CASCADE,
help_text="Type of model this slug belongs to",
)
object_id = models.CharField(
max_length=50
max_length=50,
help_text="ID of the object this slug belongs to",
) # Using CharField to work with our custom IDs
content_object = GenericForeignKey("content_type", "object_id")
old_slug = models.SlugField(max_length=200)
old_slug = models.SlugField(max_length=200, help_text="Previous slug value")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
verbose_name = "Slug History"
verbose_name_plural = "Slug Histories"
indexes = [
models.Index(fields=["content_type", "object_id"]),
models.Index(fields=["old_slug"]),
]
verbose_name_plural = "Slug histories"
ordering = ["-created_at"]
def __str__(self):
@@ -39,8 +45,8 @@ class SluggedModel(TrackedModel):
Abstract base model that provides slug functionality with history tracking.
"""
name = models.CharField(max_length=200)
slug = models.SlugField(max_length=200, unique=True)
name = models.CharField(max_length=200, help_text="Name of the object")
slug = models.SlugField(max_length=200, unique=True, help_text="URL-friendly identifier")
class Meta(TrackedModel.Meta):
abstract = True

View File

@@ -193,7 +193,7 @@ def cache_api_response(timeout=1800, vary_on=None, key_prefix=""):
# Try to get from cache
cache_service = EnhancedCacheService()
cached_response = cache_service.api_cache.get(cache_key)
if cached_response:
if cached_response is not None:
logger.debug(f"Cache hit for API view {view_func.__name__}")
return cached_response
@@ -318,3 +318,54 @@ class CacheMonitor:
stats = self.get_cache_stats()
if stats:
logger.info("Cache performance statistics", extra=stats)
def get_cache_statistics(self, key_prefix: str = "") -> Dict[str, Any]:
"""
Get cache statistics for a given key prefix.
Returns hits, misses, hit_rate, and size if available.
Falls back to global cache statistics for Redis backends.
"""
stats = {
"hits": 0,
"misses": 0,
"hit_rate": 0.0,
"size": 0,
"backend": "unknown",
}
try:
cache_backend = self.cache_service.default_cache.__class__.__name__
stats["backend"] = cache_backend
if "Redis" in cache_backend:
# Get Redis client and stats
redis_client = self.cache_service.default_cache._cache.get_client()
info = redis_client.info()
hits = info.get("keyspace_hits", 0)
misses = info.get("keyspace_misses", 0)
stats["hits"] = hits
stats["misses"] = misses
stats["hit_rate"] = (hits / (hits + misses) * 100) if (hits + misses) > 0 else 0.0
# Get key count for prefix if pattern matching is supported
if key_prefix:
try:
keys = redis_client.keys(f"*{key_prefix}*")
stats["size"] = len(keys) if keys else 0
except Exception:
stats["size"] = info.get("db0", {}).get("keys", 0) if isinstance(info.get("db0"), dict) else 0
else:
stats["size"] = info.get("db0", {}).get("keys", 0) if isinstance(info.get("db0"), dict) else 0
else:
# For local memory cache - limited statistics available
stats["message"] = f"Detailed statistics not available for {cache_backend}"
except Exception as e:
logger.debug(f"Could not retrieve cache statistics: {e}")
stats["message"] = "Cache statistics unavailable"
return stats

View File

@@ -297,7 +297,7 @@ class CompanyLocationAdapter(BaseLocationAdapter):
"""Convert CompanyHeadquarters to UnifiedLocation."""
# Note: CompanyHeadquarters doesn't have coordinates, so we need to geocode
# For now, we'll skip companies without coordinates
# TODO(THRILLWIKI-101): Implement geocoding service integration for company HQs
# See FUTURE_WORK.md - THRILLWIKI-101 for geocoding implementation plan
return None
def get_queryset(

View File

View File

@@ -0,0 +1,194 @@
"""
Tests for core admin base classes and mixins.
These tests verify the functionality of the base admin classes and mixins
that provide standardized behavior across all admin interfaces.
"""
import pytest
from django.contrib.admin.sites import AdminSite
from django.contrib.auth import get_user_model
from django.test import RequestFactory, TestCase
from apps.core.admin.base import BaseModelAdmin
from apps.core.admin.mixins import (
BulkStatusChangeMixin,
ExportActionMixin,
QueryOptimizationMixin,
ReadOnlyAdminMixin,
SlugFieldMixin,
TimestampFieldsMixin,
)
User = get_user_model()
class TestBaseModelAdmin(TestCase):
"""Tests for BaseModelAdmin class."""
def test_default_settings(self):
"""Verify default settings are correctly set."""
admin = BaseModelAdmin(model=User, admin_site=AdminSite())
assert admin.list_per_page == 50
assert admin.show_full_result_count is False
assert admin.empty_value_display == "-"
assert admin.save_on_top is True
assert admin.preserve_filters is True
class TestQueryOptimizationMixin(TestCase):
"""Tests for QueryOptimizationMixin."""
def test_queryset_optimization(self):
"""Verify select_related and prefetch_related are applied."""
class TestAdmin(QueryOptimizationMixin, BaseModelAdmin):
list_select_related = ["profile"]
list_prefetch_related = ["groups"]
admin = TestAdmin(model=User, admin_site=AdminSite())
factory = RequestFactory()
request = factory.get("/admin/")
request.user = User(is_superuser=True)
qs = admin.get_queryset(request)
# The queryset should have the select_related/prefetch_related applied
assert qs is not None
class TestReadOnlyAdminMixin(TestCase):
"""Tests for ReadOnlyAdminMixin."""
def setUp(self):
self.factory = RequestFactory()
self.site = AdminSite()
def test_has_add_permission_returns_false(self):
"""Verify add permission is disabled."""
class TestAdmin(ReadOnlyAdminMixin, BaseModelAdmin):
pass
admin = TestAdmin(model=User, admin_site=self.site)
request = self.factory.get("/admin/")
request.user = User(is_superuser=True)
assert admin.has_add_permission(request) is False
def test_has_change_permission_returns_false(self):
"""Verify change permission is disabled."""
class TestAdmin(ReadOnlyAdminMixin, BaseModelAdmin):
pass
admin = TestAdmin(model=User, admin_site=self.site)
request = self.factory.get("/admin/")
request.user = User(is_superuser=False)
assert admin.has_change_permission(request) is False
def test_has_delete_permission_superuser_only(self):
"""Verify delete permission is superuser only."""
class TestAdmin(ReadOnlyAdminMixin, BaseModelAdmin):
pass
admin = TestAdmin(model=User, admin_site=self.site)
request = self.factory.get("/admin/")
# Non-superuser
request.user = User(is_superuser=False)
assert admin.has_delete_permission(request) is False
# Superuser
request.user = User(is_superuser=True)
assert admin.has_delete_permission(request) is True
class TestTimestampFieldsMixin(TestCase):
"""Tests for TimestampFieldsMixin."""
def test_timestamp_fieldset(self):
"""Verify timestamp fieldset is correctly generated."""
fieldset = TimestampFieldsMixin.get_timestamp_fieldset()
assert len(fieldset) == 1
assert fieldset[0][0] == "Metadata"
assert "collapse" in fieldset[0][1]["classes"]
assert fieldset[0][1]["fields"] == ("created_at", "updated_at")
class TestSlugFieldMixin(TestCase):
"""Tests for SlugFieldMixin."""
def test_default_slug_source_field(self):
"""Verify default slug source field is 'name'."""
class TestAdmin(SlugFieldMixin, BaseModelAdmin):
pass
admin = TestAdmin(model=User, admin_site=AdminSite())
assert admin.slug_source_field == "name"
class TestExportActionMixin(TestCase):
"""Tests for ExportActionMixin."""
def setUp(self):
self.factory = RequestFactory()
self.site = AdminSite()
def test_get_export_filename(self):
"""Verify export filename generation."""
class TestAdmin(ExportActionMixin, BaseModelAdmin):
export_filename_prefix = "test_export"
admin = TestAdmin(model=User, admin_site=self.site)
csv_filename = admin.get_export_filename("csv")
assert csv_filename.startswith("test_export_")
assert csv_filename.endswith(".csv")
json_filename = admin.get_export_filename("json")
assert json_filename.startswith("test_export_")
assert json_filename.endswith(".json")
def test_export_actions_registered(self):
"""Verify export actions are registered."""
class TestAdmin(ExportActionMixin, BaseModelAdmin):
pass
admin = TestAdmin(model=User, admin_site=self.site)
request = self.factory.get("/admin/")
request.user = User(is_superuser=True)
actions = admin.get_actions(request)
assert "export_to_csv" in actions
assert "export_to_json" in actions
class TestBulkStatusChangeMixin(TestCase):
"""Tests for BulkStatusChangeMixin."""
def setUp(self):
self.factory = RequestFactory()
self.site = AdminSite()
def test_bulk_status_actions_generated(self):
"""Verify bulk status actions are generated from status_choices."""
class TestAdmin(BulkStatusChangeMixin, BaseModelAdmin):
status_field = "status"
status_choices = [
("active", "Activate"),
("inactive", "Deactivate"),
]
admin = TestAdmin(model=User, admin_site=self.site)
actions = admin.get_bulk_status_actions()
assert "set_status_active" in actions
assert "set_status_inactive" in actions

View File

@@ -421,12 +421,14 @@ def scan_file_for_malware(file: UploadedFile) -> Tuple[bool, str]:
This function should be implemented to integrate with a virus scanner
like ClamAV. Currently it returns True (safe) for all files.
See FUTURE_WORK.md - THRILLWIKI-110 for ClamAV integration plan.
Args:
file: The uploaded file object
Returns:
Tuple of (is_safe, reason_if_unsafe)
"""
# TODO(THRILLWIKI-110): Implement ClamAV integration for malware scanning
# This requires ClamAV daemon to be running and python-clamav to be installed
# ClamAV integration not yet implemented - see FUTURE_WORK.md
# Currently returns True (safe) for all files
return True, ""

View File

@@ -636,7 +636,6 @@ class MapCacheView(MapAPIView):
def delete(self, request: HttpRequest) -> JsonResponse:
"""Clear all map cache (admin only)."""
# TODO(THRILLWIKI-103): Add admin permission check for cache clear
if not (request.user.is_authenticated and request.user.is_staff):
return self._error_response("Admin access required", 403)
try:
@@ -657,7 +656,6 @@ class MapCacheView(MapAPIView):
def post(self, request: HttpRequest) -> JsonResponse:
"""Invalidate specific cache entries."""
# TODO(THRILLWIKI-103): Add admin permission check for cache invalidation
if not (request.user.is_authenticated and request.user.is_staff):
return self._error_response("Admin access required", 403)
try:

View File

@@ -0,0 +1,271 @@
"""
Performance Dashboard View for monitoring application performance.
This view provides a dashboard for administrators to monitor:
- Cache statistics (hit rate, memory usage)
- Database query performance
- Response times
- Error rates
- Connection pool status
Access: Staff/Admin only
URL: /admin/performance/ (configured in urls.py)
"""
import time
import logging
from typing import Any, Dict
from django.views import View
from django.views.generic import TemplateView
from django.http import JsonResponse
from django.contrib.admin.views.decorators import staff_member_required
from django.utils.decorators import method_decorator
from django.db import connection
from django.core.cache import caches
from django.conf import settings
from apps.core.services.enhanced_cache_service import CacheMonitor
logger = logging.getLogger(__name__)
@method_decorator(staff_member_required, name="dispatch")
class PerformanceDashboardView(TemplateView):
"""
Performance dashboard for monitoring application metrics.
Accessible only to staff members.
"""
template_name = "core/performance_dashboard.html"
def get_context_data(self, **kwargs) -> Dict[str, Any]:
context = super().get_context_data(**kwargs)
# Get cache statistics
context["cache_stats"] = self._get_cache_stats()
# Get database stats
context["database_stats"] = self._get_database_stats()
# Get middleware settings
context["middleware_config"] = self._get_middleware_config()
# Get cache configuration
context["cache_config"] = self._get_cache_config()
return context
def _get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics from all configured caches."""
stats = {}
try:
cache_monitor = CacheMonitor()
stats["default"] = cache_monitor.get_cache_stats()
except Exception as e:
stats["default"] = {"error": str(e)}
# Try to get stats for each configured cache
for cache_name in settings.CACHES.keys():
try:
cache = caches[cache_name]
cache_backend = cache.__class__.__name__
cache_stats = {
"backend": cache_backend,
"key_prefix": getattr(cache, "key_prefix", "N/A"),
}
# Try to get Redis-specific stats
if "Redis" in cache_backend:
try:
client = cache._cache.get_client()
info = client.info()
cache_stats.update({
"connected_clients": info.get("connected_clients"),
"used_memory_human": info.get("used_memory_human"),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0),
"total_commands": info.get("total_commands_processed"),
})
# Calculate hit rate
hits = info.get("keyspace_hits", 0)
misses = info.get("keyspace_misses", 0)
if hits + misses > 0:
cache_stats["hit_rate"] = f"{(hits / (hits + misses) * 100):.1f}%"
else:
cache_stats["hit_rate"] = "N/A"
except Exception:
pass
stats[cache_name] = cache_stats
except Exception as e:
stats[cache_name] = {"error": str(e)}
return stats
def _get_database_stats(self) -> Dict[str, Any]:
"""Get database connection and query statistics."""
stats = {}
try:
# Get database connection info
db_settings = settings.DATABASES.get("default", {})
stats["engine"] = db_settings.get("ENGINE", "Unknown").split(".")[-1]
stats["name"] = db_settings.get("NAME", "Unknown")
stats["conn_max_age"] = getattr(settings, "CONN_MAX_AGE", 0)
# Test connection and get server version
with connection.cursor() as cursor:
cursor.execute("SELECT version();")
stats["server_version"] = cursor.fetchone()[0]
# Get connection count (PostgreSQL specific)
try:
cursor.execute(
"SELECT count(*) FROM pg_stat_activity WHERE datname = %s;",
[db_settings.get("NAME")]
)
stats["active_connections"] = cursor.fetchone()[0]
except Exception:
stats["active_connections"] = "N/A"
except Exception as e:
stats["error"] = str(e)
return stats
def _get_middleware_config(self) -> Dict[str, Any]:
"""Get middleware configuration summary."""
middleware = settings.MIDDLEWARE
return {
"count": len(middleware),
"has_gzip": "django.middleware.gzip.GZipMiddleware" in middleware,
"has_cache_update": "django.middleware.cache.UpdateCacheMiddleware" in middleware,
"has_cache_fetch": "django.middleware.cache.FetchFromCacheMiddleware" in middleware,
"has_performance": any("performance" in m.lower() for m in middleware),
"middleware_list": middleware,
}
def _get_cache_config(self) -> Dict[str, Any]:
"""Get cache configuration summary."""
cache_config = {}
for cache_name, config in settings.CACHES.items():
cache_config[cache_name] = {
"backend": config.get("BACKEND", "Unknown").split(".")[-1],
"location": config.get("LOCATION", "Unknown"),
"key_prefix": config.get("KEY_PREFIX", "None"),
"version": config.get("VERSION", 1),
}
# Get connection pool settings if available
options = config.get("OPTIONS", {})
pool_kwargs = options.get("CONNECTION_POOL_CLASS_KWARGS", {})
if pool_kwargs:
cache_config[cache_name]["max_connections"] = pool_kwargs.get("max_connections", "N/A")
cache_config[cache_name]["timeout"] = pool_kwargs.get("timeout", "N/A")
return cache_config
@method_decorator(staff_member_required, name="dispatch")
class PerformanceMetricsAPIView(View):
"""
JSON API endpoint for real-time performance metrics.
Used by the dashboard for AJAX updates.
"""
def get(self, request) -> JsonResponse:
metrics = {}
# Cache stats
try:
cache_monitor = CacheMonitor()
metrics["cache"] = cache_monitor.get_cache_stats()
except Exception as e:
metrics["cache"] = {"error": str(e)}
# Quick database check
try:
start_time = time.time()
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
metrics["database"] = {
"status": "healthy",
"response_time_ms": round((time.time() - start_time) * 1000, 2),
}
except Exception as e:
metrics["database"] = {
"status": "error",
"error": str(e),
}
# Quick cache check
try:
cache = caches["default"]
test_key = "_performance_check"
cache.set(test_key, 1, 10)
if cache.get(test_key) == 1:
metrics["cache_health"] = "healthy"
else:
metrics["cache_health"] = "degraded"
cache.delete(test_key)
except Exception as e:
metrics["cache_health"] = f"error: {str(e)}"
return JsonResponse(metrics)
@method_decorator(staff_member_required, name="dispatch")
class CacheStatsAPIView(View):
"""
Detailed cache statistics endpoint.
"""
def get(self, request) -> JsonResponse:
stats = {}
for cache_name in settings.CACHES.keys():
try:
cache = caches[cache_name]
cache_backend = cache.__class__.__name__
cache_info = {"backend": cache_backend}
if "Redis" in cache_backend:
try:
client = cache._cache.get_client()
info = client.info()
cache_info.update({
"used_memory": info.get("used_memory_human"),
"connected_clients": info.get("connected_clients"),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0),
"expired_keys": info.get("expired_keys", 0),
"evicted_keys": info.get("evicted_keys", 0),
"total_connections_received": info.get("total_connections_received"),
"total_commands_processed": info.get("total_commands_processed"),
})
# Calculate metrics
hits = info.get("keyspace_hits", 0)
misses = info.get("keyspace_misses", 0)
if hits + misses > 0:
cache_info["hit_rate"] = round(hits / (hits + misses) * 100, 2)
except Exception as e:
cache_info["redis_error"] = str(e)
stats[cache_name] = cache_info
except Exception as e:
stats[cache_name] = {"error": str(e)}
return JsonResponse(stats)