message_flow/sagas/orchestration_simple_dsl/saga_step.py (14 lines of code) (raw):

from typing import Any, Callable, Optional, Protocol, TypeVar from ...messaging.common import IMessage from .isaga_step import ISagaStep from .reply_handler import ReplyHandler from .step_outcome import IStepOutcome __all__ = ["SagaStep"] Data = TypeVar("Data", contravariant=True) class SagaStep(ISagaStep[Data], Protocol[Data]): def get_reply_handler( self, message: IMessage, compensating: bool ) -> Optional[ReplyHandler]: ... def make_step_outcome(self, data: Data, compensating: bool) -> IStepOutcome: ...