message_flow/events/subscriber/domain_event_handlers.py (20 lines of code) (raw):
from typing import List, Optional, Set
from message_flow.events.subscriber.domain_event_handler import DomainEventHandler
from message_flow.messaging.common import IMessage
class DomainEventHandlers:
    """Group of domain event handlers, optionally associated with a queue name."""

    def __init__(
        self, handlers: List[DomainEventHandler], *, queue: Optional[str] = None
    ) -> None:
        """Store the handlers and the optional queue name.

        Args:
            handlers: Handlers to hold. The list is kept by reference, not
                copied.
            queue: Optional queue name (keyword-only). Defaults to ``None``.
        """
        self._queue = queue
        self._handlers = handlers

    @property
    def handlers(self) -> List[DomainEventHandler]:
        """Return the handler list given at construction (same list object)."""
        return self._handlers

    @property
    def queue(self) -> Optional[str]:
        """Return the queue name given at construction, or ``None``."""
        return self._queue

    @property
    def aggregate_types(self) -> Set[str]:
        """Return the distinct ``aggregate_type`` values of the held handlers."""
        return {handler.aggregate_type for handler in self._handlers}

    def find_target_method(self, message: IMessage) -> Optional[DomainEventHandler]:
        """Return the first handler that reports it handles ``message``.

        Handlers are tried in list order; the first one whose
        ``handles(message)`` result is truthy is returned.

        Args:
            message: The message passed to each handler's ``handles`` method.

        Returns:
            The first matching handler, or ``None`` when no handler matches.
        """
        for candidate in self._handlers:
            if candidate.handles(message):
                return candidate
        return None