message_flow/sagas/orchestration_simple_dsl/step_to_execute.py (20 lines of code) (raw):

from typing import Generic, TypeVar from ..orchestration import SagaActions # type: ignore from .abstract_step_to_execute import AbstractStepToExecute from .saga_execution_state import SagaExecutionState from .saga_step import SagaStep __all__ = ["StepToExecute"] Data = TypeVar("Data") class StepToExecute(AbstractStepToExecute[Data, SagaStep[Data]], Generic[Data]): def __init__(self, step: SagaStep, skipped: int, compensating: bool) -> None: super().__init__(step, skipped, compensating) def execute_step( self, data: Data, current_state: SagaExecutionState ) -> SagaActions[Data]: new_state: SagaExecutionState = current_state.next_state(self._size()) builder = SagaActions.builder() compensating: bool = current_state.compensating self._step.make_step_outcome(data, self._compensating).visit( builder.with_is_local, builder.with_commands ) return self._make_saga_actions(builder, data, new_state, compensating)