chore: fix pghistory migration deps and improve htmx utilities

- Update pghistory dependency from 0007 to 0006 in account migrations
- Add docstrings and remove unused imports in htmx_forms.py
- Add DJANGO_SETTINGS_MODULE bash commands to Claude settings
- Add state transition definitions for ride statuses
This commit is contained in:
pacnpal
2025-12-21 17:33:24 -05:00
parent b9063ff4f8
commit 7ba0004c93
74 changed files with 11134 additions and 198 deletions

View File

@@ -1,5 +1,8 @@
"""
Base forms and views for HTMX integration.
"""
from django.views.generic.edit import FormView
from django.http import JsonResponse, HttpResponse
from django.http import JsonResponse
class HTMXFormView(FormView):
@@ -11,13 +14,15 @@ class HTMXFormView(FormView):
def validate_field(self, field_name):
"""Return JSON with errors for a single field based on the current form."""
form = self.get_form()
field = form[field_name]
form.is_valid() # populate errors
errors = form.errors.get(field_name, [])
return JsonResponse({"field": field_name, "errors": errors})
def post(self, request, *args, **kwargs):
# If HTMX field validation pattern: ?field=name
if request.headers.get("HX-Request") == "true" and request.GET.get("validate_field"):
if (
request.headers.get("HX-Request") == "true"
and request.GET.get("validate_field")
):
return self.validate_field(request.GET.get("validate_field"))
return super().post(request, *args, **kwargs)

View File

