""" Celery tasks for moderation app. This module contains background tasks for moderation management including: - Automatic expiration of stale claim locks - Cleanup of orphaned submissions """ import logging from datetime import timedelta from celery import shared_task from django.contrib.auth import get_user_model from django.db import transaction from django.utils import timezone from apps.core.utils import capture_and_log logger = logging.getLogger(__name__) User = get_user_model() # Default lock duration in minutes (matching views.py) DEFAULT_LOCK_DURATION_MINUTES = 15 @shared_task(name="moderation.expire_stale_claims") def expire_stale_claims(lock_duration_minutes: int = None) -> dict: """ Expire claims on submissions that have been locked for too long without action. This task finds submissions in CLAIMED status where claimed_at is older than the lock duration (default 15 minutes) and releases them back to PENDING so other moderators can claim them. This task should be run every 5 minutes via Celery Beat. Args: lock_duration_minutes: Override the default lock duration (15 minutes) Returns: dict: Summary with counts of processed, succeeded, and failed releases """ from apps.moderation.models import EditSubmission if lock_duration_minutes is None: lock_duration_minutes = DEFAULT_LOCK_DURATION_MINUTES logger.info("Starting stale claims expiration check (timeout: %d minutes)", lock_duration_minutes) # Calculate cutoff time (claims older than this should be released) cutoff_time = timezone.now() - timedelta(minutes=lock_duration_minutes) result = { "edit_submissions": {"processed": 0, "released": 0, "failed": 0}, "failures": [], "cutoff_time": cutoff_time.isoformat(), } # Process EditSubmissions with stale claims # Query without lock first, then lock each row individually in transaction stale_edit_ids = list( EditSubmission.objects.filter( status="CLAIMED", claimed_at__lt=cutoff_time, ).values_list("id", flat=True) ) for submission_id in stale_edit_ids: result["edit_submissions"]["processed"] += 1 try: with transaction.atomic(): # Lock and fetch the specific row submission = EditSubmission.objects.select_for_update(skip_locked=True).filter( id=submission_id, status="CLAIMED", # Re-verify status in case it changed ).first() if submission: _release_claim(submission) result["edit_submissions"]["released"] += 1 logger.info( "Released stale claim on EditSubmission %s (claimed by %s at %s)", submission_id, submission.claimed_by, submission.claimed_at, ) except Exception as e: result["edit_submissions"]["failed"] += 1 error_msg = f"EditSubmission {submission_id}: {str(e)}" result["failures"].append(error_msg) capture_and_log( e, f"Release stale claim on EditSubmission {submission_id}", source="task", ) # Process EditSubmission with PHOTO type (unified model) stale_photo_edit_ids = list( EditSubmission.objects.filter( submission_type="PHOTO", status="CLAIMED", claimed_at__lt=cutoff_time, ).values_list("id", flat=True) ) for submission_id in stale_photo_edit_ids: result["edit_submissions"]["processed"] += 1 # Count with edit submissions try: with transaction.atomic(): submission = EditSubmission.objects.select_for_update(skip_locked=True).filter( id=submission_id, status="CLAIMED", ).first() if submission: _release_claim(submission) result["edit_submissions"]["released"] += 1 logger.info( "Released stale claim on PHOTO EditSubmission %s (claimed by %s at %s)", submission_id, submission.claimed_by, submission.claimed_at, ) except Exception as e: result["edit_submissions"]["failed"] += 1 error_msg = f"PHOTO EditSubmission {submission_id}: {str(e)}" result["failures"].append(error_msg) capture_and_log( e, f"Release stale claim on PHOTO EditSubmission {submission_id}", source="task", ) total_released = result["edit_submissions"]["released"] total_failed = result["edit_submissions"]["failed"] logger.info( "Completed stale claims expiration: %s released, %s failed", total_released, total_failed, ) return result def _release_claim(submission): """ Release a stale claim on a submission. Uses the unclaim() FSM method to properly transition from CLAIMED to PENDING and clear the claimed_by and claimed_at fields. Args: submission: EditSubmission instance """ # Store info for logging before clearing claimed_by = submission.claimed_by claimed_at = submission.claimed_at # Use the FSM unclaim method - pass None for system-initiated unclaim submission.unclaim(user=None) # Log the automatic release logger.debug( "Auto-released claim: submission=%s, was_claimed_by=%s, claimed_at=%s", submission.id, claimed_by, claimed_at, ) @shared_task(name="moderation.cleanup_cloudflare_image", bind=True, max_retries=3) def cleanup_cloudflare_image(self, image_id: str) -> dict: """ Delete an orphaned or rejected Cloudflare image. This task is called when a photo submission is rejected to cleanup the associated Cloudflare image and prevent orphaned assets. Args: image_id: The Cloudflare image ID to delete. Returns: dict: Result with success status and message. """ from apps.core.utils.cloudflare import delete_cloudflare_image logger.info("Cleaning up Cloudflare image: %s", image_id) try: success = delete_cloudflare_image(image_id) if success: return { "image_id": image_id, "success": True, "message": "Image deleted successfully", } else: # Retry on failure (may be transient API issue) raise Exception(f"Failed to delete Cloudflare image {image_id}") except Exception as e: logger.warning("Cloudflare image cleanup failed: %s (attempt %d)", str(e), self.request.retries + 1) # Retry with exponential backoff try: self.retry(exc=e, countdown=60 * (2 ** self.request.retries)) except self.MaxRetriesExceededError: logger.error("Max retries exceeded for Cloudflare image cleanup: %s", image_id) return { "image_id": image_id, "success": False, "message": f"Failed after {self.request.retries + 1} attempts: {str(e)}", }