message_flow/sagas/orchestration_simple_dsl/participant_invocation_step.py (51 lines of code) (raw):

from typing import Any, Callable, Dict, Generic, Optional, TypeVar from ...commands.common import ReplyMessageHeaders from ...messaging.common import IMessage from .participant_invocation import ParticipantInvocation from .reply_handler import ReplyHandler from .step_outcome import IStepOutcome, StepOutcome __all__ = ["ParticipantInvocationStep"] Data = TypeVar("Data") class ParticipantInvocationStep(Generic[Data]): def __init__( self, participant_invocation: Optional[ParticipantInvocation[Data]], compensation: Optional[ParticipantInvocation[Data]], action_reply_handlers: Dict[str, ReplyHandler], compensation_reply_handlers: Dict[str, ReplyHandler], ) -> None: self._action_reply_handlers = action_reply_handlers self._compensation_reply_handlers = compensation_reply_handlers self._participant_invocation = participant_invocation self._compensation = compensation def _get_participant_invocation( self, compensating: bool ) -> Optional[ParticipantInvocation[Data]]: return self._compensation if compensating else self._participant_invocation def is_successful_reply(self, compensating: bool, message: IMessage) -> bool: if (pi := self._get_participant_invocation(compensating)) is not None: return pi.is_successful_reply(message) raise RuntimeError("No invocation for the step.") def has_action(self, data: Data) -> bool: return ( self._participant_invocation is not None and self._participant_invocation.is_invocable(data) ) def has_compensation(self, data: Data) -> bool: return self._compensation is not None and self._compensation.is_invocable(data) def get_reply_handler( self, message: IMessage, compensating: bool ) -> Optional[ReplyHandler]: reply_type: str = message.get_required_header(ReplyMessageHeaders.REPLY_TYPE) return ( self._compensation_reply_handlers if compensating else self._action_reply_handlers ).get(reply_type) def make_step_outcome(self, data: Data, compensating: bool) -> IStepOutcome: pi = self._get_participant_invocation(compensating) if pi is not None: commands_to_send = [pi.make_command_to_send(data)] else: commands_to_send = [] return StepOutcome.make_remote_outcome(commands_to_send)