@@ -1,8 +1,34 @@
"""Utilities for HTMX integration in Django views."""
from functools import wraps
from django.http import HttpResponse, JsonResponse
from django.template import TemplateDoesNotExist
from django.template.loader import render_to_string
def _resolve_context_and_template(resp, default_template):
"""Extract context and template from view response."""
context = {}
template_name = default_template
if isinstance(resp, tuple):
if len(resp) >= 1:
context = resp[0]
if len(resp) >= 2 and resp[1]:
template_name = resp[1]
return context, template_name
def _render_htmx_or_full(request, template_name, context):
"""Try to render HTMX partial, fallback to full template."""
if request.headers.get("HX-Request") == "true":
partial = template_name.replace(".html", "_partial.html")
try:
return render_to_string(partial, context, request=request)
except TemplateDoesNotExist:
# Fall back to full template
return render_to_string(template_name, context, request=request)
return render_to_string(template_name, context, request=request)
def htmx_partial(template_name):
"""Decorator for view functions to render partials for HTMX requests.
@@ -18,27 +44,10 @@ def htmx_partial(template_name):
# If the view returned an HttpResponse, pass through
if isinstance(resp, HttpResponse):
return resp
# Expecting a tuple (context, template_name) or (context,)
context = {}
tpl = template_name
if isinstance(resp, tuple):
if len(resp) >= 1:
context = resp[0]
if len(resp) >= 2 and resp[1]:
tpl = resp[1]
# If HTMX, try partial template
if request.headers.get("HX-Request") == "true":
partial = tpl.replace(".html", "_partial.html")
try:
html = render_to_string(partial, context, request=request)
return HttpResponse(html)
except Exception:
# Fall back to full template
html = render_to_string(tpl, context, request=request)
return HttpResponse(html)
html = render_to_string(tpl, context, request=request)
context, tpl = _resolve_context_and_template(resp, template_name)
html = _render_htmx_or_full(request, tpl, context)
return HttpResponse(html)
return _wrapped
@@ -47,12 +56,14 @@ def htmx_partial(template_name):
def htmx_redirect(url):
"""Create a response that triggers a client-side redirect via HTMX."""
resp = HttpResponse("")
resp["HX-Redirect"] = url
return resp
def htmx_trigger(name: str, payload: dict = None):
"""Create a response that triggers a client-side event via HTMX."""
resp = HttpResponse("")
if payload is None:
resp["HX-Trigger"] = name
@@ -62,6 +73,7 @@ def htmx_trigger(name: str, payload: dict = None):
def htmx_refresh():
"""Create a response that triggers a client-side page refresh via HTMX."""
resp = HttpResponse("")
resp["HX-Refresh"] = "true"
return resp

View File

@@ -1,3 +1,6 @@
"""
Middleware for handling errors in HTMX requests.
"""
import logging
from django.http import HttpResponseServerError
from django.template.loader import render_to_string
@@ -14,9 +17,15 @@ class HTMXErrorMiddleware:
def __call__(self, request):
try:
return self.get_response(request)
except Exception as exc:
except Exception:
logger.exception("Error during request")
if request.headers.get("HX-Request") == "true":
html = render_to_string("htmx/components/error_message.html", {"title": "Server error", "message": "An unexpected error occurred."})
html = render_to_string(
"htmx/components/error_message.html",
{
"title": "Server error",
"message": "An unexpected error occurred.",
},
)
return HttpResponseServerError(html)
raise

View File

@@ -10,7 +10,7 @@ class Migration(migrations.Migration):
dependencies = [
("contenttypes", "0002_remove_content_type_name"),
("core", "0002_historicalslug_pageview"),
("pghistory", "0007_auto_20250421_0444"),
("pghistory", "0006_delete_aggregateevent"),
]
operations = [

View File

@@ -1,37 +1,43 @@
from typing import Optional
from django.views.generic.list import MultipleObjectMixin
from django.views.generic.edit import FormMixin
"""HTMX mixins for views. Canonical definitions for partial rendering and triggers."""
from typing import Any, Optional, Type
from django.template import TemplateDoesNotExist
from django.template.loader import select_template
"""HTMX mixins for views. Single canonical definitions for partial rendering and triggers."""
from django.views.generic.edit import FormMixin
from django.views.generic.list import MultipleObjectMixin
class HTMXFilterableMixin(MultipleObjectMixin):
"""Enhance list views to return partial templates for HTMX requests."""
filter_class = None
filter_class: Optional[Type[Any]] = None
htmx_partial_suffix = "_partial.html"
def get_queryset(self):
"""Apply the filter class to the queryset if defined."""
qs = super().get_queryset()
if self.filter_class:
self.filterset = self.filter_class(self.request.GET, queryset=qs)
filter_cls = self.filter_class
if filter_cls:
# pylint: disable=not-callable
self.filterset = filter_cls(self.request.GET, queryset=qs)
return self.filterset.qs
return qs
def get_template_names(self):
"""Return partial template if HTMX request, otherwise default templates."""
names = super().get_template_names()
if self.request.headers.get("HX-Request") == "true":
partials = [t.replace(".html", self.htmx_partial_suffix) for t in names]
try:
select_template(partials)
return partials
except Exception:
except TemplateDoesNotExist:
return names
return names
def get_context_data(self, **kwargs):
"""Add the filterset to the context."""
ctx = super().get_context_data(**kwargs)
if hasattr(self, "filterset"):
ctx["filter"] = self.filterset
@@ -44,11 +50,13 @@ class HTMXFormMixin(FormMixin):
htmx_success_trigger: Optional[str] = None
def form_invalid(self, form):
"""Return partial with errors on invalid form submission via HTMX."""
if self.request.headers.get("HX-Request") == "true":
return self.render_to_response(self.get_context_data(form=form))
return super().form_invalid(form)
def form_valid(self, form):
"""Add HX-Trigger header on successful form submission via HTMX."""
res = super().form_valid(form)
if (
self.request.headers.get("HX-Request") == "true"
@@ -59,18 +67,24 @@ class HTMXFormMixin(FormMixin):
class HTMXInlineEditMixin(FormMixin):
"""Support simple inline edit flows: GET returns form partial, POST returns updated fragment."""
"""
Support simple inline edit flows.
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
GET returns form partial, POST returns updated fragment.
"""
class HTMXPaginationMixin:
"""Pagination helper that supports hx-trigger based infinite scroll or standard pagination."""
"""
Pagination helper.
Supports hx-trigger based infinite scroll or standard pagination.
"""
page_size = 20
def get_paginate_by(self, queryset):
def get_paginate_by(self, _queryset):
"""Return the number of items to paginate by."""
return getattr(self, "paginate_by", self.page_size)
@@ -80,6 +94,7 @@ class HTMXModalMixin(HTMXFormMixin):
modal_close_trigger = "modal:close"
def form_valid(self, form):
"""Send close trigger on successful form submission via HTMX."""
res = super().form_valid(form)
if self.request.headers.get("HX-Request") == "true":
res["HX-Trigger"] = self.modal_close_trigger

View File

@@ -0,0 +1,423 @@
# State Machine Metadata Specification
## Overview
This document defines the metadata specification for RichChoice objects when used in state machine contexts. The metadata drives all state machine behavior including valid transitions, permissions, and state properties.
## Metadata Structure
Metadata is stored in the `metadata` dictionary field of a RichChoice object:
```python
RichChoice(
value="state_value",
label="State Label",
metadata={
# Metadata fields go here
}
)
```
## Required Fields
### `can_transition_to`
**Type**: `List[str]`
**Required**: Yes
**Description**: List of valid target state values this state can transition to.
**Example**:
```python
metadata={
"can_transition_to": ["approved", "rejected", "escalated"]
}
```
**Validation Rules**:
- Must be present in every state's metadata (use empty list `[]` for terminal states)
- All referenced state values must exist in the same choice group
- Terminal states (marked with `is_final: True`) should have empty list
**Common Patterns**:
```python
# Initial state with multiple transitions
metadata={"can_transition_to": ["in_review", "rejected"]}
# Intermediate state
metadata={"can_transition_to": ["approved", "needs_revision"]}
# Terminal state
metadata={"can_transition_to": [], "is_final": True}
```
## Optional Fields
### `is_final`
**Type**: `bool`
**Default**: `False`
**Description**: Marks a state as terminal/final with no outgoing transitions.
**Example**:
```python
metadata={
"is_final": True,
"can_transition_to": []
}
```
**Validation Rules**:
- If `is_final: True`, `can_transition_to` must be empty
- Terminal states cannot have outgoing transitions
### `is_actionable`
**Type**: `bool`
**Default**: `False`
**Description**: Indicates whether actions can be taken in this state.
**Example**:
```python
metadata={
"is_actionable": True,
"can_transition_to": ["approved", "rejected"]
}
```
**Use Cases**:
- Marking states where user input is required
- Identifying states in moderation queues
- Filtering for states needing attention
### `requires_moderator`
**Type**: `bool`
**Default**: `False`
**Description**: Transition to/from this state requires moderator permissions.
**Example**:
```python
metadata={
"requires_moderator": True,
"can_transition_to": ["approved"]
}
```
**Permission Check**:
- User must have `is_staff=True`, OR
- User must have `moderation.can_moderate` permission, OR
- User must be in "moderators", "admins", or "staff" group
### `requires_admin_approval`
**Type**: `bool`
**Default**: `False`
**Description**: Transition requires admin-level permissions.
**Example**:
```python
metadata={
"requires_admin_approval": True,
"can_transition_to": ["published"]
}
```
**Permission Check**:
- User must have `is_superuser=True`, OR
- User must have `moderation.can_admin` permission, OR
- User must be in "admins" group
**Note**: Admin approval implies moderator permission. Don't set both flags.
## Extended Metadata Fields
### `transition_callbacks`
**Type**: `Dict[str, str]`
**Optional**: Yes
**Description**: Callback function names to execute during transitions.
**Example**:
```python
metadata={
"transition_callbacks": {
"on_enter": "handle_approval",
"on_exit": "cleanup_pending",
}
}
```
### `estimated_duration`
**Type**: `int` (seconds)
**Optional**: Yes
**Description**: Expected duration for remaining in this state.
**Example**:
```python
metadata={
"estimated_duration": 86400, # 24 hours
"can_transition_to": ["approved"]
}
```
### `notification_triggers`
**Type**: `List[str]`
**Optional**: Yes
**Description**: Notification types to trigger on entering this state.
**Example**:
```python
metadata={
"notification_triggers": ["moderator_assigned", "user_notified"],
"can_transition_to": ["approved"]
}
```
## Complete Examples
### Example 1: Basic Moderation Workflow
```python
from backend.apps.core.choices.base import RichChoice
moderation_states = [
# Initial state
RichChoice(
value="pending",
label="Pending Review",
description="Awaiting moderator assignment",
metadata={
"can_transition_to": ["in_review", "rejected"],
"is_actionable": True,
}
),
# Processing state
RichChoice(
value="in_review",
label="Under Review",
description="Being reviewed by moderator",
metadata={
"can_transition_to": ["approved", "rejected", "escalated"],
"requires_moderator": True,
"is_actionable": True,
}
),
# Escalation state
RichChoice(
value="escalated",
label="Escalated to Admin",
description="Requires admin decision",
metadata={
"can_transition_to": ["approved", "rejected"],
"requires_admin_approval": True,
"is_actionable": True,
}
),
# Terminal states
RichChoice(
value="approved",
label="Approved",
description="Approved and published",
metadata={
"can_transition_to": [],
"is_final": True,
"requires_moderator": True,
}
),
RichChoice(
value="rejected",
label="Rejected",
description="Rejected and archived",
metadata={
"can_transition_to": [],
"is_final": True,
"requires_moderator": True,
}
),
]
```
### Example 2: Content Publishing Pipeline
```python
publishing_states = [
RichChoice(
value="draft",
label="Draft",
metadata={
"can_transition_to": ["submitted", "archived"],
"is_actionable": True,
}
),
RichChoice(
value="submitted",
label="Submitted for Review",
metadata={
"can_transition_to": ["draft", "approved", "rejected"],
"requires_moderator": True,
}
),
RichChoice(
value="approved",
label="Approved",
metadata={
"can_transition_to": ["published", "draft"],
"requires_moderator": True,
}
),
RichChoice(
value="published",
label="Published",
metadata={
"can_transition_to": ["archived"],
"requires_admin_approval": True,
}
),
RichChoice(
value="archived",
label="Archived",
metadata={
"can_transition_to": [],
"is_final": True,
}
),
RichChoice(
value="rejected",
label="Rejected",
metadata={
"can_transition_to": ["draft"],
"requires_moderator": True,
}
),
]
```
## Validation Rules
### Rule 1: Transition Reference Validity
All states in `can_transition_to` must exist in the same choice group.
**Invalid**:
```python
RichChoice("pending", "Pending", metadata={
"can_transition_to": ["nonexistent_state"] # ❌ State doesn't exist
})
```
### Rule 2: Terminal State Consistency
States marked `is_final: True` must have empty `can_transition_to`.
**Invalid**:
```python
RichChoice("approved", "Approved", metadata={
"is_final": True,
"can_transition_to": ["published"] # ❌ Final state has transitions
})
```
### Rule 3: Permission Hierarchy
`requires_admin_approval: True` implies moderator permissions.
**Redundant** (but not invalid):
```python
metadata={
"requires_admin_approval": True,
"requires_moderator": True, # ⚠️ Redundant
}
```
**Correct**:
```python
metadata={
"requires_admin_approval": True, # ✅ Admin implies moderator
}
```
### Rule 4: Cycle Detection
State machines should generally avoid cycles (except for revision flows).
**Warning** (may be valid for revision workflows):
```python
# State A -> State B -> State A creates a cycle
RichChoice("draft", "Draft", metadata={"can_transition_to": ["review"]}),
RichChoice("review", "Review", metadata={"can_transition_to": ["draft"]}),
```
### Rule 5: Reachability
All states should be reachable from initial states.
**Invalid**:
```python
# "orphan" state is unreachable
RichChoice("pending", "Pending", metadata={"can_transition_to": ["approved"]}),
RichChoice("approved", "Approved", metadata={"is_final": True}),
RichChoice("orphan", "Orphan", metadata={"can_transition_to": []}), # ❌
```
## Testing Metadata
Use `MetadataValidator` to test your metadata:
```python
from backend.apps.core.state_machine import MetadataValidator
validator = MetadataValidator("your_choice_group", "your_domain")
result = validator.validate_choice_group()
if not result.is_valid:
print(validator.generate_validation_report())
```
## Anti-Patterns
### ❌ Missing Transitions
```python
# Don't leave can_transition_to undefined
RichChoice("pending", "Pending", metadata={}) # Missing!
```
### ❌ Overly Complex Graphs
```python
# Avoid states with too many outgoing transitions
metadata={
"can_transition_to": [
"state1", "state2", "state3", "state4",
"state5", "state6", "state7", "state8"
] # Too many options!
}
```
### ❌ Inconsistent Permission Requirements
```python
# Don't require admin without requiring moderator first
metadata={
"requires_admin_approval": True,
"requires_moderator": False, # Inconsistent!
}
```
## Best Practices
1. ✅ Always define `can_transition_to` (use `[]` for terminal states)
2. ✅ Use `is_final: True` for all terminal states
3. ✅ Mark actionable states with `is_actionable: True`
4. ✅ Apply permission flags at the appropriate level
5. ✅ Keep state graphs simple and linear when possible
6. ✅ Document complex transition logic in descriptions
7. ✅ Run validation during development
8. ✅ Test all transition paths
## Version History
- **v1.0** (2025-12-20): Initial specification

View File

@@ -0,0 +1,320 @@
# State Machine System Documentation
## Overview
The state machine system provides a comprehensive integration between Django's RichChoice system and django-fsm (Finite State Machine). This integration automatically generates state transition methods based on metadata defined in RichChoice objects, eliminating the need for manual state management code.
## Key Features
- **Metadata-Driven**: All state machine behavior is derived from RichChoice metadata
- **Automatic Transition Generation**: Transition methods are automatically created from metadata
- **Permission-Based Guards**: Built-in support for moderator and admin permissions
- **Validation**: Comprehensive validation ensures metadata consistency
- **Centralized Registry**: All transitions are tracked in a central registry
- **Logging Integration**: Automatic integration with django-fsm-log
## Quick Start
### 1. Define Your States with Metadata
```python
from backend.apps.core.choices.base import RichChoice, ChoiceCategory
from backend.apps.core.choices.registry import registry
submission_states = [
RichChoice(
value="pending",
label="Pending Review",
description="Awaiting moderator review",
metadata={
"can_transition_to": ["approved", "rejected", "escalated"],
"requires_moderator": False,
"is_actionable": True,
},
category=ChoiceCategory.STATUS,
),
RichChoice(
value="approved",
label="Approved",
description="Approved by moderator",
metadata={
"can_transition_to": [],
"is_final": True,
"requires_moderator": True,
},
category=ChoiceCategory.STATUS,
),
RichChoice(
value="rejected",
label="Rejected",
description="Rejected by moderator",
metadata={
"can_transition_to": [],
"is_final": True,
"requires_moderator": True,
},
category=ChoiceCategory.STATUS,
),
]
registry.register("submission_status", submission_states, domain="moderation")
```
### 2. Use RichFSMField in Your Model
```python
from django.db import models
from backend.apps.core.state_machine import RichFSMField, StateMachineMixin
class EditSubmission(StateMachineMixin, models.Model):
status = RichFSMField(
choice_group="submission_status",
domain="moderation",
default="pending",
)
# ... other fields
```
### 3. Apply State Machine
```python
from backend.apps.core.state_machine import apply_state_machine
# Apply state machine (usually in AppConfig.ready())
apply_state_machine(
EditSubmission,
field_name="status",
choice_group="submission_status",
domain="moderation"
)
```
### 4. Use Transition Methods
```python
# Get an instance
submission = EditSubmission.objects.get(id=1)
# Check available transitions
available = submission.get_available_state_transitions()
print(f"Can transition to: {[t.target for t in available]}")
# Execute transition
if submission.can_transition_to("approved", user=request.user):
submission.approve(user=request.user, comment="Looks good!")
submission.save()
```
## Metadata Reference
### Required Metadata Fields
- **`can_transition_to`** (list): List of valid target states from this state
```python
metadata={"can_transition_to": ["approved", "rejected"]}
```
### Optional Metadata Fields
- **`is_final`** (bool): Whether this is a terminal state (no outgoing transitions)
```python
metadata={"is_final": True}
```
- **`is_actionable`** (bool): Whether actions can be taken in this state
```python
metadata={"is_actionable": True}
```
- **`requires_moderator`** (bool): Whether moderator permission is required
```python
metadata={"requires_moderator": True}
```
- **`requires_admin_approval`** (bool): Whether admin permission is required
```python
metadata={"requires_admin_approval": True}
```
## Components
### StateTransitionBuilder
Reads RichChoice metadata and generates FSM transition configurations.
```python
from backend.apps.core.state_machine import StateTransitionBuilder
builder = StateTransitionBuilder("submission_status", "moderation")
graph = builder.build_transition_graph()
# Returns: {"pending": ["approved", "rejected"], "approved": [], ...}
```
### TransitionRegistry
Centralized registry for managing and looking up FSM transitions.
```python
from backend.apps.core.state_machine import registry_instance
# Get available transitions
transitions = registry_instance.get_available_transitions(
"submission_status", "moderation", "pending"
)
# Export graph for visualization
mermaid = registry_instance.export_transition_graph(
"submission_status", "moderation", format="mermaid"
)
```
### MetadataValidator
Validates RichChoice metadata meets state machine requirements.
```python
from backend.apps.core.state_machine import MetadataValidator
validator = MetadataValidator("submission_status", "moderation")
result = validator.validate_choice_group()
if not result.is_valid:
for error in result.errors:
print(error)
```
### PermissionGuard
Guards for checking permissions on state transitions.
```python
from backend.apps.core.state_machine import PermissionGuard
guard = PermissionGuard(requires_moderator=True)
allowed = guard(instance, user=request.user)
```
## Common Patterns
### Pattern 1: Basic Approval Flow
```python
states = [
RichChoice("pending", "Pending", metadata={
"can_transition_to": ["approved", "rejected"]
}),
RichChoice("approved", "Approved", metadata={
"is_final": True,
"requires_moderator": True,
}),
RichChoice("rejected", "Rejected", metadata={
"is_final": True,
"requires_moderator": True,
}),
]
```
### Pattern 2: Multi-Level Approval
```python
states = [
RichChoice("pending", "Pending", metadata={
"can_transition_to": ["moderator_review"]
}),
RichChoice("moderator_review", "Under Review", metadata={
"can_transition_to": ["admin_review", "rejected"],
"requires_moderator": True,
}),
RichChoice("admin_review", "Admin Review", metadata={
"can_transition_to": ["approved", "rejected"],
"requires_admin_approval": True,
}),
RichChoice("approved", "Approved", metadata={"is_final": True}),
RichChoice("rejected", "Rejected", metadata={"is_final": True}),
]
```
### Pattern 3: With Escalation
```python
states = [
RichChoice("pending", "Pending", metadata={
"can_transition_to": ["approved", "rejected", "escalated"]
}),
RichChoice("escalated", "Escalated", metadata={
"can_transition_to": ["approved", "rejected"],
"requires_admin_approval": True,
}),
# ... final states
]
```
## Best Practices
1. **Always define `can_transition_to`**: Every state should explicitly list its valid transitions
2. **Mark terminal states**: Use `is_final: True` for states with no outgoing transitions
3. **Use permission flags**: Leverage `requires_moderator` and `requires_admin_approval` for access control
4. **Validate early**: Run validation during development to catch metadata issues
5. **Document transitions**: Use clear labels and descriptions for each state
6. **Test transitions**: Write tests for all transition paths
## Troubleshooting
### Issue: "Validation failed" error
**Cause**: Metadata references non-existent states or has inconsistencies
**Solution**: Run validation report to see specific errors:
```python
validator = MetadataValidator("your_group", "your_domain")
print(validator.generate_validation_report())
```
### Issue: Transition method not found
**Cause**: State machine not applied to model
**Solution**: Ensure `apply_state_machine()` is called in AppConfig.ready():
```python
from django.apps import AppConfig
class ModerationConfig(AppConfig):
def ready(self):
from backend.apps.core.state_machine import apply_state_machine
from .models import EditSubmission
apply_state_machine(
EditSubmission, "status", "submission_status", "moderation"
)
```
### Issue: Permission denied on transition
**Cause**: User doesn't have required permissions
**Solution**: Check permission requirements in metadata and ensure user has appropriate role/permissions
## API Reference
See individual component documentation:
- [StateTransitionBuilder](builder.py)
- [TransitionRegistry](registry.py)
- [MetadataValidator](validators.py)
- [PermissionGuard](guards.py)
- [Integration Utilities](integration.py)
## Testing
The system includes comprehensive tests:
```bash
pytest backend/apps/core/state_machine/tests/
```
Test coverage includes:
- Builder functionality
- Decorator generation
- Registry operations
- Metadata validation
- Guard functionality
- Model integration

View File

@@ -0,0 +1,124 @@
"""State machine utilities for core app."""
from .fields import RichFSMField
from .mixins import StateMachineMixin
from .builder import (
StateTransitionBuilder,
determine_method_name_for_transition,
)
from .decorators import (
generate_transition_decorator,
TransitionMethodFactory,
)
from .registry import TransitionRegistry, TransitionInfo, registry_instance
from .validators import MetadataValidator, ValidationResult
from .guards import (
# Role constants
VALID_ROLES,
MODERATOR_ROLES,
ADMIN_ROLES,
SUPERUSER_ROLES,
ESCALATION_LEVEL_ROLES,
# Guard classes
PermissionGuard,
OwnershipGuard,
AssignmentGuard,
StateGuard,
MetadataGuard,
CompositeGuard,
# Guard extraction and creation
extract_guards_from_metadata,
create_permission_guard,
create_ownership_guard,
create_assignment_guard,
create_composite_guard,
validate_guard_metadata,
# Registry
GuardRegistry,
guard_registry,
# Role checking functions
get_user_role,
has_role,
is_moderator_or_above,
is_admin_or_above,
is_superuser_role,
has_permission,
)
from .exceptions import (
TransitionPermissionDenied,
TransitionValidationError,
TransitionNotAvailable,
ERROR_MESSAGES,
get_permission_error_message,
get_state_error_message,
format_transition_error,
raise_permission_denied,
raise_validation_error,
)
from .integration import (
apply_state_machine,
StateMachineModelMixin,
state_machine_model,
)
__all__ = [
# Fields and mixins
"RichFSMField",
"StateMachineMixin",
# Builder
"StateTransitionBuilder",
"determine_method_name_for_transition",
# Decorators
"generate_transition_decorator",
"TransitionMethodFactory",
# Registry
"TransitionRegistry",
"TransitionInfo",
"registry_instance",
# Validators
"MetadataValidator",
"ValidationResult",
# Role constants
"VALID_ROLES",
"MODERATOR_ROLES",
"ADMIN_ROLES",
"SUPERUSER_ROLES",
"ESCALATION_LEVEL_ROLES",
# Guard classes
"PermissionGuard",
"OwnershipGuard",
"AssignmentGuard",
"StateGuard",
"MetadataGuard",
"CompositeGuard",
# Guard extraction and creation
"extract_guards_from_metadata",
"create_permission_guard",
"create_ownership_guard",
"create_assignment_guard",
"create_composite_guard",
"validate_guard_metadata",
# Guard registry
"GuardRegistry",
"guard_registry",
# Role checking functions
"get_user_role",
"has_role",
"is_moderator_or_above",
"is_admin_or_above",
"is_superuser_role",
"has_permission",
# Exceptions
"TransitionPermissionDenied",
"TransitionValidationError",
"TransitionNotAvailable",
"ERROR_MESSAGES",
"get_permission_error_message",
"get_state_error_message",
"format_transition_error",
"raise_permission_denied",
"raise_validation_error",
# Integration
"apply_state_machine",
"StateMachineModelMixin",
"state_machine_model",
]

View File

@@ -0,0 +1,194 @@
"""StateTransitionBuilder - Reads RichChoice metadata and generates FSM configurations."""
from typing import Dict, List, Optional, Any
from django.core.exceptions import ImproperlyConfigured
from apps.core.choices.registry import registry
from apps.core.choices.base import RichChoice
class StateTransitionBuilder:
"""Reads RichChoice metadata and generates FSM transition configurations."""
def __init__(self, choice_group: str, domain: str = "core"):
"""
Initialize builder with a specific choice group.
Args:
choice_group: Name of the choice group in the registry
domain: Domain namespace for the choice group
Raises:
ImproperlyConfigured: If choice group doesn't exist
"""
self.choice_group = choice_group
self.domain = domain
self._cache: Dict[str, Any] = {}
# Validate choice group exists
group = registry.get(choice_group, domain)
if group is None:
raise ImproperlyConfigured(
f"Choice group '{choice_group}' not found in domain '{domain}'"
)
self.choices = registry.get_choices(choice_group, domain)
def get_choice_metadata(self, state_value: str) -> Dict[str, Any]:
"""
Retrieve metadata for a specific state.
Args:
state_value: The state value to get metadata for
Returns:
Dictionary containing the state's metadata
"""
cache_key = f"metadata_{state_value}"
if cache_key in self._cache:
return self._cache[cache_key]
choice = registry.get_choice(self.choice_group, state_value, self.domain)
if choice is None:
return {}
metadata = choice.metadata.copy()
self._cache[cache_key] = metadata
return metadata
def extract_valid_transitions(self, state_value: str) -> List[str]:
"""
Get can_transition_to list from metadata.
Args:
state_value: The source state value
Returns:
List of valid target states
"""
metadata = self.get_choice_metadata(state_value)
transitions = metadata.get("can_transition_to", [])
# Validate all target states exist
for target in transitions:
target_choice = registry.get_choice(
self.choice_group, target, self.domain
)
if target_choice is None:
raise ImproperlyConfigured(
f"State '{state_value}' references non-existent "
f"transition target '{target}'"
)
return transitions
def extract_permission_requirements(
self, state_value: str
) -> Dict[str, bool]:
"""
Extract permission requirements from metadata.
Args:
state_value: The state value to extract permissions for
Returns:
Dictionary with permission requirement flags
"""
metadata = self.get_choice_metadata(state_value)
return {
"requires_moderator": metadata.get("requires_moderator", False),
"requires_admin_approval": metadata.get(
"requires_admin_approval", False
),
}
def is_terminal_state(self, state_value: str) -> bool:
"""
Check if state is terminal (is_final flag).
Args:
state_value: The state value to check
Returns:
True if state is terminal/final
"""
metadata = self.get_choice_metadata(state_value)
return metadata.get("is_final", False)
def is_actionable_state(self, state_value: str) -> bool:
"""
Check if state is actionable (is_actionable flag).
Args:
state_value: The state value to check
Returns:
True if state is actionable
"""
metadata = self.get_choice_metadata(state_value)
return metadata.get("is_actionable", False)
def build_transition_graph(self) -> Dict[str, List[str]]:
"""
Create a complete state transition graph.
Returns:
Dictionary mapping each state to its valid target states
"""
cache_key = "transition_graph"
if cache_key in self._cache:
return self._cache[cache_key]
graph = {}
for choice in self.choices:
transitions = self.extract_valid_transitions(choice.value)
graph[choice.value] = transitions
self._cache[cache_key] = graph
return graph
def get_all_states(self) -> List[str]:
"""
Get all state values in the choice group.
Returns:
List of all state values
"""
return [choice.value for choice in self.choices]
def get_choice(self, state_value: str) -> Optional[RichChoice]:
"""
Get the RichChoice object for a state.
Args:
state_value: The state value to get
Returns:
RichChoice object or None if not found
"""
return registry.get_choice(self.choice_group, state_value, self.domain)
def clear_cache(self) -> None:
"""Clear the internal cache."""
self._cache.clear()
def determine_method_name_for_transition(source: str, target: str) -> str:
"""
Determine appropriate method name for a transition.
Always uses transition_to_<state> pattern to avoid conflicts with
business logic methods (approve, reject, escalate, etc.).
Args:
source: Source state
target: Target state
Returns:
Method name in format "transition_to_{target_lower}"
"""
# Always use transition_to_<state> pattern to avoid conflicts
# with business logic methods
return f"transition_to_{target.lower()}"
__all__ = ["StateTransitionBuilder", "determine_method_name_for_transition"]

View File

@@ -0,0 +1,506 @@
"""
Callback system infrastructure for FSM state transitions.
This module provides the core classes and registry for managing callbacks
that execute during state machine transitions.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
import logging
from django.db import models
logger = logging.getLogger(__name__)
class CallbackStage(Enum):
"""Stages at which callbacks can be executed during a transition."""
PRE = "pre"
POST = "post"
ERROR = "error"
@dataclass
class TransitionContext:
"""
Context object passed to callbacks containing transition metadata.
Provides all relevant information about the transition being executed.
"""
instance: models.Model
field_name: str
source_state: str
target_state: str
user: Optional[Any] = None
timestamp: datetime = field(default_factory=datetime.now)
extra_data: Dict[str, Any] = field(default_factory=dict)
@property
def model_class(self) -> Type[models.Model]:
"""Get the model class of the instance."""
return type(self.instance)
@property
def model_name(self) -> str:
"""Get the model class name."""
return self.model_class.__name__
def __str__(self) -> str:
return (
f"TransitionContext({self.model_name}.{self.field_name}: "
f"{self.source_state}{self.target_state})"
)
class BaseTransitionCallback(ABC):
"""
Abstract base class for all transition callbacks.
Subclasses must implement the execute method to define callback behavior.
"""
# Priority determines execution order (lower = earlier)
priority: int = 100
# Whether to continue execution if this callback fails
continue_on_error: bool = True
# Human-readable name for logging/debugging
name: str = "BaseCallback"
def __init__(
self,
priority: Optional[int] = None,
continue_on_error: Optional[bool] = None,
name: Optional[str] = None,
):
if priority is not None:
self.priority = priority
if continue_on_error is not None:
self.continue_on_error = continue_on_error
if name is not None:
self.name = name
@abstractmethod
def execute(self, context: TransitionContext) -> bool:
"""
Execute the callback.
Args:
context: TransitionContext containing all transition information.
Returns:
True if successful, False otherwise.
"""
pass
def should_execute(self, context: TransitionContext) -> bool:
"""
Determine if this callback should execute for the given context.
Override this method to add conditional execution logic.
Args:
context: TransitionContext containing all transition information.
Returns:
True if the callback should execute, False to skip.
"""
return True
def __repr__(self) -> str:
return f"{self.__class__.__name__}(name={self.name}, priority={self.priority})"
class PreTransitionCallback(BaseTransitionCallback):
"""
Callback executed before the state transition occurs.
Can be used to validate preconditions or prepare resources.
If execute() returns False, the transition will be aborted.
"""
name: str = "PreTransitionCallback"
# By default, pre-transition callbacks abort on error
continue_on_error: bool = False
class PostTransitionCallback(BaseTransitionCallback):
"""
Callback executed after a successful state transition.
Used for side effects like notifications, cache invalidation,
and updating related models.
"""
name: str = "PostTransitionCallback"
# By default, post-transition callbacks continue on error
continue_on_error: bool = True
class ErrorTransitionCallback(BaseTransitionCallback):
"""
Callback executed when a transition fails.
Used for cleanup, logging, or error notifications.
"""
name: str = "ErrorTransitionCallback"
# Error callbacks should always continue
continue_on_error: bool = True
def execute(self, context: TransitionContext, exception: Optional[Exception] = None) -> bool:
"""
Execute the error callback.
Args:
context: TransitionContext containing all transition information.
exception: The exception that caused the transition to fail.
Returns:
True if successful, False otherwise.
"""
pass
@dataclass
class CallbackRegistration:
"""Represents a registered callback with its configuration."""
callback: BaseTransitionCallback
model_class: Type[models.Model]
field_name: str
source: str # Can be '*' for wildcard
target: str # Can be '*' for wildcard
stage: CallbackStage
def matches(
self,
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
) -> bool:
"""Check if this registration matches the given transition."""
if self.model_class != model_class:
return False
if self.field_name != field_name:
return False
if self.source != '*' and self.source != source:
return False
if self.target != '*' and self.target != target:
return False
return True
class TransitionCallbackRegistry:
"""
Singleton registry for managing transition callbacks.
Provides methods to register callbacks and retrieve/execute them
for specific transitions.
"""
_instance: Optional['TransitionCallbackRegistry'] = None
_initialized: bool = False
def __new__(cls) -> 'TransitionCallbackRegistry':
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if self._initialized:
return
self._callbacks: Dict[CallbackStage, List[CallbackRegistration]] = {
CallbackStage.PRE: [],
CallbackStage.POST: [],
CallbackStage.ERROR: [],
}
self._initialized = True
def register(
self,
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
callback: BaseTransitionCallback,
stage: Union[CallbackStage, str] = CallbackStage.POST,
) -> None:
"""
Register a callback for a specific transition.
Args:
model_class: The model class the callback applies to.
field_name: The FSM field name.
source: Source state (use '*' for any source).
target: Target state (use '*' for any target).
callback: The callback instance to register.
stage: When to execute the callback (pre/post/error).
"""
if isinstance(stage, str):
stage = CallbackStage(stage)
registration = CallbackRegistration(
callback=callback,
model_class=model_class,
field_name=field_name,
source=source,
target=target,
stage=stage,
)
self._callbacks[stage].append(registration)
# Keep callbacks sorted by priority
self._callbacks[stage].sort(key=lambda r: r.callback.priority)
logger.debug(
f"Registered {stage.value} callback: {callback.name} for "
f"{model_class.__name__}.{field_name} ({source}{target})"
)
def register_bulk(
self,
model_class: Type[models.Model],
field_name: str,
callbacks_config: Dict[Tuple[str, str], List[BaseTransitionCallback]],
stage: Union[CallbackStage, str] = CallbackStage.POST,
) -> None:
"""
Register multiple callbacks for multiple transitions.
Args:
model_class: The model class the callbacks apply to.
field_name: The FSM field name.
callbacks_config: Dict mapping (source, target) tuples to callback lists.
stage: When to execute the callbacks.
"""
for (source, target), callbacks in callbacks_config.items():
for callback in callbacks:
self.register(model_class, field_name, source, target, callback, stage)
def get_callbacks(
self,
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
stage: Union[CallbackStage, str] = CallbackStage.POST,
) -> List[BaseTransitionCallback]:
"""
Get all callbacks matching the given transition.
Args:
model_class: The model class.
field_name: The FSM field name.
source: Source state.
target: Target state.
stage: The callback stage to retrieve.
Returns:
List of matching callbacks, sorted by priority.
"""
if isinstance(stage, str):
stage = CallbackStage(stage)
matching = []
for registration in self._callbacks[stage]:
if registration.matches(model_class, field_name, source, target):
matching.append(registration.callback)
return matching
def execute_callbacks(
self,
context: TransitionContext,
stage: Union[CallbackStage, str] = CallbackStage.POST,
exception: Optional[Exception] = None,
) -> Tuple[bool, List[Tuple[BaseTransitionCallback, Optional[Exception]]]]:
"""
Execute all callbacks for a transition.
Args:
context: The transition context.
stage: The callback stage to execute.
exception: Exception that occurred (for error callbacks).
Returns:
Tuple of (overall_success, list of (callback, exception) for failures).
"""
if isinstance(stage, str):
stage = CallbackStage(stage)
callbacks = self.get_callbacks(
context.model_class,
context.field_name,
context.source_state,
context.target_state,
stage,
)
failures: List[Tuple[BaseTransitionCallback, Optional[Exception]]] = []
overall_success = True
for callback in callbacks:
try:
# Check if callback should execute
if not callback.should_execute(context):
logger.debug(
f"Skipping callback {callback.name} - "
f"should_execute returned False"
)
continue
# Execute callback
logger.debug(f"Executing {stage.value} callback: {callback.name}")
if stage == CallbackStage.ERROR:
result = callback.execute(context, exception=exception)
else:
result = callback.execute(context)
if not result:
logger.warning(
f"Callback {callback.name} returned False for {context}"
)
failures.append((callback, None))
overall_success = False
if not callback.continue_on_error:
logger.error(
f"Aborting callback chain - {callback.name} failed "
f"and continue_on_error=False"
)
break
except Exception as e:
logger.exception(
f"Callback {callback.name} raised exception for {context}: {e}"
)
failures.append((callback, e))
overall_success = False
if not callback.continue_on_error:
logger.error(
f"Aborting callback chain - {callback.name} raised exception "
f"and continue_on_error=False"
)
break
return overall_success, failures
def clear(self, model_class: Optional[Type[models.Model]] = None) -> None:
"""
Clear registered callbacks.
Args:
model_class: If provided, only clear callbacks for this model.
If None, clear all callbacks.
"""
if model_class is None:
for stage in CallbackStage:
self._callbacks[stage] = []
else:
for stage in CallbackStage:
self._callbacks[stage] = [
r for r in self._callbacks[stage]
if r.model_class != model_class
]
def get_all_registrations(
self,
model_class: Optional[Type[models.Model]] = None,
) -> Dict[CallbackStage, List[CallbackRegistration]]:
"""
Get all registered callbacks, optionally filtered by model class.
Args:
model_class: If provided, only return callbacks for this model.
Returns:
Dict mapping stages to lists of registrations.
"""
if model_class is None:
return dict(self._callbacks)
filtered = {}
for stage, registrations in self._callbacks.items():
filtered[stage] = [
r for r in registrations
if r.model_class == model_class
]
return filtered
@classmethod
def reset_instance(cls) -> None:
"""Reset the singleton instance. Mainly for testing."""
cls._instance = None
cls._initialized = False
# Global registry instance
callback_registry = TransitionCallbackRegistry()
# Convenience functions for common operations
def register_callback(
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
callback: BaseTransitionCallback,
stage: Union[CallbackStage, str] = CallbackStage.POST,
) -> None:
"""Convenience function to register a callback."""
callback_registry.register(model_class, field_name, source, target, callback, stage)
def register_pre_callback(
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
callback: PreTransitionCallback,
) -> None:
"""Convenience function to register a pre-transition callback."""
callback_registry.register(
model_class, field_name, source, target, callback, CallbackStage.PRE
)
def register_post_callback(
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
callback: PostTransitionCallback,
) -> None:
"""Convenience function to register a post-transition callback."""
callback_registry.register(
model_class, field_name, source, target, callback, CallbackStage.POST
)
def register_error_callback(
model_class: Type[models.Model],
field_name: str,
source: str,
target: str,
callback: ErrorTransitionCallback,
) -> None:
"""Convenience function to register an error callback."""
callback_registry.register(
model_class, field_name, source, target, callback, CallbackStage.ERROR
)

