message_flow/sagas/orchestration_simple_dsl/with_compensation_builder.py (15 lines of code) (raw):

from typing import Any, Callable, Optional, Protocol, TypeVar from ...commands.common import Command from ...commands.consumer import CommandWithDestination from .invoke_participant_step_builder import InvokeParticipantStepBuilder __all__ = ["WithCompensationBuilder"] Data = TypeVar("Data") C = TypeVar("C", bound=Command) class WithCompensationBuilder(Protocol[Data]): def with_compensation( self, compensation: Callable[[Data], CommandWithDestination], *, compensation_predicate: Optional[Callable[[Any], bool]] = None, ) -> InvokeParticipantStepBuilder[Data]: ...