Refactor notification service and map API views for improved readability and maintainability; add code complexity management guidelines

This commit is contained in:
pacnpal
2025-08-30 09:03:11 -04:00
parent 04394b9976
commit fb6726f89a
4 changed files with 280 additions and 247 deletions

View File

@@ -0,0 +1,37 @@
## Brief overview
Guidelines for managing code complexity and maintaining clean, maintainable code in Django projects. These rules focus on reducing cognitive complexity, improving code organization, and following best practices for refactoring complex methods.
## Cognitive complexity management
- Always break down methods with high cognitive complexity (>15) into smaller, focused helper methods
- Extract logical operations into separate methods with descriptive names
- Apply the single-responsibility principle — each method should have one clear purpose
- Prefer composition over deeply nested conditional logic
- When refactoring complex methods, maintain original functionality while improving structure
## Method extraction patterns
- Create helper methods for parameter parsing and validation
- Separate data retrieval logic from serialization logic
- Extract cache operations into dedicated methods
- Use descriptive method names that clearly indicate their purpose (e.g., `_serialize_park_data`, `_get_parks_data`)
- Keep helper methods focused and avoid creating new complexity within them
## Error handling and type safety
- Always handle None values explicitly to avoid type errors
- Use proper type annotations, including union types (e.g., `Polygon | None`)
- Implement fallback mechanisms for operations that may not be available in all environments
- Validate input parameters before processing to prevent runtime errors
- Use `getattr()` with defaults when accessing potentially missing attributes
## Django API view organization
- Structure API views with clear separation between parameter handling, business logic, and response building
- Use helper methods to reduce the main method complexity while preserving readability
- Maintain consistent error response formats across all endpoints
- Implement proper caching strategies with appropriate fallbacks
- Follow DRF patterns for serialization and response construction
## Refactoring approach
- When addressing SonarQube or linting warnings, focus on structural improvements rather than quick fixes
- Preserve all original functionality during refactoring
- Test edge cases and error conditions after complexity reduction
- Maintain API contracts and response formats
- Document complex business logic within helper methods when necessary

5
.gitignore vendored
View File

@@ -116,4 +116,7 @@ temp/
/backups/
.django_tailwind_cli/
backend/.env
frontend/.env
frontend/.env
# Extracted packages
django-forwardemail/

View File