View File

@@ -0,0 +1,50 @@
"""
FSM Transition Callbacks Package.
This package provides specialized callback implementations for
FSM state transitions.
"""
from .notifications import (
NotificationCallback,
SubmissionApprovedNotification,
SubmissionRejectedNotification,
SubmissionEscalatedNotification,
StatusChangeNotification,
ModerationNotificationCallback,
)
from .cache import (
CacheInvalidationCallback,
ModelCacheInvalidation,
RelatedModelCacheInvalidation,
PatternCacheInvalidation,
APICacheInvalidation,
)
from .related_updates import (
RelatedModelUpdateCallback,
ParkCountUpdateCallback,
SearchTextUpdateCallback,
ComputedFieldUpdateCallback,
)
__all__ = [
# Notification callbacks
"NotificationCallback",
"SubmissionApprovedNotification",
"SubmissionRejectedNotification",
"SubmissionEscalatedNotification",
"StatusChangeNotification",
"ModerationNotificationCallback",
# Cache callbacks
"CacheInvalidationCallback",
"ModelCacheInvalidation",
"RelatedModelCacheInvalidation",
"PatternCacheInvalidation",
"APICacheInvalidation",
# Related update callbacks
"RelatedModelUpdateCallback",
"ParkCountUpdateCallback",
"SearchTextUpdateCallback",
"ComputedFieldUpdateCallback",
]

View File

