message_flow/sagas/orchestration_simple_dsl/participant_invocation.py (12 lines of code) (raw):

from typing import Protocol, TypeVar from ...commands.consumer import CommandWithDestination from ...messaging.common.interfaces import IMessage __all__ = ["ParticipantInvocation"] Data = TypeVar("Data", contravariant=True) class ParticipantInvocation(Protocol[Data]): def is_successful_reply(self, message: IMessage) -> bool: ... def is_invocable(self, data: Data) -> bool: ... def make_command_to_send(self, data: Data) -> CommandWithDestination: ...