message_flow/sagas/orchestration_simple_dsl/reply_handler.py (15 lines of code) (raw):

from typing import Callable, Generic, Type, TypeVar

__all__ = ["ReplyHandler"]

T = TypeVar("T")
Data = TypeVar("Data")


class ReplyHandler(Generic[Data, T]):
    """Pair a reply class with the callable that processes such replies.

    The instance is itself callable: invoking it forwards ``(data, reply)``
    to the wrapped callback. The reply class is only stored and exposed
    through :attr:`reply_class`; this class performs no type check of its own
    on the ``reply`` it is called with.
    """

    def __init__(
        self,
        reply_class: Type[T],
        reply_handler: Callable[[Data, T], None],
    ) -> None:
        """Remember the reply class and the callback associated with it.

        Args:
            reply_class: Class object exposed later via :attr:`reply_class`.
            reply_handler: Callable invoked as ``reply_handler(data, reply)``
                whenever this instance is called.
        """
        self._reply_class = reply_class
        self._reply_handler = reply_handler

    @property
    def reply_class(self) -> Type[T]:
        """The reply class supplied at construction time (read-only)."""
        return self._reply_class

    def __call__(self, data: Data, reply: T) -> None:
        """Invoke the wrapped callback with ``data`` and ``reply``.

        Whatever the callback returns is discarded; any exception it raises
        propagates to the caller unchanged.
        """
        callback = self._reply_handler
        callback(data, reply)