from typing import List, Dict, Any, Optional

from django.contrib.auth import get_user_model
from django.utils import timezone

from .models import ChangeSet

User = get_user_model()


class ApprovalStage:
    """A single stage of an approval workflow and the decisions recorded on it.

    Attributes are plain values so that the stage can be round-tripped through
    the list-of-dicts representation stored on ``ChangeSet.approval_state``.
    """

    def __init__(self, stage_id: int, name: str, required_roles: List[str]):
        self.id = stage_id
        self.name = name
        self.required_roles = required_roles
        # One dict per recorded decision; see add_approver() for the keys.
        self.approvers: List[Dict[str, Any]] = []
        self.status = 'pending'  # pending, approved, rejected
        # ISO-8601 timestamp string once the stage is approved/rejected.
        self.completed_at = None

    def add_approver(self, user: User, decision: str, comment: str = "") -> None:
        """Record ``user``'s decision on this stage.

        The entry stores the user's id and username, the decision string, the
        comment and the current time as an ISO-8601 string. No validation or
        de-duplication is done here; callers are expected to do that.
        """
        self.approvers.append({
            'user_id': user.id,
            'username': user.username,
            'decision': decision,
            'comment': comment,
            'timestamp': timezone.now().isoformat(),
        })

    def is_approved(self, policy: str = 'unanimous') -> bool:
        """Return whether the recorded decisions satisfy ``policy``.

        The number of ``'approve'`` decisions is compared against the number
        of required roles (it is a head count, not a per-role coverage check):

        * ``'unanimous'``: at least as many approvals as required roles.
        * any other value: strictly more approvals than half the required
          roles (majority).

        A stage with no recorded decisions is never approved.
        """
        if not self.approvers:
            return False

        approve_count = sum(
            1 for entry in self.approvers if entry['decision'] == 'approve'
        )
        required = len(self.required_roles)

        if policy == 'unanimous':
            # ">=" rather than "==": extra approvals (e.g. two members of the
            # same role) must not turn an approved stage back into unapproved.
            return approve_count >= required
        return approve_count > required / 2


