message_flow/commands/consumer/command_handlers.py (20 lines of code) (raw):

from typing import List, Optional, Set from message_flow.commands.consumer.command_handler import CommandHandler from message_flow.messaging.common import IMessage class CommandHandlers: def __init__( self, handlers: List[CommandHandler], *, queue: Optional[str] = None ) -> None: self._handlers = handlers self._queue = queue if queue else "" @property def channels(self) -> Set[str]: return {handler.channel for handler in self._handlers} @property def handlers(self) -> List[CommandHandler]: return self._handlers @property def queue(self) -> str: return self._queue def find_target_method(self, message: IMessage) -> Optional[CommandHandler]: return next(filter(lambda h: h.handles(message), self._handlers), None) # type: ignore