mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-22 17:51:08 -05:00
- Created a comprehensive documentation file for state machine diagrams, detailing various states and transitions for models such as EditSubmission, ModerationReport, and Park Status. - Included transition matrices for each state machine to clarify role requirements and guards. - Developed a new document providing code examples for implementing state machines, including adding new state machines to models, defining custom guards, implementing callbacks, and testing state machines. - Added examples for document approval workflows, custom guards, email notifications, and cache invalidation callbacks. - Implemented a test suite for document workflows, covering various scenarios including approval, rejection, and transition logging.
1061 lines
32 KiB
Markdown
1061 lines
32 KiB
Markdown
# State Machine Code Examples
|
|
|
|
This document provides detailed code examples for implementing and using state machines in ThrillWiki.
|
|
|
|
## Table of Contents
|
|
|
|
1. [Adding a New State Machine to a Model](#adding-a-new-state-machine-to-a-model)
|
|
2. [Defining Custom Guards](#defining-custom-guards)
|
|
3. [Implementing Callbacks](#implementing-callbacks)
|
|
4. [Testing State Machines](#testing-state-machines)
|
|
5. [Working with Transition History](#working-with-transition-history)
|
|
|
|
---
|
|
|
|
## Adding a New State Machine to a Model
|
|
|
|
### Complete Example: Document Approval Workflow
|
|
|
|
```python
|
|
# apps/documents/choices.py
|
|
from apps.core.choices import RichChoice, register_choices, ChoiceCategory
|
|
|
|
DOCUMENT_STATUSES = [
|
|
RichChoice(
|
|
value="DRAFT",
|
|
label="Draft",
|
|
category=ChoiceCategory.STATUS,
|
|
color="gray",
|
|
icon="edit",
|
|
css_class="status-draft",
|
|
description="Document is being drafted",
|
|
metadata={
|
|
'can_transition_to': ['REVIEW', 'CANCELLED'],
|
|
'requires_moderator': False,
|
|
'is_final': False,
|
|
'default_next': 'REVIEW',
|
|
}
|
|
),
|
|
RichChoice(
|
|
value="REVIEW",
|
|
label="Under Review",
|
|
category=ChoiceCategory.STATUS,
|
|
color="yellow",
|
|
icon="eye",
|
|
css_class="status-review",
|
|
description="Document is being reviewed",
|
|
metadata={
|
|
'can_transition_to': ['APPROVED', 'REJECTED', 'DRAFT'],
|
|
'requires_moderator': True,
|
|
'requires_assignment': True,
|
|
'is_final': False,
|
|
}
|
|
),
|
|
RichChoice(
|
|
value="APPROVED",
|
|
label="Approved",
|
|
category=ChoiceCategory.STATUS,
|
|
color="green",
|
|
icon="check",
|
|
css_class="status-approved",
|
|
description="Document has been approved",
|
|
metadata={
|
|
'can_transition_to': ['ARCHIVED'],
|
|
'requires_moderator': True,
|
|
'is_final': False,
|
|
}
|
|
),
|
|
RichChoice(
|
|
value="REJECTED",
|
|
label="Rejected",
|
|
category=ChoiceCategory.STATUS,
|
|
color="red",
|
|
icon="x",
|
|
css_class="status-rejected",
|
|
description="Document has been rejected",
|
|
metadata={
|
|
'can_transition_to': ['DRAFT'], # Can resubmit
|
|
'requires_moderator': False,
|
|
'is_final': False,
|
|
}
|
|
),
|
|
RichChoice(
|
|
value="ARCHIVED",
|
|
label="Archived",
|
|
category=ChoiceCategory.STATUS,
|
|
color="gray",
|
|
icon="archive",
|
|
css_class="status-archived",
|
|
description="Document has been archived",
|
|
metadata={
|
|
'can_transition_to': [],
|
|
'is_final': True,
|
|
}
|
|
),
|
|
RichChoice(
|
|
value="CANCELLED",
|
|
label="Cancelled",
|
|
category=ChoiceCategory.STATUS,
|
|
color="gray",
|
|
icon="ban",
|
|
css_class="status-cancelled",
|
|
description="Document was cancelled",
|
|
metadata={
|
|
'can_transition_to': [],
|
|
'is_final': True,
|
|
}
|
|
),
|
|
]
|
|
|
|
register_choices('document_statuses', 'documents', DOCUMENT_STATUSES)
|
|
```
|
|
|
|
```python
|
|
# apps/documents/models.py
|
|
from django.db import models
|
|
from django.contrib.auth import get_user_model
|
|
from django.utils import timezone
|
|
from apps.core.state_machine import RichFSMField, StateMachineMixin
|
|
from apps.core.models import TrackedModel
|
|
import pghistory
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
@pghistory.track()
|
|
class Document(StateMachineMixin, TrackedModel):
|
|
"""
|
|
Document model with approval workflow state machine.
|
|
|
|
States: DRAFT → REVIEW → APPROVED/REJECTED → ARCHIVED
|
|
|
|
The state_field_name attribute tells StateMachineMixin which field
|
|
holds the FSM state. Transition methods are auto-generated.
|
|
"""
|
|
|
|
state_field_name = "status"
|
|
|
|
title = models.CharField(max_length=255)
|
|
content = models.TextField()
|
|
author = models.ForeignKey(
|
|
User,
|
|
on_delete=models.CASCADE,
|
|
related_name='documents'
|
|
)
|
|
|
|
status = RichFSMField(
|
|
choice_group="document_statuses",
|
|
domain="documents",
|
|
max_length=20,
|
|
default="DRAFT",
|
|
help_text="Current workflow status"
|
|
)
|
|
|
|
# Review tracking
|
|
reviewer = models.ForeignKey(
|
|
User,
|
|
on_delete=models.SET_NULL,
|
|
null=True,
|
|
blank=True,
|
|
related_name='reviewed_documents'
|
|
)
|
|
reviewed_at = models.DateTimeField(null=True, blank=True)
|
|
review_notes = models.TextField(blank=True)
|
|
|
|
# Approval tracking
|
|
approved_by = models.ForeignKey(
|
|
User,
|
|
on_delete=models.SET_NULL,
|
|
null=True,
|
|
blank=True,
|
|
related_name='approved_documents'
|
|
)
|
|
approved_at = models.DateTimeField(null=True, blank=True)
|
|
|
|
# Rejection tracking
|
|
rejection_reason = models.TextField(blank=True)
|
|
|
|
class Meta(TrackedModel.Meta):
|
|
ordering = ['-created_at']
|
|
|
|
def __str__(self):
|
|
return f"{self.title} ({self.get_status_display()})"
|
|
|
|
# =========================================================================
|
|
# Wrapper Methods for Common Workflows
|
|
# =========================================================================
|
|
|
|
def submit_for_review(self, *, user=None):
|
|
"""
|
|
Submit draft document for review.
|
|
|
|
Args:
|
|
user: User submitting (for audit)
|
|
|
|
Raises:
|
|
TransitionNotAllowed: If not in DRAFT status
|
|
"""
|
|
self.transition_to_review(user=user)
|
|
self.save()
|
|
|
|
def assign_reviewer(self, reviewer, *, user=None):
|
|
"""
|
|
Assign a reviewer to this document.
|
|
|
|
Args:
|
|
reviewer: User to assign as reviewer
|
|
user: User making the assignment (for audit)
|
|
"""
|
|
self.reviewer = reviewer
|
|
self.save()
|
|
|
|
def approve(self, *, notes=None, user=None):
|
|
"""
|
|
Approve the document.
|
|
|
|
Args:
|
|
notes: Optional approval notes
|
|
user: User approving (required for audit)
|
|
|
|
Raises:
|
|
TransitionNotAllowed: If not in REVIEW status
|
|
"""
|
|
self.transition_to_approved(user=user)
|
|
self.approved_by = user
|
|
self.approved_at = timezone.now()
|
|
if notes:
|
|
self.review_notes = notes
|
|
self.save()
|
|
|
|
def reject(self, *, reason, user=None):
|
|
"""
|
|
Reject the document.
|
|
|
|
Args:
|
|
reason: Required rejection reason
|
|
user: User rejecting (required for audit)
|
|
|
|
Raises:
|
|
TransitionNotAllowed: If not in REVIEW status
|
|
ValidationError: If reason is empty
|
|
"""
|
|
from django.core.exceptions import ValidationError
|
|
|
|
if not reason:
|
|
raise ValidationError("Rejection reason is required")
|
|
|
|
self.transition_to_rejected(user=user)
|
|
self.rejection_reason = reason
|
|
self.reviewed_at = timezone.now()
|
|
self.save()
|
|
|
|
def revise(self, *, user=None):
|
|
"""
|
|
Return rejected document to draft for revision.
|
|
|
|
Args:
|
|
user: User initiating revision (for audit)
|
|
|
|
Raises:
|
|
TransitionNotAllowed: If not in REJECTED status
|
|
"""
|
|
self.transition_to_draft(user=user)
|
|
self.rejection_reason = '' # Clear previous rejection
|
|
self.save()
|
|
|
|
def archive(self, *, user=None):
|
|
"""
|
|
Archive an approved document.
|
|
|
|
Args:
|
|
user: User archiving (for audit)
|
|
|
|
Raises:
|
|
TransitionNotAllowed: If not in APPROVED status
|
|
"""
|
|
self.transition_to_archived(user=user)
|
|
self.save()
|
|
|
|
def cancel(self, *, user=None):
|
|
"""
|
|
Cancel a draft document.
|
|
|
|
Args:
|
|
user: User cancelling (for audit)
|
|
|
|
Raises:
|
|
TransitionNotAllowed: If not in DRAFT status
|
|
"""
|
|
self.transition_to_cancelled(user=user)
|
|
self.save()
|
|
```
|
|
|
|
---
|
|
|
|
## Defining Custom Guards
|
|
|
|
### Example: Department-Based Access Guard
|
|
|
|
```python
|
|
# apps/core/state_machine/guards/department_guard.py
|
|
from typing import Optional, List, Any
|
|
from .base import BaseGuard
|
|
|
|
|
|
class DepartmentGuard(BaseGuard):
|
|
"""
|
|
Guard that checks if user belongs to allowed departments.
|
|
|
|
Example:
|
|
guard = DepartmentGuard(
|
|
allowed_departments=['Engineering', 'Product'],
|
|
department_field='department'
|
|
)
|
|
"""
|
|
|
|
ERROR_CODE_NO_USER = 'NO_USER'
|
|
ERROR_CODE_NO_DEPARTMENT = 'NO_DEPARTMENT'
|
|
ERROR_CODE_DEPARTMENT_DENIED = 'DEPARTMENT_DENIED'
|
|
|
|
def __init__(
|
|
self,
|
|
allowed_departments: Optional[List[str]] = None,
|
|
blocked_departments: Optional[List[str]] = None,
|
|
department_field: str = 'department',
|
|
allow_admin_override: bool = True,
|
|
error_message: Optional[str] = None
|
|
):
|
|
"""
|
|
Initialize department guard.
|
|
|
|
Args:
|
|
allowed_departments: List of allowed department names
|
|
blocked_departments: List of blocked department names
|
|
department_field: Field name on user model for department
|
|
allow_admin_override: If True, admins bypass department check
|
|
error_message: Custom error message
|
|
"""
|
|
self.allowed_departments = allowed_departments or []
|
|
self.blocked_departments = blocked_departments or []
|
|
self.department_field = department_field
|
|
self.allow_admin_override = allow_admin_override
|
|
self._error_message = error_message
|
|
self.error_code = None
|
|
self._failed_department = None
|
|
|
|
def __call__(self, instance: Any, user: Any = None) -> bool:
|
|
"""
|
|
Check if user's department allows the transition.
|
|
|
|
Args:
|
|
instance: Model instance (not used in this guard)
|
|
user: User attempting the transition
|
|
|
|
Returns:
|
|
True if allowed, False otherwise
|
|
"""
|
|
# No user provided
|
|
if user is None:
|
|
self.error_code = self.ERROR_CODE_NO_USER
|
|
return False
|
|
|
|
# Admin override
|
|
if self.allow_admin_override and hasattr(user, 'role'):
|
|
if user.role in ['ADMIN', 'SUPERUSER']:
|
|
return True
|
|
|
|
# Get user's department
|
|
department = getattr(user, self.department_field, None)
|
|
if department is None:
|
|
self.error_code = self.ERROR_CODE_NO_DEPARTMENT
|
|
return False
|
|
|
|
# Check blocked departments first
|
|
if self.blocked_departments and department in self.blocked_departments:
|
|
self.error_code = self.ERROR_CODE_DEPARTMENT_DENIED
|
|
self._failed_department = department
|
|
return False
|
|
|
|
# Check allowed departments
|
|
if self.allowed_departments and department not in self.allowed_departments:
|
|
self.error_code = self.ERROR_CODE_DEPARTMENT_DENIED
|
|
self._failed_department = department
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_error_message(self) -> str:
|
|
"""Get human-readable error message."""
|
|
if self._error_message:
|
|
return self._error_message
|
|
|
|
if self.error_code == self.ERROR_CODE_NO_USER:
|
|
return "User is required for this action"
|
|
elif self.error_code == self.ERROR_CODE_NO_DEPARTMENT:
|
|
return "User department information is missing"
|
|
elif self.error_code == self.ERROR_CODE_DEPARTMENT_DENIED:
|
|
allowed = ', '.join(self.allowed_departments) if self.allowed_departments else 'none specified'
|
|
return f"Department '{self._failed_department}' is not authorized. Allowed: {allowed}"
|
|
|
|
return "Department check failed"
|
|
|
|
|
|
# Usage example
|
|
from apps.core.state_machine.guards import (
|
|
CompositeGuard,
|
|
PermissionGuard,
|
|
DepartmentGuard
|
|
)
|
|
|
|
# Require moderator role AND Engineering department
|
|
guard = CompositeGuard([
|
|
PermissionGuard(requires_moderator=True),
|
|
DepartmentGuard(allowed_departments=['Engineering', 'DevOps'])
|
|
], operator='AND')
|
|
```
|
|
|
|
### Example: Time-Based Guard
|
|
|
|
```python
|
|
# apps/core/state_machine/guards/time_guard.py
|
|
from datetime import time, datetime
|
|
from typing import Optional, Any, List
|
|
from .base import BaseGuard
|
|
|
|
|
|
class BusinessHoursGuard(BaseGuard):
|
|
"""
|
|
Guard that only allows transitions during business hours.
|
|
|
|
Example:
|
|
guard = BusinessHoursGuard(
|
|
start_hour=9,
|
|
end_hour=17,
|
|
allowed_days=[0, 1, 2, 3, 4] # Mon-Fri
|
|
)
|
|
"""
|
|
|
|
ERROR_CODE_OUTSIDE_HOURS = 'OUTSIDE_BUSINESS_HOURS'
|
|
|
|
def __init__(
|
|
self,
|
|
start_hour: int = 9,
|
|
end_hour: int = 17,
|
|
allowed_days: Optional[List[int]] = None,
|
|
timezone_field: str = 'timezone',
|
|
allow_admin_override: bool = True,
|
|
error_message: Optional[str] = None
|
|
):
|
|
"""
|
|
Initialize business hours guard.
|
|
|
|
Args:
|
|
start_hour: Start of business hours (0-23)
|
|
end_hour: End of business hours (0-23)
|
|
allowed_days: List of allowed weekdays (0=Monday, 6=Sunday)
|
|
timezone_field: Field on instance for timezone
|
|
allow_admin_override: If True, admins bypass time check
|
|
error_message: Custom error message
|
|
"""
|
|
self.start_hour = start_hour
|
|
self.end_hour = end_hour
|
|
self.allowed_days = allowed_days or [0, 1, 2, 3, 4] # Mon-Fri
|
|
self.timezone_field = timezone_field
|
|
self.allow_admin_override = allow_admin_override
|
|
self._error_message = error_message
|
|
self.error_code = None
|
|
|
|
def __call__(self, instance: Any, user: Any = None) -> bool:
|
|
"""Check if current time is within business hours."""
|
|
# Admin override
|
|
if self.allow_admin_override and user and hasattr(user, 'role'):
|
|
if user.role in ['ADMIN', 'SUPERUSER']:
|
|
return True
|
|
|
|
from django.utils import timezone
|
|
import pytz
|
|
|
|
# Get timezone from instance or use UTC
|
|
tz_name = getattr(instance, self.timezone_field, 'UTC')
|
|
try:
|
|
tz = pytz.timezone(tz_name)
|
|
except Exception:
|
|
tz = pytz.UTC
|
|
|
|
now = datetime.now(tz)
|
|
|
|
# Check day of week
|
|
if now.weekday() not in self.allowed_days:
|
|
self.error_code = self.ERROR_CODE_OUTSIDE_HOURS
|
|
return False
|
|
|
|
# Check hour
|
|
if not (self.start_hour <= now.hour < self.end_hour):
|
|
self.error_code = self.ERROR_CODE_OUTSIDE_HOURS
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_error_message(self) -> str:
|
|
if self._error_message:
|
|
return self._error_message
|
|
return f"This action is only allowed during business hours ({self.start_hour}:00 - {self.end_hour}:00, Mon-Fri)"
|
|
```
|
|
|
|
---
|
|
|
|
## Implementing Callbacks
|
|
|
|
### Example: Email Notification Callback
|
|
|
|
```python
|
|
# apps/core/state_machine/callbacks/notifications.py
|
|
from typing import Any, Dict
|
|
from django.core.mail import send_mail
|
|
from django.template.loader import render_to_string
|
|
from django.conf import settings
|
|
|
|
|
|
class EmailNotificationCallback:
|
|
"""
|
|
Callback that sends email notifications on state transitions.
|
|
|
|
Example:
|
|
callback = EmailNotificationCallback(
|
|
template='document_approved',
|
|
recipient_field='author.email',
|
|
subject_template='Your document has been {to_state}'
|
|
)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
template: str,
|
|
recipient_field: str,
|
|
subject_template: str,
|
|
transitions: list = None,
|
|
from_states: list = None,
|
|
to_states: list = None
|
|
):
|
|
"""
|
|
Initialize email notification callback.
|
|
|
|
Args:
|
|
template: Email template name (without extension)
|
|
recipient_field: Dot-notation path to recipient email
|
|
subject_template: Subject line template with {placeholders}
|
|
transitions: List of transition names to trigger on
|
|
from_states: List of source states to trigger on
|
|
to_states: List of target states to trigger on
|
|
"""
|
|
self.template = template
|
|
self.recipient_field = recipient_field
|
|
self.subject_template = subject_template
|
|
self.transitions = transitions or []
|
|
self.from_states = from_states or []
|
|
self.to_states = to_states or []
|
|
|
|
def should_execute(self, context: Dict[str, Any]) -> bool:
|
|
"""Check if this callback should execute for the given transition."""
|
|
# Check transition name
|
|
if self.transitions:
|
|
if context.get('transition_name') not in self.transitions:
|
|
return False
|
|
|
|
# Check from state
|
|
if self.from_states:
|
|
if context.get('from_state') not in self.from_states:
|
|
return False
|
|
|
|
# Check to state
|
|
if self.to_states:
|
|
if context.get('to_state') not in self.to_states:
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_recipient(self, instance: Any) -> str:
|
|
"""Get recipient email from instance using dot notation."""
|
|
value = instance
|
|
for part in self.recipient_field.split('.'):
|
|
value = getattr(value, part, None)
|
|
if value is None:
|
|
return None
|
|
return value
|
|
|
|
def __call__(self, context: Dict[str, Any]) -> None:
|
|
"""
|
|
Execute the notification callback.
|
|
|
|
Args:
|
|
context: Transition context with instance, from_state, to_state, user
|
|
"""
|
|
if not self.should_execute(context):
|
|
return
|
|
|
|
instance = context['instance']
|
|
recipient = self.get_recipient(instance)
|
|
|
|
if not recipient:
|
|
return
|
|
|
|
# Build email context
|
|
email_context = {
|
|
'instance': instance,
|
|
'from_state': context.get('from_state'),
|
|
'to_state': context.get('to_state'),
|
|
'user': context.get('user'),
|
|
'site_name': getattr(settings, 'SITE_NAME', 'ThrillWiki'),
|
|
}
|
|
|
|
# Render email
|
|
subject = self.subject_template.format(**email_context)
|
|
html_message = render_to_string(
|
|
f'emails/{self.template}.html',
|
|
email_context
|
|
)
|
|
text_message = render_to_string(
|
|
f'emails/{self.template}.txt',
|
|
email_context
|
|
)
|
|
|
|
# Send email
|
|
send_mail(
|
|
subject=subject,
|
|
message=text_message,
|
|
from_email=settings.DEFAULT_FROM_EMAIL,
|
|
recipient_list=[recipient],
|
|
html_message=html_message,
|
|
fail_silently=True
|
|
)
|
|
|
|
|
|
# Registration
|
|
from apps.core.state_machine.registry import state_machine_registry
|
|
|
|
@state_machine_registry.register_callback('documents.Document', 'post_transition')
|
|
def document_approval_notification(instance, from_state, to_state, user):
|
|
"""Send notification when document is approved."""
|
|
if to_state == 'APPROVED':
|
|
callback = EmailNotificationCallback(
|
|
template='document_approved',
|
|
recipient_field='author.email',
|
|
subject_template='Your document "{instance.title}" has been approved'
|
|
)
|
|
callback({
|
|
'instance': instance,
|
|
'from_state': from_state,
|
|
'to_state': to_state,
|
|
'user': user
|
|
})
|
|
```
|
|
|
|
### Example: Cache Invalidation Callback
|
|
|
|
```python
|
|
# apps/core/state_machine/callbacks/cache.py
|
|
from typing import Any, Dict, List
|
|
from django.core.cache import cache
|
|
|
|
|
|
class CacheInvalidationCallback:
|
|
"""
|
|
Callback that invalidates cache keys on state transitions.
|
|
|
|
Example:
|
|
callback = CacheInvalidationCallback(
|
|
cache_keys=[
|
|
'park_list',
|
|
'park_detail_{instance.slug}',
|
|
'park_stats_{instance.id}'
|
|
]
|
|
)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
cache_keys: List[str],
|
|
to_states: List[str] = None,
|
|
cache_backend: str = 'default'
|
|
):
|
|
"""
|
|
Initialize cache invalidation callback.
|
|
|
|
Args:
|
|
cache_keys: List of cache key patterns (can include {placeholders})
|
|
to_states: Only invalidate for these target states
|
|
cache_backend: Cache backend to use
|
|
"""
|
|
self.cache_keys = cache_keys
|
|
self.to_states = to_states
|
|
self.cache_backend = cache_backend
|
|
|
|
def __call__(self, context: Dict[str, Any]) -> None:
|
|
"""Invalidate cache keys."""
|
|
# Check if we should run for this state
|
|
if self.to_states and context.get('to_state') not in self.to_states:
|
|
return
|
|
|
|
instance = context['instance']
|
|
|
|
for key_pattern in self.cache_keys:
|
|
try:
|
|
# Format key with instance attributes
|
|
key = key_pattern.format(instance=instance)
|
|
cache.delete(key)
|
|
except (KeyError, AttributeError):
|
|
# If formatting fails, try deleting the literal key
|
|
cache.delete(key_pattern)
|
|
|
|
|
|
# Registration for park status changes
|
|
@state_machine_registry.register_callback('parks.Park', 'post_transition')
|
|
def invalidate_park_cache(instance, from_state, to_state, user):
|
|
"""Invalidate park-related caches on status change."""
|
|
callback = CacheInvalidationCallback(
|
|
cache_keys=[
|
|
f'park_detail_{instance.slug}',
|
|
f'park_stats_{instance.id}',
|
|
'park_list_operating',
|
|
'park_list_all',
|
|
f'park_rides_{instance.id}',
|
|
]
|
|
)
|
|
callback({
|
|
'instance': instance,
|
|
'from_state': from_state,
|
|
'to_state': to_state,
|
|
'user': user
|
|
})
|
|
```
|
|
|
|
---
|
|
|
|
## Testing State Machines
|
|
|
|
### Complete Test Suite Example
|
|
|
|
```python
|
|
# apps/documents/tests/test_document_workflow.py
|
|
from django.test import TestCase
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.exceptions import ValidationError
|
|
from django_fsm import TransitionNotAllowed
|
|
|
|
from apps.documents.models import Document
|
|
from apps.core.state_machine.tests.helpers import (
|
|
assert_transition_allowed,
|
|
assert_transition_denied,
|
|
assert_state_log_created,
|
|
assert_state_transition_sequence,
|
|
transition_and_save
|
|
)
|
|
from apps.core.state_machine.tests.fixtures import UserFactory
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
class DocumentWorkflowTests(TestCase):
|
|
"""End-to-end tests for document approval workflow."""
|
|
|
|
def setUp(self):
|
|
"""Set up test fixtures."""
|
|
self.author = UserFactory.create_regular_user()
|
|
self.reviewer = UserFactory.create_moderator()
|
|
self.admin = UserFactory.create_admin()
|
|
|
|
def _create_document(self, status='DRAFT', **kwargs):
|
|
"""Helper to create a document."""
|
|
defaults = {
|
|
'title': 'Test Document',
|
|
'content': 'Test content',
|
|
'author': self.author
|
|
}
|
|
defaults.update(kwargs)
|
|
return Document.objects.create(status=status, **defaults)
|
|
|
|
# =========================================================================
|
|
# Happy Path Tests
|
|
# =========================================================================
|
|
|
|
def test_complete_approval_workflow(self):
|
|
"""Test the complete approval workflow from draft to archived."""
|
|
# Create draft
|
|
doc = self._create_document()
|
|
self.assertEqual(doc.status, 'DRAFT')
|
|
|
|
# Submit for review
|
|
doc.submit_for_review(user=self.author)
|
|
self.assertEqual(doc.status, 'REVIEW')
|
|
|
|
# Assign reviewer
|
|
doc.assign_reviewer(self.reviewer)
|
|
self.assertEqual(doc.reviewer, self.reviewer)
|
|
|
|
# Approve
|
|
doc.approve(notes='Looks good!', user=self.reviewer)
|
|
self.assertEqual(doc.status, 'APPROVED')
|
|
self.assertEqual(doc.approved_by, self.reviewer)
|
|
self.assertIsNotNone(doc.approved_at)
|
|
|
|
# Archive
|
|
doc.archive(user=self.admin)
|
|
self.assertEqual(doc.status, 'ARCHIVED')
|
|
|
|
# Verify transition history
|
|
assert_state_transition_sequence(doc, [
|
|
'REVIEW', 'APPROVED', 'ARCHIVED'
|
|
])
|
|
|
|
def test_rejection_and_revision_workflow(self):
|
|
"""Test rejection and revision workflow."""
|
|
doc = self._create_document()
|
|
|
|
# Submit and reject
|
|
doc.submit_for_review(user=self.author)
|
|
doc.assign_reviewer(self.reviewer)
|
|
doc.reject(reason='Needs more detail', user=self.reviewer)
|
|
|
|
self.assertEqual(doc.status, 'REJECTED')
|
|
self.assertEqual(doc.rejection_reason, 'Needs more detail')
|
|
|
|
# Revise and resubmit
|
|
doc.revise(user=self.author)
|
|
self.assertEqual(doc.status, 'DRAFT')
|
|
self.assertEqual(doc.rejection_reason, '') # Cleared
|
|
|
|
# Submit again
|
|
doc.submit_for_review(user=self.author)
|
|
self.assertEqual(doc.status, 'REVIEW')
|
|
|
|
# =========================================================================
|
|
# Permission Tests
|
|
# =========================================================================
|
|
|
|
def test_only_moderator_can_approve(self):
|
|
"""Test that regular users cannot approve documents."""
|
|
doc = self._create_document(status='REVIEW')
|
|
doc.reviewer = self.reviewer
|
|
doc.save()
|
|
|
|
# Regular user cannot approve
|
|
with self.assertRaises(TransitionNotAllowed):
|
|
doc.transition_to_approved(user=self.author)
|
|
|
|
# Moderator can approve
|
|
doc.transition_to_approved(user=self.reviewer)
|
|
self.assertEqual(doc.status, 'APPROVED')
|
|
|
|
def test_rejection_requires_reason(self):
|
|
"""Test that rejection requires a reason."""
|
|
doc = self._create_document(status='REVIEW')
|
|
doc.reviewer = self.reviewer
|
|
doc.save()
|
|
|
|
with self.assertRaises(ValidationError) as ctx:
|
|
doc.reject(reason='', user=self.reviewer)
|
|
|
|
self.assertIn('reason', str(ctx.exception).lower())
|
|
|
|
# =========================================================================
|
|
# Invalid Transition Tests
|
|
# =========================================================================
|
|
|
|
def test_archived_is_final_state(self):
|
|
"""Test that archived documents cannot transition."""
|
|
doc = self._create_document(status='ARCHIVED')
|
|
|
|
with self.assertRaises(TransitionNotAllowed):
|
|
doc.transition_to_draft(user=self.admin)
|
|
|
|
with self.assertRaises(TransitionNotAllowed):
|
|
doc.transition_to_review(user=self.admin)
|
|
|
|
def test_cancelled_is_final_state(self):
|
|
"""Test that cancelled documents cannot transition."""
|
|
doc = self._create_document(status='CANCELLED')
|
|
|
|
with self.assertRaises(TransitionNotAllowed):
|
|
doc.transition_to_draft(user=self.admin)
|
|
|
|
def test_cannot_approve_draft_directly(self):
|
|
"""Test that drafts cannot skip review."""
|
|
doc = self._create_document(status='DRAFT')
|
|
|
|
with self.assertRaises(TransitionNotAllowed):
|
|
doc.transition_to_approved(user=self.reviewer)
|
|
|
|
# =========================================================================
|
|
# Transition Logging Tests
|
|
# =========================================================================
|
|
|
|
def test_transitions_are_logged(self):
|
|
"""Test that all transitions create log entries."""
|
|
doc = self._create_document()
|
|
|
|
doc.submit_for_review(user=self.author)
|
|
|
|
log = assert_state_log_created(doc, 'REVIEW', self.author)
|
|
self.assertIsNotNone(log.timestamp)
|
|
|
|
def test_log_includes_transition_user(self):
|
|
"""Test that logs include the user who made the transition."""
|
|
doc = self._create_document(status='REVIEW')
|
|
doc.reviewer = self.reviewer
|
|
doc.save()
|
|
|
|
doc.approve(user=self.reviewer)
|
|
|
|
log = assert_state_log_created(doc, 'APPROVED')
|
|
self.assertEqual(log.by, self.reviewer)
|
|
```
|
|
|
|
---
|
|
|
|
## Working with Transition History
|
|
|
|
### Querying Transition History
|
|
|
|
```python
|
|
from django_fsm_log.models import StateLog
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
|
|
def get_transition_history(instance, limit=None):
|
|
"""
|
|
Get transition history for any model instance.
|
|
|
|
Args:
|
|
instance: Model instance with FSM field
|
|
limit: Optional limit on number of entries
|
|
|
|
Returns:
|
|
QuerySet of StateLog entries
|
|
"""
|
|
ct = ContentType.objects.get_for_model(instance)
|
|
qs = StateLog.objects.filter(
|
|
content_type=ct,
|
|
object_id=instance.id
|
|
).select_related('by').order_by('-timestamp')
|
|
|
|
if limit:
|
|
qs = qs[:limit]
|
|
|
|
return qs
|
|
|
|
|
|
def get_time_in_state(instance, state):
|
|
"""
|
|
Calculate how long an instance spent in a specific state.
|
|
|
|
Args:
|
|
instance: Model instance
|
|
state: State value to calculate time for
|
|
|
|
Returns:
|
|
timedelta or None if state not found
|
|
"""
|
|
from datetime import timedelta
|
|
|
|
ct = ContentType.objects.get_for_model(instance)
|
|
logs = list(StateLog.objects.filter(
|
|
content_type=ct,
|
|
object_id=instance.id
|
|
).order_by('timestamp'))
|
|
|
|
total_time = timedelta()
|
|
entered_at = None
|
|
|
|
for log in logs:
|
|
if log.state == state and entered_at is None:
|
|
entered_at = log.timestamp
|
|
elif log.state != state and entered_at is not None:
|
|
total_time += log.timestamp - entered_at
|
|
entered_at = None
|
|
|
|
# If still in the state
|
|
if entered_at is not None:
|
|
from django.utils import timezone
|
|
total_time += timezone.now() - entered_at
|
|
|
|
return total_time if total_time.total_seconds() > 0 else None
|
|
|
|
|
|
def get_users_who_transitioned(instance, to_state):
|
|
"""
|
|
Get all users who transitioned an instance to a specific state.
|
|
|
|
Args:
|
|
instance: Model instance
|
|
to_state: Target state
|
|
|
|
Returns:
|
|
QuerySet of User objects
|
|
"""
|
|
from django.contrib.auth import get_user_model
|
|
User = get_user_model()
|
|
|
|
ct = ContentType.objects.get_for_model(instance)
|
|
user_ids = StateLog.objects.filter(
|
|
content_type=ct,
|
|
object_id=instance.id,
|
|
state=to_state
|
|
).values_list('by_id', flat=True).distinct()
|
|
|
|
return User.objects.filter(id__in=user_ids)
|
|
```
|
|
|
|
### API View for Transition History
|
|
|
|
```python
|
|
# apps/core/state_machine/views.py
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from django_fsm_log.models import StateLog
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
|
|
class TransitionHistoryView(APIView):
|
|
"""
|
|
API view for retrieving transition history.
|
|
|
|
GET /api/history/<content_type>/<object_id>/
|
|
"""
|
|
|
|
permission_classes = [IsAuthenticated]
|
|
|
|
def get(self, request, content_type_str, object_id):
|
|
"""Get transition history for an object."""
|
|
try:
|
|
app_label, model = content_type_str.split('.')
|
|
ct = ContentType.objects.get(app_label=app_label, model=model)
|
|
except (ValueError, ContentType.DoesNotExist):
|
|
return Response({'error': 'Invalid content type'}, status=400)
|
|
|
|
logs = StateLog.objects.filter(
|
|
content_type=ct,
|
|
object_id=object_id
|
|
).select_related('by').order_by('-timestamp')
|
|
|
|
data = [
|
|
{
|
|
'id': log.id,
|
|
'timestamp': log.timestamp.isoformat(),
|
|
'from_state': log.source_state,
|
|
'to_state': log.state,
|
|
'transition': log.transition,
|
|
'user': {
|
|
'id': log.by.id,
|
|
'username': log.by.username
|
|
} if log.by else None,
|
|
'description': log.description
|
|
}
|
|
for log in logs
|
|
]
|
|
|
|
return Response({
|
|
'count': len(data),
|
|
'results': data
|
|
})
|