message_flow/sagas/orchestration/saga_actions.py (117 lines of code) (raw):
from typing import Any, Final, Generic, List, Optional, TypeVar
from message_flow.commands.consumer.command_with_destination import (
CommandWithDestination,
)
__all__ = ["SagaActions"]
Data = TypeVar("Data")
class SagaActions(Generic[Data]):
def __init__(
self,
commands: List[CommandWithDestination],
updated_saga_data: Optional[Data],
updated_state: Optional[str],
end_state: bool,
compensating: bool,
failed: bool,
local: bool,
local_exception: Optional[RuntimeError],
) -> None:
self._commands = commands
self._updated_saga_data = updated_saga_data
self._updated_state = updated_state
self._end_state = end_state
self._compensating = compensating
self._local = local
self._local_exception = local_exception
self._failed = failed
@property
def commands(self) -> List[CommandWithDestination]:
return self._commands
@property
def updated_saga_data(self) -> Optional[Data]:
return self._updated_saga_data
@property
def updated_state(self) -> str:
if self._updated_state is None:
raise RuntimeError("No updated state.")
return self._updated_state
@property
def is_end_state(self) -> bool:
return self._end_state
@property
def is_compensating(self) -> bool:
return self._compensating
@property
def is_local(self) -> bool:
return self._local
@property
def is_failed(self) -> bool:
return self._failed
@property
def local_exception(self) -> Optional[RuntimeError]:
return self._local_exception
class Builder:
def __init__(self) -> None:
self._commands: List[CommandWithDestination] = []
self._updated_saga_data: Optional[Any] = None
self._updated_state: Optional[str] = None
self._end_state: bool = False
self._compensating: bool = False
self._local: bool = False
self._failed: bool = False
self._local_exception: Optional[RuntimeError] = None
def build(self) -> "SagaActions[Any]":
return SagaActions(
self._commands,
self._updated_saga_data,
self._updated_state,
self._end_state,
self._compensating,
self._failed,
self._local,
self._local_exception,
)
def with_command(
self, command: CommandWithDestination
) -> "SagaActions.Builder":
self._commands.append(command)
return self
def with_updated_saga_data(self, data: Data) -> "SagaActions.Builder":
self._updated_saga_data = data
return self
def with_updated_state(self, state: str) -> "SagaActions.Builder":
self._updated_state = state
return self
def with_commands(
self, commands: List[CommandWithDestination]
) -> "SagaActions.Builder":
self._commands.extend(commands)
return self
def with_is_end_state(self, end_state: bool) -> "SagaActions.Builder":
self._end_state = end_state
return self
def with_is_failed(self, failed: bool) -> "SagaActions.Builder":
self._failed = failed
return self
def with_is_compensating(self, compensating: bool) -> "SagaActions.Builder":
self._compensating = compensating
return self
def with_is_local(
self, local_exception: Optional[RuntimeError]
) -> "SagaActions.Builder":
self._local = True
self._local_exception = local_exception
return self
def build_actions(
self, data: Data, compensating: bool, state: str, end_state: bool
) -> "SagaActions[Data]":
return (
self.with_updated_saga_data(data)
.with_updated_state(state)
.with_is_end_state(end_state)
.with_is_compensating(compensating)
.build()
)
@classmethod
def builder(cls) -> "Builder":
return cls.Builder()