@@ -8,14 +8,14 @@ for various events including submission approvals/rejections.
from django.utils import timezone
from django.contrib.contenttypes.models import ContentType
from django.template.loader import render_to_string
from django.core.mail import send_mail
from django.conf import settings
from django.db import models
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
import logging
from apps.accounts.models.notifications import UserNotification, NotificationPreference
from apps.accounts.models import User
from apps.accounts.models import User, UserNotification, NotificationPreference
from apps.email_service.services import EmailService
logger = logging.getLogger(__name__)
@@ -32,7 +32,7 @@ class NotificationService:
related_object: Optional[Any] = None,
priority: str = UserNotification.Priority.NORMAL,
extra_data: Optional[Dict[str, Any]] = None,
expires_at: Optional[timezone.datetime] = None,
expires_at: Optional[datetime] = None,
) -> UserNotification:
"""
Create a new notification for a user.
@@ -220,17 +220,13 @@ class NotificationService:
):
NotificationService._send_email_notification(notification)
# Send push notification if enabled
if preferences.should_send_notification(notification.notification_type, "push"):
NotificationService._send_push_notification(notification)
# In-app notifications are always created (the notification object itself)
# The frontend will check preferences when displaying them
# Toast notifications are always created (the notification object itself)
# The frontend will display them as toast notifications based on preferences
@staticmethod
def _send_email_notification(notification: UserNotification) -> None:
"""
Send email notification to user.
Send email notification to user using the custom ForwardEmail service.
Args:
notification: The notification to send via email
@@ -251,14 +247,12 @@ class NotificationService:
html_message = render_to_string("emails/notification.html", context)
plain_message = render_to_string("emails/notification.txt", context)
# Send email
send_mail(
# Send email using custom ForwardEmail service
EmailService.send_email(
to=user.email,
subject=subject,
message=plain_message,
html_message=html_message,
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[user.email],
fail_silently=False,
text=plain_message,
html=html_message,
)
# Mark as sent
@@ -275,28 +269,6 @@ class NotificationService:
f"Failed to send email notification {notification.id}: {str(e)}"
)
@staticmethod
def _send_push_notification(notification: UserNotification) -> None:
"""
Send push notification to user.
Args:
notification: The notification to send via push
"""
try:
# TODO: Implement push notification service (Firebase, etc.)
# For now, just mark as sent
notification.push_sent = True
notification.push_sent_at = timezone.now()
notification.save(update_fields=["push_sent", "push_sent_at"])
logger.info(f"Push notification sent for notification {notification.id}")
except Exception as e:
logger.error(
f"Failed to send push notification {notification.id}: {str(e)}"
)
@staticmethod
def get_user_notifications(
user: User,
@@ -366,7 +338,7 @@ class NotificationService:
Returns:
int: Number of notifications deleted
"""
cutoff_date = timezone.now() - timezone.timedelta(days=days)
cutoff_date = timezone.now() - timedelta(days=days)
old_notifications = UserNotification.objects.filter(
is_read=True, read_at__lt=cutoff_date

View File

@@ -126,209 +126,210 @@ class MapLocationsAPIView(APIView):
permission_classes = [AllowAny]
def _parse_request_parameters(self, request: HttpRequest) -> dict:
"""Parse and validate request parameters."""
return {
"north": request.GET.get("north"),
"south": request.GET.get("south"),
"east": request.GET.get("east"),
"west": request.GET.get("west"),
"zoom": request.GET.get("zoom", 10),
"types": request.GET.get("types", "park,ride").split(","),
"cluster": request.GET.get("cluster", "false").lower() == "true",
"query": request.GET.get("q", "").strip(),
}
def _build_cache_key(self, params: dict) -> str:
"""Build cache key from parameters."""
return (
f"map_locations_{params['north']}_{params['south']}_"
f"{params['east']}_{params['west']}_{params['zoom']}_"
f"{','.join(params['types'])}_{params['cluster']}_{params['query']}"
)
def _create_bounds_polygon(self, north: str, south: str, east: str, west: str) -> Polygon | None:
"""Create bounds polygon from coordinate strings."""
if not all([north, south, east, west]):
return None
try:
return Polygon.from_bbox(
(float(west), float(south), float(east), float(north))
)
except (ValueError, TypeError):
return None
def _serialize_park_location(self, park) -> dict:
"""Serialize park location data."""
location = park.location if hasattr(
park, "location") and park.location else None
return {
"city": location.city if location else "",
"state": location.state if location else "",
"country": location.country if location else "",
"formatted_address": location.formatted_address if location else "",
}
def _serialize_park_data(self, park) -> dict:
"""Serialize park data for map response."""
location = park.location if hasattr(
park, "location") and park.location else None
return {
"id": park.id,
"type": "park",
"name": park.name,
"slug": park.slug,
"latitude": location.latitude if location else None,
"longitude": location.longitude if location else None,
"status": park.status,
"location": self._serialize_park_location(park),
"stats": {
"coaster_count": park.coaster_count or 0,
"ride_count": park.ride_count or 0,
"average_rating": (
float(park.average_rating) if park.average_rating else None
),
},
}
def _get_parks_data(self, params: dict) -> list:
"""Get and serialize parks data."""
if "park" not in params["types"]:
return []
parks_query = Park.objects.select_related(
"location", "operator"
).filter(location__point__isnull=False)
# Apply bounds filtering
bounds_polygon = self._create_bounds_polygon(
params["north"], params["south"], params["east"], params["west"]
)
if bounds_polygon:
parks_query = parks_query.filter(location__point__within=bounds_polygon)
# Apply text search
if params["query"]:
parks_query = parks_query.filter(
Q(name__icontains=params["query"])
| Q(location__city__icontains=params["query"])
| Q(location__state__icontains=params["query"])
)
return [self._serialize_park_data(park) for park in parks_query[:100]]
def _serialize_ride_location(self, ride) -> dict:
"""Serialize ride location data."""
location = (
ride.park.location
if hasattr(ride.park, "location") and ride.park.location
else None
)
return {
"city": location.city if location else "",
"state": location.state if location else "",
"country": location.country if location else "",
"formatted_address": location.formatted_address if location else "",
}
def _serialize_ride_data(self, ride) -> dict:
"""Serialize ride data for map response."""
location = (
ride.park.location
if hasattr(ride.park, "location") and ride.park.location
else None
)
return {
"id": ride.id,
"type": "ride",
"name": ride.name,
"slug": ride.slug,
"latitude": location.latitude if location else None,
"longitude": location.longitude if location else None,
"status": ride.status,
"location": self._serialize_ride_location(ride),
"stats": {
"category": ride.get_category_display() if ride.category else None,
"average_rating": (
float(ride.average_rating) if ride.average_rating else None
),
"park_name": ride.park.name,
},
}
def _get_rides_data(self, params: dict) -> list:
"""Get and serialize rides data."""
if "ride" not in params["types"]:
return []
rides_query = Ride.objects.select_related(
"park__location", "manufacturer"
).filter(park__location__point__isnull=False)
# Apply bounds filtering
bounds_polygon = self._create_bounds_polygon(
params["north"], params["south"], params["east"], params["west"]
)
if bounds_polygon:
rides_query = rides_query.filter(
park__location__point__within=bounds_polygon)
# Apply text search
if params["query"]:
rides_query = rides_query.filter(
Q(name__icontains=params["query"])
| Q(park__name__icontains=params["query"])
| Q(park__location__city__icontains=params["query"])
)
return [self._serialize_ride_data(ride) for ride in rides_query[:100]]
def _calculate_bounds(self, locations: list) -> dict:
"""Calculate bounds from location results."""
if not locations:
return {}
lats = [loc["latitude"] for loc in locations if loc["latitude"]]
lngs = [loc["longitude"] for loc in locations if loc["longitude"]]
if not lats or not lngs:
return {}
return {
"north": max(lats),
"south": min(lats),
"east": max(lngs),
"west": min(lngs),
}
def _build_response(self, locations: list, params: dict) -> dict:
"""Build the final response data."""
return {
"status": "success",
"locations": locations,
"clusters": [], # TODO: Implement clustering
"bounds": self._calculate_bounds(locations),
"total_count": len(locations),
"clustered": params["cluster"],
}
def get(self, request: HttpRequest) -> Response:
"""Get map locations with optional clustering and filtering."""
try:
# Parse query parameters
north = request.GET.get("north")
south = request.GET.get("south")
east = request.GET.get("east")
west = request.GET.get("west")
zoom = request.GET.get("zoom", 10)
types = request.GET.get("types", "park,ride").split(",")
cluster = request.GET.get("cluster", "false").lower() == "true"
query = request.GET.get("q", "").strip()
params = self._parse_request_parameters(request)
cache_key = self._build_cache_key(params)
# Build cache key
cache_key = f"map_locations_{north}_{south}_{east}_{west}_{zoom}_{','.join(types)}_{cluster}_{query}"
# Check cache first
cached_result = cache.get(cache_key)
if cached_result:
return Response(cached_result)
locations = []
total_count = 0
# Get location data
parks_data = self._get_parks_data(params)
rides_data = self._get_rides_data(params)
locations = parks_data + rides_data
# Get parks if requested
if "park" in types:
parks_query = Park.objects.select_related(
"location", "operator"
).filter(location__point__isnull=False)
# Apply bounds filtering
if all([north, south, east, west]):
try:
bounds_polygon = Polygon.from_bbox(
(float(west), float(south), float(east), float(north))
)
parks_query = parks_query.filter(
location__point__within=bounds_polygon
)
except (ValueError, TypeError):
pass
# Apply text search
if query:
parks_query = parks_query.filter(
Q(name__icontains=query)
| Q(location__city__icontains=query)
| Q(location__state__icontains=query)
)
# Serialize parks
for park in parks_query[:100]: # Limit results
park_data = {
"id": park.id,
"type": "park",
"name": park.name,
"slug": park.slug,
"latitude": (
park.location.latitude
if hasattr(park, "location") and park.location
else None
),
"longitude": (
park.location.longitude
if hasattr(park, "location") and park.location
else None
),
"status": park.status,
"location": {
"city": (
park.location.city
if hasattr(park, "location") and park.location
else ""
),
"state": (
park.location.state
if hasattr(park, "location") and park.location
else ""
),
"country": (
park.location.country
if hasattr(park, "location") and park.location
else ""
),
"formatted_address": (
park.location.formatted_address
if hasattr(park, "location") and park.location
else ""
),
},
"stats": {
"coaster_count": park.coaster_count or 0,
"ride_count": park.ride_count or 0,
"average_rating": (
float(park.average_rating)
if park.average_rating
else None
),
},
}
locations.append(park_data)
# Get rides if requested
if "ride" in types:
rides_query = Ride.objects.select_related(
"park__location", "manufacturer"
).filter(park__location__point__isnull=False)
# Apply bounds filtering
if all([north, south, east, west]):
try:
bounds_polygon = Polygon.from_bbox(
(float(west), float(south), float(east), float(north))
)
rides_query = rides_query.filter(
park__location__point__within=bounds_polygon
)
except (ValueError, TypeError):
pass
# Apply text search
if query:
rides_query = rides_query.filter(
Q(name__icontains=query)
| Q(park__name__icontains=query)
| Q(park__location__city__icontains=query)
)
# Serialize rides
for ride in rides_query[:100]: # Limit results
ride_data = {
"id": ride.id,
"type": "ride",
"name": ride.name,
"slug": ride.slug,
"latitude": (
ride.park.location.latitude
if hasattr(ride.park, "location") and ride.park.location
else None
),
"longitude": (
ride.park.location.longitude
if hasattr(ride.park, "location") and ride.park.location
else None
),
"status": ride.status,
"location": {
"city": (
ride.park.location.city
if hasattr(ride.park, "location") and ride.park.location
else ""
),
"state": (
ride.park.location.state
if hasattr(ride.park, "location") and ride.park.location
else ""
),
"country": (
ride.park.location.country
if hasattr(ride.park, "location") and ride.park.location
else ""
),
"formatted_address": (
ride.park.location.formatted_address
if hasattr(ride.park, "location") and ride.park.location
else ""
),
},
"stats": {
"category": (
ride.get_category_display() if ride.category else None
),
"average_rating": (
float(ride.average_rating)
if ride.average_rating
else None
),
"park_name": ride.park.name,
},
}
locations.append(ride_data)
total_count = len(locations)
# Calculate bounds from results
bounds = {}
if locations:
lats = [loc["latitude"] for loc in locations if loc["latitude"]]
lngs = [loc["longitude"] for loc in locations if loc["longitude"]]
if lats and lngs:
bounds = {
"north": max(lats),
"south": min(lats),
"east": max(lngs),
"west": min(lngs),
}
result = {
"status": "success",
"locations": locations,
"clusters": [], # TODO: Implement clustering
"bounds": bounds,
"total_count": total_count,
"clustered": cluster,
}
# Build response
result = self._build_response(locations, params)
# Cache result for 5 minutes
cache.set(cache_key, result, 300)
@@ -805,11 +806,23 @@ class MapBoundsAPIView(APIView):
"""Get locations within specific geographic bounds."""
try:
# Parse required bounds parameters
north_str = request.GET.get("north")
south_str = request.GET.get("south")
east_str = request.GET.get("east")
west_str = request.GET.get("west")
if not all([north_str, south_str, east_str, west_str]):
return Response(
{"status": "error",
"message": "All bounds parameters (north, south, east, west) are required"},
status=status.HTTP_400_BAD_REQUEST,
)
try:
north = float(request.GET.get("north"))
south = float(request.GET.get("south"))
east = float(request.GET.get("east"))
west = float(request.GET.get("west"))
north = float(north_str) if north_str else 0.0
south = float(south_str) if south_str else 0.0
east = float(east_str) if east_str else 0.0
west = float(west_str) if west_str else 0.0
except (TypeError, ValueError):
return Response(
{"status": "error", "message": "Invalid bounds parameters"},
@@ -989,12 +1002,18 @@ class MapCacheAPIView(APIView):
"""Clear all map cache (admin only)."""
try:
# Clear all map-related cache keys
cache_keys = cache.keys("map_*")
if cache_keys:
cache.delete_many(cache_keys)
cleared_count = len(cache_keys)
else:
cleared_count = 0
# Note: cache.keys() may not be available in all cache backends
try:
cache_keys = cache.keys("map_*")
if cache_keys:
cache.delete_many(cache_keys)
cleared_count = len(cache_keys)
else:
cleared_count = 0
except AttributeError:
# Fallback: clear cache without pattern matching
cache.clear()
cleared_count = 1 # Indicate cache was cleared
return Response(
{
@@ -1014,7 +1033,9 @@ class MapCacheAPIView(APIView):
"""Invalidate specific cache entries."""
try:
# Get cache keys to invalidate from request data
cache_keys = request.data.get("cache_keys", [])
request_data = getattr(request, 'data', {})
cache_keys = request_data.get("cache_keys", []) if request_data else []
if cache_keys:
cache.delete_many(cache_keys)
invalidated_count = len(cache_keys)