message_flow/commands/consumer/command_handlers_builder.py (25 lines of code) (raw):

from typing import Callable, List, Optional, Type

from message_flow.commands.common import C
from message_flow.commands.consumer.command_handler import CommandHandler, Handler
from message_flow.commands.consumer.command_handlers import CommandHandlers


class CommandHandlersBuilder:
    """Fluent builder that collects ``CommandHandler`` entries.

    Every configuration method returns the builder itself so calls can be
    chained; ``build()`` wraps the collected handlers and the configured
    queue name in a ``CommandHandlers`` object.
    """

    def __init__(self, channel: str) -> None:
        """Start an empty builder bound to *channel*.

        Args:
            channel: Channel attached to handlers registered via
                ``on_message`` until ``and_from_channel`` changes it.
        """
        self._handlers: List[CommandHandler] = []
        # Empty string until ``for_queue`` is called.
        self._queue: str = ""
        self._channel = channel

    @classmethod
    def from_channel(cls, channel: str) -> "CommandHandlersBuilder":
        """Alternate constructor: create a builder bound to *channel*."""
        builder = cls(channel)
        return builder

    def and_from_channel(self, channel: str) -> "CommandHandlersBuilder":
        """Change the channel used for handlers registered from now on.

        Handlers appended earlier were created with the previous channel
        value and are not touched.
        """
        self._channel = channel
        return self

    def for_queue(self, queue: str) -> "CommandHandlersBuilder":
        """Set the queue name that ``build()`` forwards to ``CommandHandlers``."""
        self._queue = queue
        return self

    def on_message(
        self, command_class: Type[C], handler: Handler
    ) -> "CommandHandlersBuilder":
        """Register *handler* for *command_class* on the current channel.

        Args:
            command_class: Command type the handler is registered for.
            handler: Handler object paired with that command type.

        Returns:
            This builder, to allow further chaining.
        """
        registration = CommandHandler[C](self._channel, command_class, handler)
        self._handlers.append(registration)
        return self

    def build(self) -> CommandHandlers:
        """Create the ``CommandHandlers`` from everything registered so far.

        NOTE(review): the internal handler list is passed as-is, not copied;
        whether ``CommandHandlers`` keeps that reference is not visible here.
        """
        collected = self._handlers
        return CommandHandlers(collected, queue=self._queue)