message_flow/sagas/orchestration_simple_dsl/isaga_step.py (11 lines of code) (raw):

from typing import Protocol, TypeVar

from ...messaging.common.interfaces import IMessage

__all__ = ["ISagaStep"]

# Declared contravariant; in this protocol ``Data`` is only used as the type
# of a method parameter, never as a return type.
Data = TypeVar("Data", contravariant=True)


class ISagaStep(Protocol[Data]):
    """Structural interface for one step of a saga, generic over ``Data``.

    Any object that provides the three methods below with compatible
    signatures satisfies this protocol; no inheritance is required.
    The method bodies here are stubs and carry no behaviour of their own.
    """

    def is_successful_reply(self, compensating: bool, message: IMessage) -> bool:
        """Return whether ``message`` is considered a successful reply.

        Args:
            compensating: Boolean flag supplied by the caller.
                NOTE(review): presumably True while the saga is rolling
                back (running compensations) — confirm against callers.
            message: The reply message to inspect.
        """

    def has_action(self, data: Data) -> bool:
        """Return whether this step has an action for the given ``data``.

        NOTE(review): "action" is presumably the forward operation of the
        step — confirm against implementers.
        """

    def has_compensation(self, data: Data) -> bool:
        """Return whether this step has a compensation for the given ``data``.

        NOTE(review): "compensation" is presumably the undo operation of
        the step — confirm against implementers.
        """