message_flow/sagas/orchestration_simple_dsl/local_step.py (38 lines of code) (raw):
from typing import Any, Callable, Generic, Optional, TypeVar
from ...commands.common import CommandReplyOutcome, ReplyMessageHeaders
from ...messaging.common import IMessage
from .reply_handler import ReplyHandler
from .saga_step import SagaStep
from .step_outcome import IStepOutcome, StepOutcome
__all__ = ["LocalStep"]
Data = TypeVar("Data")
class LocalStep(Generic[Data], SagaStep[Data]):
def __init__(
self,
local_function: Callable[[Data], None],
compensation: Optional[Callable[[Data], None]] = None,
) -> None:
self._local_function = local_function
self._compensation = compensation
def has_action(self, data: Data) -> bool:
return True
def has_compensation(self, data: Data) -> bool:
return self._compensation is not None
def is_successful_reply(self, compensating: bool, message: IMessage) -> bool:
return CommandReplyOutcome.SUCCESS.value == message.get_required_header(
ReplyMessageHeaders.REPLY_OUTCOME
)
def get_reply_handler(
self, message: IMessage, compensating: bool
) -> Optional[ReplyHandler]:
return None
def make_step_outcome(self, data: Data, compensating: bool) -> IStepOutcome:
try:
if compensating:
if self.has_compensation(data):
self._compensation(data) # type: ignore
else:
self._local_function(data)
return StepOutcome.make_local_outcome()
except RuntimeError as error:
return StepOutcome.make_local_outcome(error)