message_flow/sagas/orchestration_simple_dsl/simple_saga_definition.py (60 lines of code) (raw):
from typing import Generic, List, TypeVar
from ...messaging.common import IMessage
from ..orchestration import SagaActions # type: ignore
from .abstract_simple_saga_definition import AbstractSimpleSagaDefinition
from .saga_actions_provider import SagaActionsProvider
from .saga_execution_state import SagaExecutionState
from .saga_execution_state_json_serde import SagaExecutionStateJsonSerde
from .saga_step import SagaStep
from .step_to_execute import StepToExecute
__all__ = ["SimpleSagaDefinition"]
Data = TypeVar("Data")
class SimpleSagaDefinition(
AbstractSimpleSagaDefinition[
Data, SagaStep[Data], StepToExecute[Data], SagaActionsProvider[Data]
],
Generic[Data],
):
def __init__(self, steps: List[SagaStep[Data]]) -> None:
super().__init__(steps)
def start(self, saga_data: Data) -> SagaActions[Data]:
return self._to_saga_actions(self._first_step_to_execute(saga_data))
def handle_reply(
self,
saga_type: str,
saga_id: str,
current_state: str,
saga_data: Data,
message: IMessage,
) -> SagaActions[Data]:
state: SagaExecutionState = SagaExecutionStateJsonSerde.decode_state(
current_state
)
current_step: SagaStep[Data] = self._steps[state.currently_executing]
compensating: bool = state.compensating
reply_handler = current_step.get_reply_handler(message, compensating)
if reply_handler is not None:
self._invoke_reply_handler(message, saga_data, reply_handler)
sap: SagaActionsProvider[Data] = self._saga_actions_for_next_step(
saga_type, saga_id, saga_data, message, state, current_step, compensating
)
return self._to_saga_actions(sap)
def _to_saga_actions(self, sap: SagaActionsProvider[Data]) -> SagaActions[Data]:
return sap.to_saga_actions(lambda x: x, lambda x: x)
def _make_saga_actions_provider(
self, saga_actions: SagaActions[Data]
) -> SagaActionsProvider[Data]:
return SagaActionsProvider[Data].from_actions(saga_actions)
def _make_saga_actions_provider_from_step(
self,
stet_to_execute: StepToExecute[Data],
data: Data,
state: SagaExecutionState,
) -> SagaActionsProvider[Data]:
return SagaActionsProvider[Data].from_actions(
stet_to_execute.execute_step(data, state)
)
def _make_step_to_execute(
self, skipped: int, compensating: bool, step: SagaStep[Data]
) -> StepToExecute[Data]:
return StepToExecute[Data](step, skipped, compensating)