@@ -0,0 +1,498 @@
"""Transition decorator generation for django-fsm integration."""
from typing import Any, Callable, List, Optional, Type, Union
from functools import wraps
import logging
from django.db import models
from django_fsm import transition
from django_fsm_log.decorators import fsm_log_by
from .callbacks import (
BaseTransitionCallback,
CallbackStage,
TransitionContext,
callback_registry,
)
from .signals import (
pre_state_transition,
post_state_transition,
state_transition_failed,
)
logger = logging.getLogger(__name__)
def with_callbacks(
field_name: str = "status",
emit_signals: bool = True,
) -> Callable:
"""
Decorator that wraps FSM transition methods to execute callbacks.
This decorator should be applied BEFORE the @transition decorator:
Example:
@with_callbacks(field_name='status')
@fsm_log_by
@transition(field='status', source='PENDING', target='APPROVED')
def transition_to_approved(self, user=None, **kwargs):
pass
Args:
field_name: The name of the FSM field for this transition.
emit_signals: Whether to emit Django signals for the transition.
Returns:
Decorated function with callback execution.
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(instance, *args, **kwargs):
# Extract user from kwargs
user = kwargs.get('user')
# Get source state before transition
source_state = getattr(instance, field_name, None)
# Get target state from the transition decorator
# The @transition decorator sets _django_fsm_target
target_state = getattr(func, '_django_fsm', {}).get('target', None)
# If we can't determine the target from decorator metadata,
# we'll capture it after the transition
if target_state is None:
# This happens when decorators are applied in wrong order
logger.debug(
f"Could not determine target state from decorator for {func.__name__}"
)
# Create transition context
context = TransitionContext(
instance=instance,
field_name=field_name,
source_state=str(source_state) if source_state else '',
target_state=str(target_state) if target_state else '',
user=user,
extra_data=dict(kwargs),
)
# Execute pre-transition callbacks
pre_success, pre_failures = callback_registry.execute_callbacks(
context, CallbackStage.PRE
)
# If pre-callbacks fail with continue_on_error=False, abort
if not pre_success and pre_failures:
for callback, exc in pre_failures:
if not callback.continue_on_error:
logger.error(
f"Pre-transition callback {callback.name} failed, "
f"aborting transition"
)
if exc:
raise exc
raise RuntimeError(
f"Pre-transition callback {callback.name} failed"
)
# Emit pre-transition signal
if emit_signals:
pre_state_transition.send(
sender=type(instance),
instance=instance,
source=context.source_state,
target=context.target_state,
user=user,
context=context,
)
try:
# Execute the actual transition
result = func(instance, *args, **kwargs)
# Update context with actual target state after transition
actual_target = getattr(instance, field_name, None)
context.target_state = str(actual_target) if actual_target else ''
# Execute post-transition callbacks
post_success, post_failures = callback_registry.execute_callbacks(
context, CallbackStage.POST
)
if not post_success:
for callback, exc in post_failures:
logger.warning(
f"Post-transition callback {callback.name} failed "
f"for {context}"
)
# Emit post-transition signal
if emit_signals:
post_state_transition.send(
sender=type(instance),
instance=instance,
source=context.source_state,
target=context.target_state,
user=user,
context=context,
)
return result
except Exception as e:
# Execute error callbacks
error_success, error_failures = callback_registry.execute_callbacks(
context, CallbackStage.ERROR, exception=e
)
# Emit failure signal
if emit_signals:
state_transition_failed.send(
sender=type(instance),
instance=instance,
source=context.source_state,
target=context.target_state,
user=user,
exception=e,
context=context,
)
# Re-raise the original exception
raise
return wrapper
return decorator
def generate_transition_decorator(
source: str,
target: str,
field_name: str = "status",
**kwargs: Any,
) -> Callable:
"""
Generate a configured @transition decorator.
Args:
source: Source state value(s)
target: Target state value
field_name: Name of the FSM field
**kwargs: Additional arguments for @transition decorator
Returns:
Configured transition decorator
"""
return transition(field=field_name, source=source, target=target, **kwargs)
def create_transition_method(
method_name: str,
source: str,
target: str,
field_name: str,
permission_guard: Optional[Callable] = None,
on_success: Optional[Callable] = None,
on_error: Optional[Callable] = None,
callbacks: Optional[List[BaseTransitionCallback]] = None,
enable_callbacks: bool = True,
emit_signals: bool = True,
) -> Callable:
"""
Generate a complete transition method with decorator.
Args:
method_name: Name for the transition method
source: Source state value(s)
target: Target state value
field_name: Name of the FSM field
permission_guard: Optional guard function for permissions
on_success: Optional callback on successful transition
on_error: Optional callback on transition error
callbacks: Optional list of callback instances to register
enable_callbacks: Whether to wrap with callback execution
emit_signals: Whether to emit Django signals
Returns:
Configured transition method with logging via django-fsm-log
"""
conditions = []
if permission_guard:
conditions.append(permission_guard)
@fsm_log_by
@transition(
field=field_name,
source=source,
target=target,
conditions=conditions,
on_error=on_error,
)
def transition_method(instance, user=None, **kwargs):
"""Execute state transition."""
if on_success:
on_success(instance, user=user, **kwargs)
transition_method.__name__ = method_name
transition_method.__doc__ = (
f"Transition from {source} to {target} on field {field_name}"
)
# Apply callback wrapper if enabled
if enable_callbacks:
transition_method = with_callbacks(
field_name=field_name,
emit_signals=emit_signals,
)(transition_method)
# Store metadata for callback registration
transition_method._fsm_metadata = {
'source': source,
'target': target,
'field_name': field_name,
'callbacks': callbacks or [],
}
return transition_method
def register_method_callbacks(
model_class: Type[models.Model],
method: Callable,
) -> None:
"""
Register callbacks defined in a transition method's metadata.
This should be called during model initialization or app ready.
Args:
model_class: The model class containing the method.
method: The transition method with _fsm_metadata.
"""
metadata = getattr(method, '_fsm_metadata', None)
if not metadata or not metadata.get('callbacks'):
return
from .callbacks import CallbackStage, PostTransitionCallback, PreTransitionCallback
for callback in metadata['callbacks']:
# Determine stage from callback type
if isinstance(callback, PreTransitionCallback):
stage = CallbackStage.PRE
else:
stage = CallbackStage.POST
callback_registry.register(
model_class=model_class,
field_name=metadata['field_name'],
source=metadata['source'],
target=metadata['target'],
callback=callback,
stage=stage,
)
class TransitionMethodFactory:
"""Factory for creating standard transition methods."""
@staticmethod
def create_approve_method(
source: str,
target: str,
field_name: str = "status",
permission_guard: Optional[Callable] = None,
) -> Callable:
"""
Create an approval transition method.
Args:
source: Source state value(s)
target: Target state value
field_name: Name of the FSM field
permission_guard: Optional permission guard
Returns:
Approval transition method
"""
@fsm_log_by
@transition(
field=field_name,
source=source,
target=target,
conditions=[permission_guard] if permission_guard else [],
)
def approve(instance, user=None, comment: str = "", **kwargs):
"""Approve and transition to approved state."""
if hasattr(instance, "approved_by_id"):
instance.approved_by = user
if hasattr(instance, "approval_comment"):
instance.approval_comment = comment
if hasattr(instance, "approved_at"):
from django.utils import timezone
instance.approved_at = timezone.now()
return approve
@staticmethod
def create_reject_method(
source: str,
target: str,
field_name: str = "status",
permission_guard: Optional[Callable] = None,
) -> Callable:
"""
Create a rejection transition method.
Args:
source: Source state value(s)
target: Target state value
field_name: Name of the FSM field
permission_guard: Optional permission guard
Returns:
Rejection transition method
"""
@fsm_log_by
@transition(
field=field_name,
source=source,
target=target,
conditions=[permission_guard] if permission_guard else [],
)
def reject(instance, user=None, reason: str = "", **kwargs):
"""Reject and transition to rejected state."""
if hasattr(instance, "rejected_by_id"):
instance.rejected_by = user
if hasattr(instance, "rejection_reason"):
instance.rejection_reason = reason
if hasattr(instance, "rejected_at"):
from django.utils import timezone
instance.rejected_at = timezone.now()
return reject
@staticmethod
def create_escalate_method(
source: str,
target: str,
field_name: str = "status",
permission_guard: Optional[Callable] = None,
) -> Callable:
"""
Create an escalation transition method.
Args:
source: Source state value(s)
target: Target state value
field_name: Name of the FSM field
permission_guard: Optional permission guard
Returns:
Escalation transition method
"""
@fsm_log_by
@transition(
field=field_name,
source=source,
target=target,
conditions=[permission_guard] if permission_guard else [],
)
def escalate(instance, user=None, reason: str = "", **kwargs):
"""Escalate to higher authority."""
if hasattr(instance, "escalated_by_id"):
instance.escalated_by = user
if hasattr(instance, "escalation_reason"):
instance.escalation_reason = reason
if hasattr(instance, "escalated_at"):
from django.utils import timezone
instance.escalated_at = timezone.now()
return escalate
@staticmethod
def create_generic_transition_method(
method_name: str,
source: str,
target: str,
field_name: str = "status",
permission_guard: Optional[Callable] = None,
docstring: Optional[str] = None,
) -> Callable:
"""
Create a generic transition method.
Args:
method_name: Name for the method
source: Source state value(s)
target: Target state value
field_name: Name of the FSM field
permission_guard: Optional permission guard
docstring: Optional docstring for the method
Returns:
Generic transition method
"""
@fsm_log_by
@transition(
field=field_name,
source=source,
target=target,
conditions=[permission_guard] if permission_guard else [],
)
def generic_transition(instance, user=None, **kwargs):
"""Execute state transition."""
pass
generic_transition.__name__ = method_name
if docstring:
generic_transition.__doc__ = docstring
else:
generic_transition.__doc__ = (
f"Transition from {source} to {target}"
)
return generic_transition
def with_transition_logging(transition_method: Callable) -> Callable:
"""
Decorator to add django-fsm-log logging to a transition method.
Args:
transition_method: The transition method to wrap
Returns:
Wrapped method with logging
"""
@wraps(transition_method)
def wrapper(instance, *args, **kwargs):
try:
from django_fsm_log.decorators import fsm_log_by
logged_method = fsm_log_by(transition_method)
return logged_method(instance, *args, **kwargs)
except ImportError:
# django-fsm-log not available, execute without logging
return transition_method(instance, *args, **kwargs)
return wrapper
__all__ = [
"generate_transition_decorator",
"create_transition_method",
"register_method_callbacks",
"TransitionMethodFactory",
"with_callbacks",
"with_transition_logging",
]

View File

@@ -0,0 +1,496 @@
"""Custom exceptions for state machine transitions.
This module provides custom exception classes for handling state machine
transition failures with user-friendly error messages and error codes.
Example usage:
try:
instance.transition_to_approved(user=user)
except TransitionPermissionDenied as e:
return Response({
'error': e.user_message,
'code': e.error_code
}, status=403)
"""
from typing import Any, Optional, List, Dict
from django_fsm import TransitionNotAllowed
class TransitionPermissionDenied(TransitionNotAllowed):
"""
Exception raised when a transition is not allowed due to permission issues.
This exception provides additional context about why the transition failed,
including a user-friendly message and error code for programmatic handling.
Attributes:
error_code: Machine-readable error code for programmatic handling
user_message: Human-readable message to display to the user
required_roles: List of roles that would have allowed the transition
user_role: The user's current role
"""
# Standard error codes
ERROR_CODE_NO_USER = "NO_USER"
ERROR_CODE_NOT_AUTHENTICATED = "NOT_AUTHENTICATED"
ERROR_CODE_PERMISSION_DENIED_ROLE = "PERMISSION_DENIED_ROLE"
ERROR_CODE_PERMISSION_DENIED_OWNERSHIP = "PERMISSION_DENIED_OWNERSHIP"
ERROR_CODE_PERMISSION_DENIED_ASSIGNMENT = "PERMISSION_DENIED_ASSIGNMENT"
ERROR_CODE_PERMISSION_DENIED_CUSTOM = "PERMISSION_DENIED_CUSTOM"
def __init__(
self,
message: str = "Permission denied for this transition",
error_code: str = "PERMISSION_DENIED",
user_message: Optional[str] = None,
required_roles: Optional[List[str]] = None,
user_role: Optional[str] = None,
guard: Optional[Any] = None,
):
"""
Initialize permission denied exception.
Args:
message: Technical error message (for logging)
error_code: Machine-readable error code
user_message: Human-readable message for the user
required_roles: List of roles that would have allowed the transition
user_role: The user's current role
guard: The guard that failed (for detailed error messages)
"""
super().__init__(message)
self.error_code = error_code
self.user_message = user_message or message
self.required_roles = required_roles or []
self.user_role = user_role
self.guard = guard
def to_dict(self) -> Dict[str, Any]:
"""
Convert exception to dictionary for API responses.
Returns:
Dictionary with error details
"""
return {
"error": self.user_message,
"error_code": self.error_code,
"required_roles": self.required_roles,
"user_role": self.user_role,
}
class TransitionValidationError(TransitionNotAllowed):
"""
Exception raised when a transition fails validation.
This exception is raised when business logic conditions are not met,
such as missing required fields or invalid state.
Attributes:
error_code: Machine-readable error code for programmatic handling
user_message: Human-readable message to display to the user
field_name: Name of the field that failed validation (if applicable)
current_state: Current state of the object
"""
# Standard error codes
ERROR_CODE_INVALID_STATE = "INVALID_STATE_TRANSITION"
ERROR_CODE_BLOCKED_STATE = "BLOCKED_STATE"
ERROR_CODE_MISSING_FIELD = "MISSING_REQUIRED_FIELD"
ERROR_CODE_EMPTY_FIELD = "EMPTY_REQUIRED_FIELD"
ERROR_CODE_NO_ASSIGNMENT = "NO_ASSIGNMENT"
ERROR_CODE_VALIDATION_FAILED = "VALIDATION_FAILED"
def __init__(
self,
message: str = "Transition validation failed",
error_code: str = "VALIDATION_FAILED",
user_message: Optional[str] = None,
field_name: Optional[str] = None,
current_state: Optional[str] = None,
guard: Optional[Any] = None,
):
"""
Initialize validation error exception.
Args:
message: Technical error message (for logging)
error_code: Machine-readable error code
user_message: Human-readable message for the user
field_name: Name of the field that failed validation
current_state: Current state of the object
guard: The guard that failed (for detailed error messages)
"""
super().__init__(message)
self.error_code = error_code
self.user_message = user_message or message
self.field_name = field_name
self.current_state = current_state
self.guard = guard
def to_dict(self) -> Dict[str, Any]:
"""
Convert exception to dictionary for API responses.
Returns:
Dictionary with error details
"""
result = {
"error": self.user_message,
"error_code": self.error_code,
}
if self.field_name:
result["field"] = self.field_name
if self.current_state:
result["current_state"] = self.current_state
return result
class TransitionNotAvailable(TransitionNotAllowed):
"""
Exception raised when a transition is not available from the current state.
This exception provides context about why the transition isn't available,
including the current state and available transitions.
Attributes:
error_code: Machine-readable error code
user_message: Human-readable message for the user
current_state: Current state of the object
requested_transition: The transition that was requested
available_transitions: List of transitions that are available
"""
ERROR_CODE_TRANSITION_NOT_AVAILABLE = "TRANSITION_NOT_AVAILABLE"
def __init__(
self,
message: str = "This transition is not available",
error_code: str = "TRANSITION_NOT_AVAILABLE",
user_message: Optional[str] = None,
current_state: Optional[str] = None,
requested_transition: Optional[str] = None,
available_transitions: Optional[List[str]] = None,
):
"""
Initialize transition not available exception.
Args:
message: Technical error message (for logging)
error_code: Machine-readable error code
user_message: Human-readable message for the user
current_state: Current state of the object
requested_transition: Name of the requested transition
available_transitions: List of available transition names
"""
super().__init__(message)
self.error_code = error_code
self.user_message = user_message or message
self.current_state = current_state
self.requested_transition = requested_transition
self.available_transitions = available_transitions or []
def to_dict(self) -> Dict[str, Any]:
"""
Convert exception to dictionary for API responses.
Returns:
Dictionary with error details
"""
return {
"error": self.user_message,
"error_code": self.error_code,
"current_state": self.current_state,
"requested_transition": self.requested_transition,
"available_transitions": self.available_transitions,
}
# Error message templates for common scenarios
ERROR_MESSAGES = {
"PERMISSION_DENIED_ROLE": (
"You need {required_role} permissions to {action}. "
"Please contact an administrator if you believe this is an error."
),
"PERMISSION_DENIED_OWNERSHIP": (
"You must be the owner of this item to perform this action."
),
"PERMISSION_DENIED_ASSIGNMENT": (
"This item must be assigned to you before you can {action}. "
"Please assign it to yourself first."
),
"NO_ASSIGNMENT": (
"This item must be assigned before this action can be performed."
),
"INVALID_STATE_TRANSITION": (
"This action cannot be performed from the current state. "
"The item is currently '{current_state}' and cannot be modified."
),
"TRANSITION_NOT_AVAILABLE": (
"This {item_type} has already been {state} and cannot be modified."
),
"MISSING_REQUIRED_FIELD": (
"{field_name} is required to complete this action."
),
"EMPTY_REQUIRED_FIELD": (
"{field_name} must not be empty."
),
"ESCALATED_REQUIRES_ADMIN": (
"This submission has been escalated and requires admin review. "
"Only administrators can approve or reject escalated items."
),
}
def get_permission_error_message(
guard: Any,
action: str = "perform this action",
**kwargs: Any,
) -> str:
"""
Generate a user-friendly error message based on guard type.
Args:
guard: The guard that failed
action: Description of the action being attempted
**kwargs: Additional context for message formatting
Returns:
User-friendly error message
Example:
message = get_permission_error_message(
guard,
action="approve submissions"
)
# "You need moderator permissions to approve submissions..."
"""
from .guards import (
PermissionGuard,
OwnershipGuard,
AssignmentGuard,
MODERATOR_ROLES,
ADMIN_ROLES,
SUPERUSER_ROLES,
)
if hasattr(guard, "get_error_message"):
return guard.get_error_message()
if isinstance(guard, PermissionGuard):
required_roles = guard.get_required_roles()
if required_roles == SUPERUSER_ROLES:
required_role = "superuser"
elif required_roles == ADMIN_ROLES:
required_role = "admin"
elif required_roles == MODERATOR_ROLES:
required_role = "moderator"
else:
required_role = ", ".join(required_roles)
return ERROR_MESSAGES["PERMISSION_DENIED_ROLE"].format(
required_role=required_role,
action=action,
)
if isinstance(guard, OwnershipGuard):
return ERROR_MESSAGES["PERMISSION_DENIED_OWNERSHIP"]
if isinstance(guard, AssignmentGuard):
return ERROR_MESSAGES["PERMISSION_DENIED_ASSIGNMENT"].format(action=action)
return f"You don't have permission to {action}"
def get_state_error_message(
current_state: str,
item_type: str = "item",
**kwargs: Any,
) -> str:
"""
Generate a user-friendly error message for state-related errors.
Args:
current_state: Current state of the object
item_type: Type of item (e.g., "submission", "report")
**kwargs: Additional context for message formatting
Returns:
User-friendly error message
Example:
message = get_state_error_message(
current_state="COMPLETED",
item_type="submission"
)
# "This submission has already been COMPLETED and cannot be modified."
"""
# Map states to user-friendly descriptions
state_descriptions = {
"COMPLETED": "completed",
"CANCELLED": "cancelled",
"APPROVED": "approved",
"REJECTED": "rejected",
"RESOLVED": "resolved",
"DISMISSED": "dismissed",
"ESCALATED": "escalated for review",
}
state_desc = state_descriptions.get(current_state, current_state.lower())
return ERROR_MESSAGES["TRANSITION_NOT_AVAILABLE"].format(
item_type=item_type,
state=state_desc,
)
def format_transition_error(
exception: Exception,
include_details: bool = False,
) -> Dict[str, Any]:
"""
Format a transition exception for API response.
Args:
exception: The exception to format
include_details: Include detailed information (for debugging)
Returns:
Dictionary suitable for API response
Example:
try:
instance.transition_to_approved(user=user)
except TransitionNotAllowed as e:
return Response(
format_transition_error(e),
status=403
)
"""
# Handle our custom exceptions
if hasattr(exception, "to_dict"):
result = exception.to_dict()
if not include_details:
# Remove technical details
result.pop("user_role", None)
return result
# Handle standard TransitionNotAllowed
if isinstance(exception, TransitionNotAllowed):
return {
"error": str(exception) or "This transition is not allowed",
"error_code": "TRANSITION_NOT_ALLOWED",
}
# Handle other exceptions
return {
"error": str(exception) or "An error occurred",
"error_code": "UNKNOWN_ERROR",
}
def raise_permission_denied(
guard: Any,
user: Any = None,
action: str = "perform this action",
) -> None:
"""
Raise a TransitionPermissionDenied exception with proper context.
Args:
guard: The guard that failed
user: The user who attempted the transition
action: Description of the action being attempted
Raises:
TransitionPermissionDenied: Always raised with proper context
"""
from .guards import PermissionGuard, get_user_role
user_message = get_permission_error_message(guard, action=action)
user_role = get_user_role(user) if user else None
error_code = TransitionPermissionDenied.ERROR_CODE_PERMISSION_DENIED_ROLE
required_roles: List[str] = []
if isinstance(guard, PermissionGuard):
required_roles = guard.get_required_roles()
if guard.error_code:
error_code = guard.error_code
raise TransitionPermissionDenied(
message=f"Permission denied: {user_message}",
error_code=error_code,
user_message=user_message,
required_roles=required_roles,
user_role=user_role,
guard=guard,
)
def raise_validation_error(
guard: Any,
current_state: Optional[str] = None,
field_name: Optional[str] = None,
) -> None:
"""
Raise a TransitionValidationError exception with proper context.
Args:
guard: The guard that failed
current_state: Current state of the object
field_name: Name of the field that failed validation
Raises:
TransitionValidationError: Always raised with proper context
"""
from .guards import StateGuard, MetadataGuard
error_code = TransitionValidationError.ERROR_CODE_VALIDATION_FAILED
user_message = "Validation failed for this transition"
if hasattr(guard, "get_error_message"):
user_message = guard.get_error_message()
if hasattr(guard, "error_code") and guard.error_code:
error_code = guard.error_code
if isinstance(guard, StateGuard):
if guard.error_code == "BLOCKED_STATE":
error_code = TransitionValidationError.ERROR_CODE_BLOCKED_STATE
else:
error_code = TransitionValidationError.ERROR_CODE_INVALID_STATE
current_state = guard._current_state
if isinstance(guard, MetadataGuard):
field_name = guard._failed_field
if guard.error_code == "EMPTY_FIELD":
error_code = TransitionValidationError.ERROR_CODE_EMPTY_FIELD
else:
error_code = TransitionValidationError.ERROR_CODE_MISSING_FIELD
raise TransitionValidationError(
message=f"Validation error: {user_message}",
error_code=error_code,
user_message=user_message,
field_name=field_name,
current_state=current_state,
guard=guard,
)
__all__ = [
# Exception classes
"TransitionPermissionDenied",
"TransitionValidationError",
"TransitionNotAvailable",
# Error message templates
"ERROR_MESSAGES",
# Helper functions
"get_permission_error_message",
"get_state_error_message",
"format_transition_error",
"raise_permission_denied",
"raise_validation_error",
]

View File

@@ -0,0 +1,90 @@
"""State machine fields with rich choice integration."""
from typing import Any, Optional
from django.core.exceptions import ValidationError
from django_fsm import FSMField as DjangoFSMField
from apps.core.choices.base import RichChoice
from apps.core.choices.registry import registry
class RichFSMField(DjangoFSMField):
"""FSMField that uses the rich choice registry for states."""
def __init__(
self,
choice_group: str,
domain: str = "core",
max_length: int = 50,
allow_deprecated: bool = False,
**kwargs: Any,
):
self.choice_group = choice_group
self.domain = domain
self.allow_deprecated = allow_deprecated
if allow_deprecated:
choices_list = registry.get_choices(choice_group, domain)
else:
choices_list = registry.get_active_choices(choice_group, domain)
choices = [(choice.value, choice.label) for choice in choices_list]
kwargs.setdefault("choices", choices)
kwargs.setdefault("max_length", max_length)
super().__init__(**kwargs)
def validate(self, value: Any, model_instance: Any) -> None:
"""Validate the state value against the registry."""
super().validate(value, model_instance)
if value in (None, ""):
return
choice = registry.get_choice(self.choice_group, value, self.domain)
if choice is None:
raise ValidationError(
f"'{value}' is not a valid state for {self.choice_group}"
)
if choice.deprecated and not self.allow_deprecated:
raise ValidationError(
f"'{value}' is deprecated and cannot be used for new entries"
)
def get_rich_choice(self, value: str) -> Optional[RichChoice]:
"""Return the RichChoice object for a given state value."""
return registry.get_choice(self.choice_group, value, self.domain)
def get_choice_display(self, value: str) -> str:
"""Return the label for the given state value."""
return registry.get_choice_display(self.choice_group, value, self.domain)
def contribute_to_class(
self, cls: Any, name: str, private_only: bool = False, **kwargs: Any
) -> None:
"""Attach helpers to the model for convenience."""
super().contribute_to_class(cls, name, private_only=private_only, **kwargs)
def get_rich_choice_method(instance):
state_value = getattr(instance, name)
return self.get_rich_choice(state_value) if state_value else None
setattr(cls, f"get_{name}_rich_choice", get_rich_choice_method)
def get_display_method(instance):
state_value = getattr(instance, name)
return self.get_choice_display(state_value) if state_value else ""
setattr(cls, f"get_{name}_display", get_display_method)
def deconstruct(self):
"""Support Django migrations with custom init kwargs."""
name, path, args, kwargs = super().deconstruct()
kwargs["choice_group"] = self.choice_group
kwargs["domain"] = self.domain
kwargs["allow_deprecated"] = self.allow_deprecated
return name, path, args, kwargs
__all__ = ["RichFSMField"]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,361 @@
"""Model integration utilities for applying state machines to Django models."""
from typing import Type, Optional, Dict, Any, List, Callable
from django.db import models
from django_fsm import can_proceed
from apps.core.state_machine.builder import (
StateTransitionBuilder,
determine_method_name_for_transition,
)
from apps.core.state_machine.registry import (
TransitionInfo,
registry_instance,
)
from apps.core.state_machine.validators import MetadataValidator
from apps.core.state_machine.decorators import TransitionMethodFactory
from apps.core.state_machine.guards import (
create_permission_guard,
extract_guards_from_metadata,
create_condition_from_metadata,
create_guard_from_drf_permission,
CompositeGuard,
)
def apply_state_machine(
model_class: Type[models.Model],
field_name: str,
choice_group: str,
domain: str = "core",
) -> None:
"""
Apply state machine to a Django model.
Args:
model_class: Django model class
field_name: Name of the state field
choice_group: Choice group name
domain: Domain namespace
Raises:
ValueError: If validation fails
"""
# Validate metadata
validator = MetadataValidator(choice_group, domain)
result = validator.validate_choice_group()
if not result.is_valid:
error_messages = [str(e) for e in result.errors]
raise ValueError(
f"Cannot apply state machine - validation failed:\n"
+ "\n".join(error_messages)
)
# Build transition registry
registry_instance.build_registry_from_choices(choice_group, domain)
# Generate and attach transition methods
generate_transition_methods_for_model(
model_class, field_name, choice_group, domain
)
def generate_transition_methods_for_model(
model_class: Type[models.Model],
field_name: str,
choice_group: str,
domain: str = "core",
) -> None:
"""
Dynamically create transition methods on a model.
Args:
model_class: Django model class
field_name: Name of the state field
choice_group: Choice group name
domain: Domain namespace
"""
builder = StateTransitionBuilder(choice_group, domain)
transition_graph = builder.build_transition_graph()
factory = TransitionMethodFactory()
for source, targets in transition_graph.items():
source_metadata = builder.get_choice_metadata(source)
for target in targets:
# Use shared method name determination
method_name = determine_method_name_for_transition(source, target)
# Get target metadata for combined guards
target_metadata = builder.get_choice_metadata(target)
# Extract guards from both source and target metadata
# This ensures metadata flags like requires_assignment, zero_tolerance,
# required_permissions, and escalation_level are enforced
guards = extract_guards_from_metadata(source_metadata)
target_guards = extract_guards_from_metadata(target_metadata)
# Combine all guards
all_guards = guards + target_guards
# Create combined guard if we have multiple guards
combined_guard: Optional[Callable] = None
if len(all_guards) == 1:
combined_guard = all_guards[0]
elif len(all_guards) > 1:
combined_guard = CompositeGuard(guards=all_guards, operator="AND")
# Create appropriate transition method
if "approve" in method_name or "accept" in method_name:
method = factory.create_approve_method(
source=source,
target=target,
field_name=field_name,
permission_guard=combined_guard,
)
elif "reject" in method_name or "deny" in method_name:
method = factory.create_reject_method(
source=source,
target=target,
field_name=field_name,
permission_guard=combined_guard,
)
elif "escalate" in method_name:
method = factory.create_escalate_method(
source=source,
target=target,
field_name=field_name,
permission_guard=combined_guard,
)
else:
method = factory.create_generic_transition_method(
method_name=method_name,
source=source,
target=target,
field_name=field_name,
permission_guard=combined_guard,
)
# Attach method to model class
setattr(model_class, method_name, method)
class StateMachineModelMixin:
"""Mixin providing state machine helper methods for models."""
def get_available_state_transitions(
self, field_name: str = "status"
) -> List[TransitionInfo]:
"""
Get available transitions from current state.
Args:
field_name: Name of the state field
Returns:
List of available TransitionInfo objects
"""
# Get choice group and domain from field
field = self._meta.get_field(field_name)
if not hasattr(field, "choice_group"):
return []
choice_group = field.choice_group
domain = field.domain
current_state = getattr(self, field_name)
return registry_instance.get_available_transitions(
choice_group, domain, current_state
)
def can_transition_to(
self,
target_state: str,
field_name: str = "status",
user: Optional[Any] = None,
) -> bool:
"""
Check if transition to target state is allowed.
Args:
target_state: Target state value
field_name: Name of the state field
user: User attempting transition
Returns:
True if transition is allowed
"""
current_state = getattr(self, field_name)
# Get field metadata
field = self._meta.get_field(field_name)
if not hasattr(field, "choice_group"):
return False
choice_group = field.choice_group
domain = field.domain
# Check if transition exists in registry
transition = registry_instance.get_transition(
choice_group, domain, current_state, target_state
)
if not transition:
return False
# Get transition method and check if it can proceed
method_name = transition.method_name
method = getattr(self, method_name, None)
if method is None:
return False
# Use django-fsm's can_proceed
return can_proceed(method)
def get_transition_method(
self, target_state: str, field_name: str = "status"
) -> Optional[Callable]:
"""
Get the transition method for moving to target state.
Args:
target_state: Target state value
field_name: Name of the state field
Returns:
Transition method or None
"""
current_state = getattr(self, field_name)
field = self._meta.get_field(field_name)
if not hasattr(field, "choice_group"):
return None
choice_group = field.choice_group
domain = field.domain
transition = registry_instance.get_transition(
choice_group, domain, current_state, target_state
)
if not transition:
return None
return getattr(self, transition.method_name, None)
def execute_transition(
self,
target_state: str,
field_name: str = "status",
user: Optional[Any] = None,
**kwargs: Any,
) -> bool:
"""
Execute a transition to target state.
Args:
target_state: Target state value
field_name: Name of the state field
user: User executing transition
**kwargs: Additional arguments for transition method
Returns:
True if transition succeeded
Raises:
ValueError: If transition is not allowed
"""
if not self.can_transition_to(target_state, field_name, user):
raise ValueError(
f"Cannot transition to {target_state} from current state"
)
method = self.get_transition_method(target_state, field_name)
if method is None:
raise ValueError(f"No transition method found for {target_state}")
# Execute transition
method(self, user=user, **kwargs)
return True
def state_machine_model(
field_name: str, choice_group: str, domain: str = "core"
):
"""
Class decorator to automatically apply state machine to models.
Args:
field_name: Name of the state field
choice_group: Choice group name
domain: Domain namespace
Returns:
Decorator function
"""
def decorator(model_class: Type[models.Model]) -> Type[models.Model]:
"""Apply state machine to model class."""
apply_state_machine(model_class, field_name, choice_group, domain)
return model_class
return decorator
def validate_model_state_machine(
model_class: Type[models.Model], field_name: str
) -> bool:
"""
Ensure model is properly configured with state machine.
Args:
model_class: Django model class
field_name: Name of the state field
Returns:
True if properly configured
Raises:
ValueError: If configuration is invalid
"""
# Check field exists
try:
field = model_class._meta.get_field(field_name)
except Exception:
raise ValueError(f"Field {field_name} not found on {model_class}")
# Check if field has choice_group attribute
if not hasattr(field, "choice_group"):
raise ValueError(
f"Field {field_name} is not a RichFSMField or RichChoiceField"
)
# Validate metadata
choice_group = field.choice_group
domain = field.domain
validator = MetadataValidator(choice_group, domain)
result = validator.validate_choice_group()
if not result.is_valid:
error_messages = [str(e) for e in result.errors]
raise ValueError(
f"State machine validation failed:\n" + "\n".join(error_messages)
)
return True
__all__ = [
"apply_state_machine",
"generate_transition_methods_for_model",
"StateMachineModelMixin",
"state_machine_model",
"validate_model_state_machine",
"create_guard_from_drf_permission",
]

View File

@@ -0,0 +1,64 @@
"""Base mixins for django-fsm state machines."""
from typing import Any, Iterable, Optional
from django.db import models
from django_fsm import can_proceed
class StateMachineMixin(models.Model):
"""Common helpers for models that use django-fsm."""
state_field_name: str = "state"
class Meta:
abstract = True
def get_state_value(self, field_name: Optional[str] = None) -> Any:
"""Return the raw state value for the given field (default is `state`)."""
name = field_name or self.state_field_name
return getattr(self, name, None)
def get_state_display_value(self, field_name: Optional[str] = None) -> str:
"""Return the display label for the current state, if available."""
name = field_name or self.state_field_name
getter = getattr(self, f"get_{name}_display", None)
if callable(getter):
return getter()
value = getattr(self, name, "")
return value if value is not None else ""
def get_state_choice(self, field_name: Optional[str] = None):
"""Return the RichChoice object when the field provides one."""
name = field_name or self.state_field_name
getter = getattr(self, f"get_{name}_rich_choice", None)
if callable(getter):
return getter()
return None
def can_transition(self, transition_method_name: str) -> bool:
"""Check if a transition method can proceed for the current instance."""
method = getattr(self, transition_method_name, None)
if method is None or not callable(method):
raise AttributeError(
f"Transition method '{transition_method_name}' not found"
)
return can_proceed(method)
def get_available_transitions(
self, field_name: Optional[str] = None
) -> Iterable[Any]:
"""Return available transitions when helpers are present."""
name = field_name or self.state_field_name
helper_name = f"get_available_{name}_transitions"
helper = getattr(self, helper_name, None)
if callable(helper):
return helper() # type: ignore[misc]
return []
def is_in_state(self, state: str, field_name: Optional[str] = None) -> bool:
"""Convenience check for comparing the current state."""
current_state = self.get_state_value(field_name)
return current_state == state
__all__ = ["StateMachineMixin"]

View File

@@ -0,0 +1,283 @@
"""TransitionRegistry - Centralized registry for managing FSM transitions."""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Tuple
from apps.core.state_machine.builder import StateTransitionBuilder
@dataclass
class TransitionInfo:
"""Information about a state transition."""
source: str
target: str
method_name: str
requires_moderator: bool = False
requires_admin_approval: bool = False
metadata: Dict[str, Any] = field(default_factory=dict)
def __hash__(self):
"""Make TransitionInfo hashable."""
return hash((self.source, self.target, self.method_name))
class TransitionRegistry:
"""Centralized registry for managing and looking up FSM transitions."""
_instance: Optional["TransitionRegistry"] = None
_transitions: Dict[Tuple[str, str], Dict[Tuple[str, str], TransitionInfo]]
def __new__(cls):
"""Implement singleton pattern."""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._transitions = {}
return cls._instance
def _get_key(self, choice_group: str, domain: str) -> Tuple[str, str]:
"""Generate registry key from choice group and domain."""
return (domain, choice_group)
def register_transition(
self,
choice_group: str,
domain: str,
source: str,
target: str,
method_name: str,
metadata: Optional[Dict[str, Any]] = None,
) -> TransitionInfo:
"""
Register a transition.
Args:
choice_group: Choice group name
domain: Domain namespace
source: Source state
target: Target state
method_name: Name of the transition method
metadata: Additional metadata
Returns:
Registered TransitionInfo
"""
key = self._get_key(choice_group, domain)
transition_key = (source, target)
if key not in self._transitions:
self._transitions[key] = {}
meta = metadata or {}
transition_info = TransitionInfo(
source=source,
target=target,
method_name=method_name,
requires_moderator=meta.get("requires_moderator", False),
requires_admin_approval=meta.get("requires_admin_approval", False),
metadata=meta,
)
self._transitions[key][transition_key] = transition_info
return transition_info
def get_transition(
self, choice_group: str, domain: str, source: str, target: str
) -> Optional[TransitionInfo]:
"""
Retrieve transition info.
Args:
choice_group: Choice group name
domain: Domain namespace
source: Source state
target: Target state
Returns:
TransitionInfo or None if not found
"""
key = self._get_key(choice_group, domain)
transition_key = (source, target)
if key not in self._transitions:
return None
return self._transitions[key].get(transition_key)
def get_available_transitions(
self, choice_group: str, domain: str, current_state: str
) -> List[TransitionInfo]:
"""
Get all valid transitions from a state.
Args:
choice_group: Choice group name
domain: Domain namespace
current_state: Current state value
Returns:
List of available TransitionInfo objects
"""
key = self._get_key(choice_group, domain)
if key not in self._transitions:
return []
available = []
for (source, target), info in self._transitions[key].items():
if source == current_state:
available.append(info)
return available
def get_transition_method_name(
self, choice_group: str, domain: str, source: str, target: str
) -> Optional[str]:
"""
Get the method name for a transition.
Args:
choice_group: Choice group name
domain: Domain namespace
source: Source state
target: Target state
Returns:
Method name or None if not found
"""
transition = self.get_transition(choice_group, domain, source, target)
return transition.method_name if transition else None
def validate_transition(
self, choice_group: str, domain: str, source: str, target: str
) -> bool:
"""
Check if a transition is valid.
Args:
choice_group: Choice group name
domain: Domain namespace
source: Source state
target: Target state
Returns:
True if transition is valid
"""
return (
self.get_transition(choice_group, domain, source, target) is not None
)
def build_registry_from_choices(
self, choice_group: str, domain: str = "core"
) -> None:
"""
Automatically populate registry from RichChoice metadata.
Args:
choice_group: Choice group name
domain: Domain namespace
"""
from apps.core.state_machine.builder import (
determine_method_name_for_transition,
)
builder = StateTransitionBuilder(choice_group, domain)
transition_graph = builder.build_transition_graph()
for source, targets in transition_graph.items():
source_metadata = builder.get_choice_metadata(source)
for target in targets:
# Use shared method name determination
method_name = determine_method_name_for_transition(
source, target
)
self.register_transition(
choice_group=choice_group,
domain=domain,
source=source,
target=target,
method_name=method_name,
metadata=source_metadata,
)
def clear_registry(
self,
choice_group: Optional[str] = None,
domain: Optional[str] = None,
) -> None:
"""
Clear registry entries for testing.
Args:
choice_group: Optional specific choice group to clear
domain: Optional specific domain to clear
"""
if choice_group and domain:
key = self._get_key(choice_group, domain)
if key in self._transitions:
del self._transitions[key]
else:
self._transitions.clear()
def export_transition_graph(
self, choice_group: str, domain: str, format: str = "dict"
) -> Any:
"""
Export state machine graph for visualization.
Args:
choice_group: Choice group name
domain: Domain namespace
format: Export format ('dict', 'mermaid', 'dot')
Returns:
Transition graph in requested format
"""
key = self._get_key(choice_group, domain)
if key not in self._transitions:
return {} if format == "dict" else ""
if format == "dict":
graph: Dict[str, List[str]] = {}
for (source, target), info in self._transitions[key].items():
if source not in graph:
graph[source] = []
graph[source].append(target)
return graph
elif format == "mermaid":
lines = ["stateDiagram-v2"]
for (source, target), info in self._transitions[key].items():
lines.append(f" {source} --> {target}: {info.method_name}")
return "\n".join(lines)
elif format == "dot":
lines = ["digraph {"]
for (source, target), info in self._transitions[key].items():
lines.append(
f' "{source}" -> "{target}" '
f'[label="{info.method_name}"];'
)
lines.append("}")
return "\n".join(lines)
else:
raise ValueError(f"Unsupported format: {format}")
def get_all_registered_groups(self) -> List[Tuple[str, str]]:
"""
Get all registered choice groups.
Returns:
List of (domain, choice_group) tuples
"""
return list(self._transitions.keys())
# Global registry instance
registry_instance = TransitionRegistry()
__all__ = ["TransitionInfo", "TransitionRegistry", "registry_instance"]

View File

@@ -0,0 +1 @@
"""Test package initialization."""

View File

@@ -0,0 +1,141 @@
"""Tests for StateTransitionBuilder."""
import pytest
from django.core.exceptions import ImproperlyConfigured
from apps.core.choices.base import RichChoice, ChoiceCategory
from apps.core.choices.registry import registry
from apps.core.state_machine.builder import StateTransitionBuilder
@pytest.fixture
def sample_choices():
"""Create sample choices for testing."""
choices = [
RichChoice(
value="pending",
label="Pending",
description="Awaiting review",
metadata={"can_transition_to": ["approved", "rejected"]},
category=ChoiceCategory.STATUS,
),
RichChoice(
value="approved",
label="Approved",
description="Approved by moderator",
metadata={"is_final": True, "can_transition_to": []},
category=ChoiceCategory.STATUS,
),
RichChoice(
value="rejected",
label="Rejected",
description="Rejected by moderator",
metadata={"is_final": True, "can_transition_to": []},
category=ChoiceCategory.STATUS,
),
]
registry.register("test_states", choices, domain="test")
yield choices
registry.clear_domain("test")
def test_builder_initialization_valid(sample_choices):
"""Test builder initializes with valid choice group."""
builder = StateTransitionBuilder("test_states", "test")
assert builder.choice_group == "test_states"
assert builder.domain == "test"
assert len(builder.choices) == 3
def test_builder_initialization_invalid():
"""Test builder raises error for invalid choice group."""
with pytest.raises(ImproperlyConfigured):
StateTransitionBuilder("nonexistent", "test")
def test_get_choice_metadata(sample_choices):
"""Test metadata extraction for states."""
builder = StateTransitionBuilder("test_states", "test")
metadata = builder.get_choice_metadata("pending")
assert "can_transition_to" in metadata
assert metadata["can_transition_to"] == ["approved", "rejected"]
def test_extract_valid_transitions(sample_choices):
"""Test extraction of valid transitions."""
builder = StateTransitionBuilder("test_states", "test")
transitions = builder.extract_valid_transitions("pending")
assert transitions == ["approved", "rejected"]
def test_extract_valid_transitions_invalid_target():
"""Test validation fails for invalid transition targets."""
invalid_choices = [
RichChoice(
value="pending",
label="Pending",
metadata={"can_transition_to": ["nonexistent"]},
),
]
registry.register("invalid_test", invalid_choices, domain="test")
builder = StateTransitionBuilder("invalid_test", "test")
with pytest.raises(ImproperlyConfigured):
builder.extract_valid_transitions("pending")
registry.clear_domain("test")
def test_is_terminal_state(sample_choices):
"""Test terminal state detection."""
builder = StateTransitionBuilder("test_states", "test")
assert not builder.is_terminal_state("pending")
assert builder.is_terminal_state("approved")
assert builder.is_terminal_state("rejected")
def test_build_transition_graph(sample_choices):
"""Test transition graph building."""
builder = StateTransitionBuilder("test_states", "test")
graph = builder.build_transition_graph()
assert graph["pending"] == ["approved", "rejected"]
assert graph["approved"] == []
assert graph["rejected"] == []
def test_caching_mechanism(sample_choices):
"""Test that caching works correctly."""
builder = StateTransitionBuilder("test_states", "test")
# First call builds cache
metadata1 = builder.get_choice_metadata("pending")
# Second call uses cache
metadata2 = builder.get_choice_metadata("pending")
assert metadata1 == metadata2
assert "metadata_pending" in builder._cache
def test_clear_cache(sample_choices):
"""Test cache clearing."""
builder = StateTransitionBuilder("test_states", "test")
builder.get_choice_metadata("pending")
assert len(builder._cache) > 0
builder.clear_cache()
assert len(builder._cache) == 0
def test_get_all_states(sample_choices):
"""Test getting all state values."""
builder = StateTransitionBuilder("test_states", "test")
states = builder.get_all_states()
assert set(states) == {"pending", "approved", "rejected"}
def test_get_choice(sample_choices):
"""Test getting RichChoice object."""
builder = StateTransitionBuilder("test_states", "test")
choice = builder.get_choice("pending")
assert choice is not None
assert choice.value == "pending"
assert choice.label == "Pending"

View File

@@ -0,0 +1,163 @@
"""Tests for transition decorator generation."""
import pytest
from unittest.mock import Mock
from apps.core.state_machine.decorators import (
generate_transition_decorator,
create_transition_method,
TransitionMethodFactory,
with_transition_logging,
)
def test_generate_transition_decorator():
"""Test basic transition decorator generation."""
decorator = generate_transition_decorator(
source="pending", target="approved", field_name="status"
)
assert callable(decorator)
def test_create_transition_method_basic():
"""Test basic transition method creation."""
method = create_transition_method(
method_name="approve",
source="pending",
target="approved",
field_name="status",
)
assert callable(method)
assert method.__name__ == "approve"
assert "pending" in method.__doc__
assert "approved" in method.__doc__
def test_create_transition_method_with_guard():
"""Test transition method with permission guard."""
def mock_guard(instance, user=None):
return user is not None
method = create_transition_method(
method_name="approve",
source="pending",
target="approved",
field_name="status",
permission_guard=mock_guard,
)
assert callable(method)
def test_create_transition_method_with_callbacks():
"""Test transition method with callbacks."""
success_called = []
error_called = []
def on_success(instance, user=None, **kwargs):
success_called.append(True)
def on_error(instance, exception):
error_called.append(True)
method = create_transition_method(
method_name="approve",
source="pending",
target="approved",
field_name="status",
on_success=on_success,
on_error=on_error,
)
assert callable(method)
def test_factory_create_approve_method():
"""Test approval method creation."""
factory = TransitionMethodFactory()
method = factory.create_approve_method(
source="pending", target="approved", field_name="status"
)
assert callable(method)
assert method.__name__ == "approve"
def test_factory_create_reject_method():
"""Test rejection method creation."""
factory = TransitionMethodFactory()
method = factory.create_reject_method(
source="pending", target="rejected", field_name="status"
)
assert callable(method)
assert method.__name__ == "reject"
def test_factory_create_escalate_method():
"""Test escalation method creation."""
factory = TransitionMethodFactory()
method = factory.create_escalate_method(
source="pending", target="escalated", field_name="status"
)
assert callable(method)
assert method.__name__ == "escalate"
def test_factory_create_generic_method():
"""Test generic transition method creation."""
factory = TransitionMethodFactory()
method = factory.create_generic_transition_method(
method_name="custom_transition",
source="pending",
target="processed",
field_name="status",
)
assert callable(method)
assert method.__name__ == "custom_transition"
def test_factory_generic_method_with_docstring():
"""Test generic method with custom docstring."""
factory = TransitionMethodFactory()
custom_doc = "This is a custom transition"
method = factory.create_generic_transition_method(
method_name="custom_transition",
source="pending",
target="processed",
field_name="status",
docstring=custom_doc,
)
assert method.__doc__ == custom_doc
def test_with_transition_logging():
"""Test logging decorator wrapper."""
def sample_transition(instance, user=None):
return "result"
wrapped = with_transition_logging(sample_transition)
assert callable(wrapped)
# Test execution (should work even if django-fsm-log not installed)
mock_instance = Mock()
result = wrapped(mock_instance, user=None)
# If django-fsm-log not available, it should still execute
assert result is not None or result is None
def test_method_signature_generation():
"""Test that generated methods have proper signatures."""
factory = TransitionMethodFactory()
method = factory.create_approve_method(
source="pending", target="approved"
)
# Check method accepts expected parameters
mock_instance = Mock()
mock_user = Mock()
# Should not raise
try:
method(mock_instance, user=mock_user, comment="test")
except Exception:
# May fail due to django-fsm not being fully configured
# but signature should be correct
pass

View File

@@ -0,0 +1,242 @@
"""Tests for guards and conditions."""
import pytest
from unittest.mock import Mock
from apps.core.state_machine.guards import (
PermissionGuard,
extract_guards_from_metadata,
create_permission_guard,
GuardRegistry,
guard_registry,
create_condition_from_metadata,
is_moderator_or_above,
is_admin_or_above,
has_permission,
)
def test_permission_guard_creation():
"""Test PermissionGuard creation."""
guard = PermissionGuard(requires_moderator=True)
assert guard.requires_moderator is True
assert guard.requires_admin is False
def test_permission_guard_no_user():
"""Test guard returns False with no user."""
guard = PermissionGuard(requires_moderator=True)
result = guard(None, user=None)
assert result is False
def test_permission_guard_moderator():
"""Test moderator permission check."""
guard = PermissionGuard(requires_moderator=True)
# Mock user with moderator permissions
user = Mock()
user.is_authenticated = True
user.is_staff = True
instance = Mock()
result = guard(instance, user=user)
assert result is True
def test_permission_guard_admin():
"""Test admin permission check."""
guard = PermissionGuard(requires_admin=True)
# Mock user with admin permissions
user = Mock()
user.is_authenticated = True
user.is_superuser = True
instance = Mock()
result = guard(instance, user=user)
assert result is True
def test_permission_guard_custom_check():
"""Test custom permission check."""
def custom_check(instance, user):
return user.username == "special"
guard = PermissionGuard(custom_check=custom_check)
user = Mock()
user.username = "special"
instance = Mock()
result = guard(instance, user=user)
assert result is True
def test_permission_guard_error_message():
"""Test error message generation."""
guard = PermissionGuard(requires_moderator=True)
message = guard.get_error_message()
assert "moderator" in message.lower()
def test_extract_guards_from_metadata():
"""Test extracting guards from metadata."""
metadata = {"requires_moderator": True}
guards = extract_guards_from_metadata(metadata)
assert len(guards) == 1
assert isinstance(guards[0], PermissionGuard)
def test_extract_guards_no_permissions():
"""Test extracting guards with no permissions."""
metadata = {}
guards = extract_guards_from_metadata(metadata)
assert len(guards) == 0
def test_create_permission_guard():
"""Test creating permission guard from metadata."""
metadata = {"requires_moderator": True, "requires_admin_approval": False}
guard = create_permission_guard(metadata)
assert isinstance(guard, PermissionGuard)
assert guard.requires_moderator is True
def test_guard_registry_singleton():
"""Test GuardRegistry is a singleton."""
reg1 = GuardRegistry()
reg2 = GuardRegistry()
assert reg1 is reg2
def test_guard_registry_register():
"""Test registering custom guard."""
def custom_guard(instance, user):
return True
guard_registry.register_guard("custom", custom_guard)
retrieved = guard_registry.get_guard("custom")
assert retrieved is custom_guard
guard_registry.clear_guards()
def test_guard_registry_get_nonexistent():
"""Test getting non-existent guard."""
result = guard_registry.get_guard("nonexistent")
assert result is None
def test_guard_registry_apply_guards():
"""Test applying multiple guards."""
def guard1(instance, user):
return True
def guard2(instance, user):
return True
guards = [guard1, guard2]
instance = Mock()
user = Mock()
allowed, error = guard_registry.apply_guards(instance, guards, user)
assert allowed is True
assert error is None
def test_guard_registry_apply_guards_failure():
"""Test guards fail when one returns False."""
def guard1(instance, user):
return True
def guard2(instance, user):
return False
guards = [guard1, guard2]
instance = Mock()
user = Mock()
allowed, error = guard_registry.apply_guards(instance, guards, user)
assert allowed is False
assert error is not None
def test_create_condition_from_metadata():
"""Test creating FSM condition from metadata."""
metadata = {"requires_moderator": True}
condition = create_condition_from_metadata(metadata)
assert callable(condition)
def test_create_condition_no_guards():
"""Test condition creation with no guards."""
metadata = {}
condition = create_condition_from_metadata(metadata)
assert condition is None
def test_is_moderator_or_above_no_user():
"""Test moderator check with no user."""
assert is_moderator_or_above(None) is False
def test_is_moderator_or_above_unauthenticated():
"""Test moderator check with unauthenticated user."""
user = Mock()
user.is_authenticated = False
assert is_moderator_or_above(user) is False
def test_is_moderator_or_above_staff():
"""Test moderator check with staff user."""
user = Mock()
user.is_authenticated = True
user.is_staff = True
assert is_moderator_or_above(user) is True
def test_is_moderator_or_above_superuser():
"""Test moderator check with superuser."""
user = Mock()
user.is_authenticated = True
user.is_superuser = True
assert is_moderator_or_above(user) is True
def test_is_admin_or_above_no_user():
"""Test admin check with no user."""
assert is_admin_or_above(None) is False
def test_is_admin_or_above_superuser():
"""Test admin check with superuser."""
user = Mock()
user.is_authenticated = True
user.is_superuser = True
assert is_admin_or_above(user) is True
def test_has_permission_no_user():
"""Test permission check with no user."""
assert has_permission(None, "some.permission") is False
def test_has_permission_superuser():
"""Test permission check with superuser."""
user = Mock()
user.is_authenticated = True
user.is_superuser = True
assert has_permission(user, "any.permission") is True
def test_has_permission_with_perm():
"""Test permission check with has_perm."""
user = Mock()
user.is_authenticated = True
user.is_superuser = False
user.has_perm = Mock(return_value=True)
assert has_permission(user, "specific.permission") is True

View File

@@ -0,0 +1,282 @@
"""Integration tests for state machine model integration."""
import pytest
from unittest.mock import Mock, patch
from django.core.exceptions import ImproperlyConfigured
from apps.core.choices.base import RichChoice
from apps.core.choices.registry import registry
from apps.core.state_machine.integration import (
apply_state_machine,
generate_transition_methods_for_model,
StateMachineModelMixin,
state_machine_model,
validate_model_state_machine,
)
from apps.core.state_machine.registry import registry_instance
@pytest.fixture
def sample_choices():
"""Create sample choices for testing."""
choices = [
RichChoice(
value="pending",
label="Pending",
metadata={"can_transition_to": ["approved", "rejected"]},
),
RichChoice(
value="approved",
label="Approved",
metadata={"is_final": True, "can_transition_to": []},
),
RichChoice(
value="rejected",
label="Rejected",
metadata={"is_final": True, "can_transition_to": []},
),
]
registry.register("test_states", choices, domain="test")
yield choices
registry.clear_domain("test")
registry_instance.clear_registry()
def test_apply_state_machine_valid(sample_choices):
"""Test applying state machine to model with valid metadata."""
# Mock model class
mock_model = type("MockModel", (), {})
# Should not raise
apply_state_machine(mock_model, "status", "test_states", "test")
def test_apply_state_machine_invalid():
"""Test applying state machine fails with invalid metadata."""
choices = [
RichChoice(
value="pending",
label="Pending",
metadata={"can_transition_to": ["nonexistent"]},
),
]
registry.register("invalid_states", choices, domain="test")
mock_model = type("MockModel", (), {})
with pytest.raises(ValueError) as exc_info:
apply_state_machine(mock_model, "status", "invalid_states", "test")
assert "validation failed" in str(exc_info.value).lower()
registry.clear_domain("test")
def test_generate_transition_methods(sample_choices):
"""Test generating transition methods on model."""
mock_model = type("MockModel", (), {})
generate_transition_methods_for_model(
mock_model, "status", "test_states", "test"
)
# Check that transition methods were added
# Method names may vary based on implementation
assert hasattr(mock_model, "approve") or hasattr(
mock_model, "transition_to_approved"
)
def test_state_machine_model_decorator(sample_choices):
"""Test state_machine_model decorator."""
@state_machine_model(
field_name="status", choice_group="test_states", domain="test"
)
class TestModel:
pass
# Decorator should apply state machine
# Check for transition methods
assert hasattr(TestModel, "approve") or hasattr(
TestModel, "transition_to_approved"
)
def test_state_machine_mixin_get_available_transitions():
"""Test StateMachineModelMixin.get_available_state_transitions."""
class TestModel(StateMachineModelMixin):
class _meta:
@staticmethod
def get_field(name):
field = Mock()
field.choice_group = "test_states"
field.domain = "test"
return field
status = "pending"
# Setup registry
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
instance = TestModel()
transitions = instance.get_available_state_transitions("status")
# Should return available transitions
assert isinstance(transitions, list)
def test_state_machine_mixin_can_transition_to():
"""Test StateMachineModelMixin.can_transition_to."""
class TestModel(StateMachineModelMixin):
class _meta:
@staticmethod
def get_field(name):
field = Mock()
field.choice_group = "test_states"
field.domain = "test"
return field
status = "pending"
def approve(self):
pass
instance = TestModel()
# Setup registry
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
# Mock can_proceed to return True
with patch(
"backend.apps.core.state_machine.integration.can_proceed",
return_value=True,
):
result = instance.can_transition_to("approved", "status")
assert result is True
def test_state_machine_mixin_get_transition_method():
"""Test StateMachineModelMixin.get_transition_method."""
class TestModel(StateMachineModelMixin):
class _meta:
@staticmethod
def get_field(name):
field = Mock()
field.choice_group = "test_states"
field.domain = "test"
return field
status = "pending"
def approve(self):
pass
instance = TestModel()
# Setup registry
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
method = instance.get_transition_method("approved", "status")
assert method is not None
assert callable(method)
def test_state_machine_mixin_execute_transition():
"""Test StateMachineModelMixin.execute_transition."""
class TestModel(StateMachineModelMixin):
class _meta:
@staticmethod
def get_field(name):
field = Mock()
field.choice_group = "test_states"
field.domain = "test"
return field
status = "pending"
def approve(self, user=None, **kwargs):
self.status = "approved"
instance = TestModel()
# Setup registry
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
# Mock can_proceed
with patch(
"backend.apps.core.state_machine.integration.can_proceed",
return_value=True,
):
result = instance.execute_transition("approved", "status")
assert result is True
def test_validate_model_state_machine_valid(sample_choices):
"""Test model validation with valid configuration."""
class TestModel:
class _meta:
@staticmethod
def get_field(name):
field = Mock()
field.choice_group = "test_states"
field.domain = "test"
return field
result = validate_model_state_machine(TestModel, "status")
assert result is True
def test_validate_model_state_machine_missing_field():
"""Test validation fails when field is missing."""
class TestModel:
class _meta:
@staticmethod
def get_field(name):
raise Exception("Field not found")
with pytest.raises(ValueError) as exc_info:
validate_model_state_machine(TestModel, "status")
assert "not found" in str(exc_info.value).lower()
def test_validate_model_state_machine_not_fsm_field():
"""Test validation fails when field is not FSM field."""
class TestModel:
class _meta:
@staticmethod
def get_field(name):
return Mock(spec=[]) # Field without choice_group
with pytest.raises(ValueError) as exc_info:
validate_model_state_machine(TestModel, "status")
assert "RichFSMField" in str(exc_info.value)

View File

@@ -0,0 +1,252 @@
"""Tests for TransitionRegistry."""
import pytest
from apps.core.choices.base import RichChoice
from apps.core.choices.registry import registry
from apps.core.state_machine.registry import (
TransitionRegistry,
TransitionInfo,
registry_instance,
)
@pytest.fixture
def sample_choices():
"""Create sample choices for testing."""
choices = [
RichChoice(
value="pending",
label="Pending",
metadata={
"can_transition_to": ["approved", "rejected"],
"requires_moderator": True,
},
),
RichChoice(
value="approved",
label="Approved",
metadata={"is_final": True, "can_transition_to": []},
),
RichChoice(
value="rejected",
label="Rejected",
metadata={"is_final": True, "can_transition_to": []},
),
]
registry.register("test_states", choices, domain="test")
yield choices
registry.clear_domain("test")
registry_instance.clear_registry()
def test_transition_info_creation():
"""Test TransitionInfo dataclass creation."""
info = TransitionInfo(
source="pending",
target="approved",
method_name="approve",
requires_moderator=True,
)
assert info.source == "pending"
assert info.target == "approved"
assert info.method_name == "approve"
assert info.requires_moderator is True
def test_transition_info_hashable():
"""Test TransitionInfo is hashable."""
info1 = TransitionInfo(
source="pending", target="approved", method_name="approve"
)
info2 = TransitionInfo(
source="pending", target="approved", method_name="approve"
)
assert hash(info1) == hash(info2)
def test_registry_singleton():
"""Test TransitionRegistry is a singleton."""
reg1 = TransitionRegistry()
reg2 = TransitionRegistry()
assert reg1 is reg2
def test_register_transition():
"""Test transition registration."""
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
metadata={"requires_moderator": True},
)
transition = registry_instance.get_transition(
"test_states", "test", "pending", "approved"
)
assert transition is not None
assert transition.method_name == "approve"
assert transition.requires_moderator is True
def test_get_transition_not_found():
"""Test getting non-existent transition."""
transition = registry_instance.get_transition(
"nonexistent", "test", "pending", "approved"
)
assert transition is None
def test_get_available_transitions(sample_choices):
"""Test getting available transitions from a state."""
registry_instance.build_registry_from_choices("test_states", "test")
available = registry_instance.get_available_transitions(
"test_states", "test", "pending"
)
assert len(available) == 2
targets = [t.target for t in available]
assert "approved" in targets
assert "rejected" in targets
def test_get_transition_method_name():
"""Test getting transition method name."""
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
method_name = registry_instance.get_transition_method_name(
"test_states", "test", "pending", "approved"
)
assert method_name == "approve"
def test_validate_transition():
"""Test transition validation."""
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
assert registry_instance.validate_transition(
"test_states", "test", "pending", "approved"
)
assert not registry_instance.validate_transition(
"test_states", "test", "pending", "nonexistent"
)
def test_build_registry_from_choices(sample_choices):
"""Test automatic registry building from RichChoice metadata."""
registry_instance.build_registry_from_choices("test_states", "test")
# Check transitions were registered
transition = registry_instance.get_transition(
"test_states", "test", "pending", "approved"
)
assert transition is not None
def test_clear_registry_specific():
"""Test clearing specific choice group."""
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
registry_instance.clear_registry(choice_group="test_states", domain="test")
transition = registry_instance.get_transition(
"test_states", "test", "pending", "approved"
)
assert transition is None
def test_clear_registry_all():
"""Test clearing entire registry."""
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
registry_instance.clear_registry()
transition = registry_instance.get_transition(
"test_states", "test", "pending", "approved"
)
assert transition is None
def test_export_transition_graph_dict(sample_choices):
"""Test exporting transition graph as dict."""
registry_instance.build_registry_from_choices("test_states", "test")
graph = registry_instance.export_transition_graph(
"test_states", "test", format="dict"
)
assert isinstance(graph, dict)
assert "pending" in graph
assert set(graph["pending"]) == {"approved", "rejected"}
def test_export_transition_graph_mermaid(sample_choices):
"""Test exporting transition graph as mermaid."""
registry_instance.build_registry_from_choices("test_states", "test")
graph = registry_instance.export_transition_graph(
"test_states", "test", format="mermaid"
)
assert isinstance(graph, str)
assert "stateDiagram-v2" in graph
assert "pending" in graph
def test_export_transition_graph_dot(sample_choices):
"""Test exporting transition graph as DOT."""
registry_instance.build_registry_from_choices("test_states", "test")
graph = registry_instance.export_transition_graph(
"test_states", "test", format="dot"
)
assert isinstance(graph, str)
assert "digraph" in graph
assert "pending" in graph
def test_export_invalid_format(sample_choices):
"""Test exporting with invalid format."""
registry_instance.build_registry_from_choices("test_states", "test")
with pytest.raises(ValueError):
registry_instance.export_transition_graph(
"test_states", "test", format="invalid"
)
def test_get_all_registered_groups():
"""Test getting all registered choice groups."""
registry_instance.register_transition(
choice_group="test_states",
domain="test",
source="pending",
target="approved",
method_name="approve",
)
groups = registry_instance.get_all_registered_groups()
assert ("test", "test_states") in groups

View File

@@ -0,0 +1,243 @@
"""Tests for metadata validators."""
import pytest
from apps.core.choices.base import RichChoice
from apps.core.choices.registry import registry
from apps.core.state_machine.validators import (
MetadataValidator,
ValidationResult,
ValidationError,
ValidationWarning,
validate_on_registration,
)
@pytest.fixture
def valid_choices():
"""Create valid choices for testing."""
choices = [
RichChoice(
value="pending",
label="Pending",
metadata={"can_transition_to": ["approved", "rejected"]},
),
RichChoice(
value="approved",
label="Approved",
metadata={"is_final": True, "can_transition_to": []},
),
RichChoice(
value="rejected",
label="Rejected",
metadata={"is_final": True, "can_transition_to": []},
),
]
registry.register("valid_states", choices, domain="test")
yield choices
registry.clear_domain("test")
@pytest.fixture
def invalid_transition_choices():
"""Create choices with invalid transition targets."""
choices = [
RichChoice(
value="pending",
label="Pending",
metadata={"can_transition_to": ["nonexistent"]},
),
]
registry.register("invalid_trans", choices, domain="test")
yield choices
registry.clear_domain("test")
@pytest.fixture
def terminal_with_transitions():
"""Create terminal state with outgoing transitions."""
choices = [
RichChoice(
value="final",
label="Final",
metadata={"is_final": True, "can_transition_to": ["pending"]},
),
RichChoice(value="pending", label="Pending", metadata={}),
]
registry.register("terminal_trans", choices, domain="test")
yield choices
registry.clear_domain("test")
def test_validation_error_creation():
"""Test ValidationError creation."""
error = ValidationError(
code="TEST_ERROR", message="Test message", state="pending"
)
assert error.code == "TEST_ERROR"
assert error.message == "Test message"
assert error.state == "pending"
assert "pending" in str(error)
def test_validation_warning_creation():
"""Test ValidationWarning creation."""
warning = ValidationWarning(
code="TEST_WARNING", message="Test warning", state="pending"
)
assert warning.code == "TEST_WARNING"
assert warning.message == "Test warning"
def test_validation_result_add_error():
"""Test adding errors to ValidationResult."""
result = ValidationResult(is_valid=True)
result.add_error("ERROR_CODE", "Error message", "pending")
assert not result.is_valid
assert len(result.errors) == 1
def test_validation_result_add_warning():
"""Test adding warnings to ValidationResult."""
result = ValidationResult(is_valid=True)
result.add_warning("WARNING_CODE", "Warning message")
assert result.is_valid # Warnings don't affect validity
assert len(result.warnings) == 1
def test_validator_initialization(valid_choices):
"""Test validator initialization."""
validator = MetadataValidator("valid_states", "test")
assert validator.choice_group == "valid_states"
assert validator.domain == "test"
def test_validate_choice_group_valid(valid_choices):
"""Test validation passes for valid choice group."""
validator = MetadataValidator("valid_states", "test")
result = validator.validate_choice_group()
assert result.is_valid
assert len(result.errors) == 0
def test_validate_transitions_valid(valid_choices):
"""Test transition validation passes for valid transitions."""
validator = MetadataValidator("valid_states", "test")
errors = validator.validate_transitions()
assert len(errors) == 0
def test_validate_transitions_invalid(invalid_transition_choices):
"""Test transition validation fails for invalid targets."""
validator = MetadataValidator("invalid_trans", "test")
errors = validator.validate_transitions()
assert len(errors) > 0
assert errors[0].code == "INVALID_TRANSITION_TARGET"
def test_validate_terminal_states_valid(valid_choices):
"""Test terminal state validation passes."""
validator = MetadataValidator("valid_states", "test")
errors = validator.validate_terminal_states()
assert len(errors) == 0
def test_validate_terminal_states_invalid(terminal_with_transitions):
"""Test terminal state validation fails when terminal has transitions."""
validator = MetadataValidator("terminal_trans", "test")
errors = validator.validate_terminal_states()
assert len(errors) > 0
assert errors[0].code == "TERMINAL_STATE_HAS_TRANSITIONS"
def test_validate_permission_consistency(valid_choices):
"""Test permission consistency validation."""
validator = MetadataValidator("valid_states", "test")
errors = validator.validate_permission_consistency()
assert len(errors) == 0
def test_validate_no_cycles(valid_choices):
"""Test cycle detection."""
validator = MetadataValidator("valid_states", "test")
errors = validator.validate_no_cycles()
assert len(errors) == 0
def test_validate_no_cycles_with_cycle():
"""Test cycle detection finds cycles."""
choices = [
RichChoice(
value="a", label="A", metadata={"can_transition_to": ["b"]}
),
RichChoice(
value="b", label="B", metadata={"can_transition_to": ["c"]}
),
RichChoice(
value="c", label="C", metadata={"can_transition_to": ["a"]}
),
]
registry.register("cycle_states", choices, domain="test")
validator = MetadataValidator("cycle_states", "test")
errors = validator.validate_no_cycles()
assert len(errors) > 0
assert errors[0].code == "STATE_CYCLE_DETECTED"
registry.clear_domain("test")
def test_validate_reachability(valid_choices):
"""Test reachability validation."""
validator = MetadataValidator("valid_states", "test")
errors = validator.validate_reachability()
# Should pass - approved and rejected are reachable from pending
assert len(errors) == 0
def test_validate_reachability_unreachable():
"""Test reachability detects unreachable states."""
choices = [
RichChoice(
value="pending",
label="Pending",
metadata={"can_transition_to": ["approved"]},
),
RichChoice(
value="approved", label="Approved", metadata={"is_final": True}
),
RichChoice(
value="orphan",
label="Orphan",
metadata={"can_transition_to": []},
),
]
registry.register("unreachable_states", choices, domain="test")
validator = MetadataValidator("unreachable_states", "test")
errors = validator.validate_reachability()
# Orphan state should be flagged as unreachable
assert len(errors) > 0
registry.clear_domain("test")
def test_generate_validation_report(valid_choices):
"""Test validation report generation."""
validator = MetadataValidator("valid_states", "test")
report = validator.generate_validation_report()
assert isinstance(report, str)
assert "valid_states" in report
assert "VALID" in report
def test_validate_on_registration_valid(valid_choices):
"""Test validate_on_registration succeeds for valid choices."""
result = validate_on_registration("valid_states", "test")
assert result is True
def test_validate_on_registration_invalid(invalid_transition_choices):
"""Test validate_on_registration raises error for invalid choices."""
with pytest.raises(ValueError) as exc_info:
validate_on_registration("invalid_trans", "test")
assert "Validation failed" in str(exc_info.value)

View File

@@ -0,0 +1,390 @@
"""Metadata validators for ensuring RichChoice metadata meets FSM requirements."""
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional, Any
from apps.core.state_machine.builder import StateTransitionBuilder
from apps.core.choices.registry import registry
@dataclass
class ValidationError:
"""A validation error with details."""
code: str
message: str
state: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
def __str__(self):
"""String representation of the error."""
if self.state:
return f"[{self.code}] {self.state}: {self.message}"
return f"[{self.code}] {self.message}"
@dataclass
class ValidationWarning:
"""A validation warning with details."""
code: str
message: str
state: Optional[str] = None
def __str__(self):
"""String representation of the warning."""
if self.state:
return f"[{self.code}] {self.state}: {self.message}"
return f"[{self.code}] {self.message}"
@dataclass
class ValidationResult:
"""Result of metadata validation."""
is_valid: bool
errors: List[ValidationError] = field(default_factory=list)
warnings: List[ValidationWarning] = field(default_factory=list)
def add_error(self, code: str, message: str, state: Optional[str] = None):
"""Add a validation error."""
self.errors.append(ValidationError(code, message, state))
self.is_valid = False
def add_warning(self, code: str, message: str, state: Optional[str] = None):
"""Add a validation warning."""
self.warnings.append(ValidationWarning(code, message, state))
class MetadataValidator:
"""Validator for RichChoice metadata in state machine context."""
def __init__(self, choice_group: str, domain: str = "core"):
"""
Initialize validator.
Args:
choice_group: Choice group name
domain: Domain namespace
"""
self.choice_group = choice_group
self.domain = domain
self.builder = StateTransitionBuilder(choice_group, domain)
def validate_choice_group(self) -> ValidationResult:
"""
Validate entire choice group.
Returns:
ValidationResult with all errors and warnings
"""
result = ValidationResult(is_valid=True)
# Run all validation checks
result.errors.extend(self.validate_transitions())
result.errors.extend(self.validate_terminal_states())
result.errors.extend(self.validate_permission_consistency())
result.errors.extend(self.validate_no_cycles())
result.errors.extend(self.validate_reachability())
# Set validity based on errors
result.is_valid = len(result.errors) == 0
return result
def validate_transitions(self) -> List[ValidationError]:
"""
Check all can_transition_to references exist.
Returns:
List of validation errors
"""
from django.core.exceptions import ImproperlyConfigured
errors = []
all_states = set(self.builder.get_all_states())
for state in all_states:
# Check if can_transition_to is explicitly defined
metadata = self.builder.get_choice_metadata(state)
if "can_transition_to" not in metadata:
errors.append(
ValidationError(
code="MISSING_CAN_TRANSITION_TO",
message=(
"State metadata must explicitly define "
"'can_transition_to' (use [] for terminal states)"
),
state=state,
)
)
continue
# Validate transition targets exist, catching configuration errors
try:
transitions = self.builder.extract_valid_transitions(state)
except ImproperlyConfigured as e:
# Convert ImproperlyConfigured to ValidationError
errors.append(
ValidationError(
code="INVALID_TRANSITION_TARGET",
message=str(e),
state=state,
)
)
continue
# Double-check each target exists
for target in transitions:
if target not in all_states:
errors.append(
ValidationError(
code="INVALID_TRANSITION_TARGET",
message=(
f"Transition target '{target}' does not exist"
),
state=state,
)
)
return errors
def validate_terminal_states(self) -> List[ValidationError]:
"""
Ensure terminal states have no outgoing transitions.
Returns:
List of validation errors
"""
errors = []
all_states = self.builder.get_all_states()
for state in all_states:
if self.builder.is_terminal_state(state):
transitions = self.builder.extract_valid_transitions(state)
if transitions:
errors.append(
ValidationError(
code="TERMINAL_STATE_HAS_TRANSITIONS",
message=(
f"Terminal state has {len(transitions)} "
f"outgoing transitions: {', '.join(transitions)}"
),
state=state,
)
)
return errors
def validate_permission_consistency(self) -> List[ValidationError]:
"""
Check permission requirements are consistent.
Returns:
List of validation errors
"""
errors = []
all_states = self.builder.get_all_states()
for state in all_states:
perms = self.builder.extract_permission_requirements(state)
# Check for contradictory permissions
if (
perms.get("requires_admin_approval")
and not perms.get("requires_moderator")
):
errors.append(
ValidationError(
code="PERMISSION_INCONSISTENCY",
message=(
"State requires admin approval but not moderator "
"(admin should imply moderator)"
),
state=state,
)
)
return errors
def validate_no_cycles(self) -> List[ValidationError]:
"""
Detect invalid state cycles (excluding self-loops).
Returns:
List of validation errors
"""
errors = []
graph = self.builder.build_transition_graph()
# Check for self-loops (state transitioning to itself)
for state, targets in graph.items():
if state in targets:
# Self-loops are warnings, not errors
# but we can flag them
pass
# Detect cycles using DFS
visited: Set[str] = set()
rec_stack: Set[str] = set()
def has_cycle(node: str, path: List[str]) -> Optional[List[str]]:
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
cycle = has_cycle(neighbor, path.copy())
if cycle:
return cycle
elif neighbor in rec_stack:
# Found a cycle
cycle_start = path.index(neighbor)
return path[cycle_start:] + [neighbor]
rec_stack.remove(node)
return None
for state in graph:
if state not in visited:
cycle = has_cycle(state, [])
if cycle:
errors.append(
ValidationError(
code="STATE_CYCLE_DETECTED",
message=(
f"Cycle detected: {' -> '.join(cycle)}"
),
state=cycle[0],
)
)
break # Report first cycle only
return errors
def validate_reachability(self) -> List[ValidationError]:
"""
Ensure all states are reachable from initial states.
Returns:
List of validation errors
"""
errors = []
graph = self.builder.build_transition_graph()
all_states = set(self.builder.get_all_states())
# Find states with no incoming transitions (potential initial states)
incoming: Dict[str, List[str]] = {state: [] for state in all_states}
for source, targets in graph.items():
for target in targets:
incoming[target].append(source)
initial_states = [
state for state in all_states if not incoming[state]
]
if not initial_states:
errors.append(
ValidationError(
code="NO_INITIAL_STATE",
message="No initial state found (no state without incoming)",
)
)
return errors
# BFS from initial states to find reachable states
reachable: Set[str] = set(initial_states)
queue = list(initial_states)
while queue:
current = queue.pop(0)
for target in graph.get(current, []):
if target not in reachable:
reachable.add(target)
queue.append(target)
# Check for unreachable states
unreachable = all_states - reachable
for state in unreachable:
# Terminal states might be unreachable if they're end states
if not self.builder.is_terminal_state(state):
errors.append(
ValidationError(
code="UNREACHABLE_STATE",
message="State is not reachable from initial states",
state=state,
)
)
return errors
def generate_validation_report(self) -> str:
"""
Create human-readable validation report.
Returns:
Formatted validation report
"""
result = self.validate_choice_group()
lines = []
lines.append(
f"Validation Report for {self.domain}.{self.choice_group}"
)
lines.append("=" * 60)
lines.append(f"Status: {'VALID' if result.is_valid else 'INVALID'}")
lines.append(f"Errors: {len(result.errors)}")
lines.append(f"Warnings: {len(result.warnings)}")
lines.append("")
if result.errors:
lines.append("ERRORS:")
lines.append("-" * 60)
for error in result.errors:
lines.append(f" {error}")
lines.append("")
if result.warnings:
lines.append("WARNINGS:")
lines.append("-" * 60)
for warning in result.warnings:
lines.append(f" {warning}")
lines.append("")
return "\n".join(lines)
def validate_on_registration(choice_group: str, domain: str = "core") -> bool:
"""
Validate choice group when registering.
Args:
choice_group: Choice group name
domain: Domain namespace
Returns:
True if validation passes
Raises:
ValueError: If validation fails
"""
validator = MetadataValidator(choice_group, domain)
result = validator.validate_choice_group()
if not result.is_valid:
error_messages = [str(e) for e in result.errors]
raise ValueError(
f"Validation failed for {domain}.{choice_group}:\n"
+ "\n".join(error_messages)
)
return True
__all__ = [
"MetadataValidator",
"ValidationResult",
"ValidationError",
"ValidationWarning",
"validate_on_registration",
]

View File

@@ -1,10 +1,14 @@
"""
Core views for the application.
"""
from typing import Any, Dict, Optional, Type
from django.shortcuts import redirect
from django.urls import reverse
from django.views.generic import DetailView
from django.views import View
from django.http import HttpRequest, HttpResponse
from django.db.models import Model
from django.http import HttpRequest, HttpResponse
from django.shortcuts import redirect, render
from django.urls import reverse
from django.views import View
from django.views.generic import DetailView, TemplateView
class SlugRedirectMixin(View):
@@ -37,10 +41,8 @@ class SlugRedirectMixin(View):
reverse(url_pattern, kwargs=reverse_kwargs), permanent=True
)
return super().dispatch(request, *args, **kwargs)
except (AttributeError, Exception) as e: # type: ignore
if self.model and hasattr(self.model, "DoesNotExist"):
if isinstance(e, self.model.DoesNotExist): # type: ignore
return super().dispatch(request, *args, **kwargs)
except Exception: # pylint: disable=broad-exception-caught
# Fallback to default dispatch on any error (e.g. object not found)
return super().dispatch(request, *args, **kwargs)
def get_redirect_url_pattern(self) -> str:
@@ -62,10 +64,6 @@ class SlugRedirectMixin(View):
return {self.slug_url_kwarg: getattr(self.object, "slug", "")}
from django.views.generic import TemplateView
from django.shortcuts import render
class GlobalSearchView(TemplateView):
"""Unified search view with HTMX support for debounced results and suggestions."""
@@ -75,17 +73,21 @@ class GlobalSearchView(TemplateView):
q = request.GET.get("q", "")
results = []
suggestions = []
# Lightweight placeholder search: real implementation should query multiple models
# Lightweight placeholder search.
# Real implementation should query multiple models.
if q:
# Return a small payload of mocked results to keep this scaffold safe
results = [{"title": f"Result for {q}", "url": "#", "subtitle": "Park"}]
results = [
{"title": f"Result for {q}", "url": "#", "subtitle": "Park"}
]
suggestions = [{"text": q, "url": "#"}]
context = {"results": results, "suggestions": suggestions}
# If HTMX request, render dropdown partial
if request.headers.get("HX-Request") == "true":
return render(request, "core/search/partials/search_dropdown.html", context)
return render(
request, "core/search/partials/search_dropdown.html", context
)
return render(request, self.template_name, context)