message_flow/sagas/orchestration/saga_instance_factory.py (27 lines of code) (raw):
from typing import Collection, Dict, TypeVar
from .saga import Saga
from .saga_instance import SagaInstance
from .saga_manager import SagaManager
from .saga_manager_factory import SagaManagerFactory
__all__ = ["SagaInstanceFactory"]
SagaData = TypeVar("SagaData")
class SagaInstanceFactory:
def __init__(
self, saga_manager_factory: SagaManagerFactory, sagas: Collection[Saga]
) -> None:
self._saga_managers: Dict[Saga, SagaManager] = {}
for saga in sagas:
self._saga_managers[saga] = self._make_saga_manager(
saga_manager_factory, saga
)
def create(self, saga: Saga[SagaData], data: SagaData) -> SagaInstance:
saga_manager = self._saga_managers.get(saga)
if saga_manager is None:
raise RuntimeError(f"No saga manager for {saga}")
return saga_manager.create(data)
def _make_saga_manager(
self, saga_manager_factory: SagaManagerFactory, saga: Saga[SagaData]
) -> SagaManager[SagaData]:
saga_manager = saga_manager_factory.make(saga)
saga_manager.subscribe_to_reply_channel()
return saga_manager