message_flow/sagas/orchestration_simple_dsl/step_outcome.py (40 lines of code) (raw):
import abc
from typing import Callable, List, Optional, Protocol
from ...commands.consumer import CommandWithDestination
__all__ = ["IStepOutcome", "StepOutcome"]
class IStepOutcome(Protocol):
def visit(
self,
local_consumer: Callable[[Optional[RuntimeError]], None],
commands_consumer: Callable[[List[CommandWithDestination]], None],
) -> None:
...
class StepOutcome:
@classmethod
def make_local_outcome(
cls, local_outcome: Optional[RuntimeError] = None
) -> IStepOutcome:
return cls.LocalStepOutcome(local_outcome)
@classmethod
def make_remote_outcome(
cls, commands_to_send: List[CommandWithDestination]
) -> IStepOutcome:
return cls.RemoteStepOutcome(commands_to_send)
class LocalStepOutcome:
def __init__(self, local_outcome: Optional[RuntimeError] = None) -> None:
self._local_outcome = local_outcome
def visit(
self,
local_consumer: Callable[[Optional[RuntimeError]], None],
commands_consumer: Callable[[List[CommandWithDestination]], None],
) -> None:
local_consumer(self._local_outcome)
class RemoteStepOutcome:
def __init__(self, commands_to_send: List[CommandWithDestination]) -> None:
self._commands_to_send = commands_to_send
def visit(
self,
local_consumer: Callable[[Optional[RuntimeError]], None],
commands_consumer: Callable[[List[CommandWithDestination]], None],
) -> None:
commands_consumer(self._commands_to_send)