message_flow/sagas/orchestration_simple_dsl/participant_invocation_impl.py (21 lines of code) (raw):
from typing import Any, Callable, Generic, Optional, TypeVar
from ...commands.common import Command, CommandReplyOutcome, ReplyMessageHeaders
from ...commands.consumer import CommandWithDestination
from ...messaging.common import IMessage
from .abstract_participant_invocation import AbstractParticipantInvocation
# Names exported by ``from <this module> import *``.
__all__ = ["ParticipantInvocationImpl"]
# Type variable for the data object that is handed to the command builder
# of ParticipantInvocationImpl.
Data = TypeVar("Data")
class ParticipantInvocationImpl(AbstractParticipantInvocation[Data], Generic[Data]):
    """Participant invocation whose command comes from a supplied builder.

    The builder callable is stored at construction time and is invoked with
    the data object each time a command has to be produced.
    """

    def __init__(
        self,
        command_builder: Callable[[Data], CommandWithDestination],
        an_invocable_predicate: Optional[Callable[[Any], bool]] = None,
    ) -> None:
        """Store the command builder and initialise the base class.

        Args:
            command_builder: Callable that receives the data object and
                returns the ``CommandWithDestination`` to send.
            an_invocable_predicate: Optional callable forwarded unchanged to
                the base-class initialiser.
                NOTE(review): presumably decides whether this participant
                should be invoked -- confirm against
                ``AbstractParticipantInvocation``.
        """
        super().__init__(an_invocable_predicate)
        self._command_builder = command_builder

    def is_successful_reply(self, message: IMessage) -> bool:
        """Tell whether ``message`` carries a successful reply outcome.

        Args:
            message: Reply message whose ``REPLY_OUTCOME`` header is read via
                ``get_required_header``.

        Returns:
            ``True`` when the header value equals
            ``CommandReplyOutcome.SUCCESS.value``, otherwise ``False``.
        """
        success_marker = CommandReplyOutcome.SUCCESS.value
        reported_outcome = message.get_required_header(
            ReplyMessageHeaders.REPLY_OUTCOME
        )
        return success_marker == reported_outcome

    def make_command_to_send(self, data: Data) -> CommandWithDestination:
        """Build the command for ``data`` by calling the stored builder.

        Args:
            data: Object passed as the single argument to the builder.

        Returns:
            Whatever the builder returns for ``data``.
        """
        build_command = self._command_builder
        return build_command(data)