class ApprovalStateMachine:
    """Manages the state transitions for change approval workflow.

    State is persisted on ``changeset.approval_state`` as a list of dicts (one
    per stage) and rebuilt from it on construction.

    NOTE(review): ``changeset.approval_policy`` is used both to choose between
    ``'sequential'`` and parallel processing and as the ``policy`` argument of
    ``ApprovalStage.is_approved`` (which distinguishes ``'unanimous'`` from
    majority). With a value of ``'sequential'`` the stage check therefore
    falls through to the majority rule -- confirm this is intended.
    """

    # Decisions accepted by submit_approval().
    VALID_DECISIONS = ('approve', 'reject')

    def __init__(self, changeset: ChangeSet):
        self.changeset = changeset
        self.stages: List[ApprovalStage] = []
        self.current_stage_index = 0
        self.policy = changeset.approval_policy
        self._load_state()

    def _load_state(self) -> None:
        """Load the current state from changeset approval_state.

        Does nothing when the changeset has no stored state. Otherwise rebuilds
        ``self.stages`` and points ``current_stage_index`` at the first stage
        whose status is ``'pending'`` (or the last stage if none is pending).
        """
        if not self.changeset.approval_state:
            return

        for stage_data in self.changeset.approval_state:
            stage = ApprovalStage(
                stage_data['id'],
                stage_data['name'],
                stage_data['required_roles'],
            )
            stage.approvers = stage_data.get('approvers', [])
            stage.status = stage_data.get('status', 'pending')
            stage.completed_at = stage_data.get('completed_at')
            self.stages.append(stage)

        # Find current stage
        self.current_stage_index = next(
            (i for i, s in enumerate(self.stages) if s.status == 'pending'),
            len(self.stages) - 1,
        )

    def initialize_workflow(self, stages_config: List[Dict[str, Any]]) -> None:
        """Set up initial approval workflow stages and persist them.

        Each config entry must provide ``'name'`` and ``'required_roles'``;
        stage ids are the entry's position in ``stages_config``.
        """
        self.stages = [
            ApprovalStage(i, stage['name'], stage['required_roles'])
            for i, stage in enumerate(stages_config)
        ]
        # A fresh workflow always starts at its first stage; without this a
        # previously loaded index could point past the new stage list.
        self.current_stage_index = 0
        self._save_state()

    def submit_approval(
        self,
        user: User,
        decision: str,
        comment: str = "",
        stage_id: Optional[int] = None,
    ) -> bool:
        """Submit an approval decision.

        Args:
            user: The user submitting approval
            decision: 'approve' or 'reject'
            comment: Optional comment
            stage_id: Optional specific stage ID (for parallel approval)

        Returns:
            bool: True if submission was accepted. False when the changeset is
            not in ``'pending_approval'``, the decision is not recognised, the
            target stage cannot be resolved or is already completed, the user
            has already decided on that stage, or the user holds none of the
            stage's required roles.
        """
        if self.changeset.status != 'pending_approval':
            return False
        if decision not in self.VALID_DECISIONS:
            return False

        if self.policy == 'sequential':
            # get_current_stage() returns None instead of raising when no
            # stages have been configured.
            stage = self.get_current_stage()
        else:  # parallel
            if stage_id is None:
                return False
            stage = self.get_stage_by_id(stage_id)
        if stage is None:
            return False

        # A completed stage must not collect further decisions (this would
        # also overwrite its completed_at timestamp).
        if stage.status != 'pending':
            return False

        # One decision per user per stage, matching can_user_approve();
        # otherwise a single user could satisfy the threshold alone.
        if any(entry['user_id'] == user.id for entry in stage.approvers):
            return False

        # Check if user has required role
        user_roles = set(user.groups.values_list('name', flat=True))
        if not any(role in user_roles for role in stage.required_roles):
            return False

        # Add decision
        stage.add_approver(user, decision, comment)

        # Update stage status
        if stage.is_approved(self.policy):
            stage.status = 'approved'
            stage.completed_at = timezone.now().isoformat()
            if self.policy == 'sequential':
                self._advance_stage()
            elif all(s.status == 'approved' for s in self.stages):
                # Parallel workflows have no "current" stage to advance; the
                # changeset is approved once every stage is.
                self.changeset.status = 'approved'
        elif decision == 'reject':
            stage.status = 'rejected'
            stage.completed_at = timezone.now().isoformat()
            self.changeset.status = 'rejected'

        # Persists both the stage data and any changeset status change.
        self._save_state()
        return True

    def _advance_stage(self) -> None:
        """Move to next stage if available; otherwise approve the changeset."""
        if self.current_stage_index < len(self.stages) - 1:
            self.current_stage_index += 1
        else:
            # All stages approved
            self.changeset.status = 'approved'
            self.changeset.save()

    def _save_state(self) -> None:
        """Serialise all stages onto the changeset and save it."""
        self.changeset.approval_state = [
            {
                'id': stage.id,
                'name': stage.name,
                'required_roles': stage.required_roles,
                'approvers': stage.approvers,
                'status': stage.status,
                'completed_at': stage.completed_at,
            }
            for stage in self.stages
        ]
        self.changeset.save()

    def get_current_stage(self) -> Optional[ApprovalStage]:
        """Get the current active stage, or None when no stages exist."""
        if not self.stages:
            return None
        return self.stages[self.current_stage_index]

    def get_stage_by_id(self, stage_id: int) -> Optional[ApprovalStage]:
        """Get a specific stage by ID, or None when no stage has that id."""
        return next((s for s in self.stages if s.id == stage_id), None)

    def get_pending_approvers(self) -> List[str]:
        """Get list of roles that still need to act on the current stage.

        A role is considered pending when no user in the group of that name
        has a recorded decision (of any kind) on the current stage.
        """
        current_stage = self.get_current_stage()
        if not current_stage:
            return []

        decided_ids = [entry['user_id'] for entry in current_stage.approvers]
        # One EXISTS query per role instead of loading every member of every
        # role into memory.
        return [
            role
            for role in current_stage.required_roles
            if not User.objects.filter(
                groups__name=role, id__in=decided_ids
            ).exists()
        ]

    def can_user_approve(self, user: User) -> bool:
        """Check if user can approve the current stage.

        False when there is no current stage, when the user already has a
        recorded decision on it, or when the user belongs to none of the
        groups named in the stage's required roles.
        """
        current_stage = self.get_current_stage()
        if not current_stage:
            return False

        # Check if user already approved
        if any(entry['user_id'] == user.id for entry in current_stage.approvers):
            return False

        # Check if user has required role
        user_roles = set(user.groups.values_list('name', flat=True))
        return any(role in user_roles for role in current_stage.required_roles)