message_flow/events/subscriber/domain_event_handlers_builder.py (35 lines of code) (raw):

from typing import Callable, List, Type from message_flow.events.common import DomainEvent from message_flow.events.subscriber.domain_event_envelope import DomainEventEnvelope, T from message_flow.events.subscriber.domain_event_handler import DomainEventHandler from message_flow.events.subscriber.domain_event_handlers import DomainEventHandlers class DomainEventHandlersBuilder: def __init__(self, aggregate_type: str) -> None: self._aggregate_type = aggregate_type self._queue: str = "" self._handlers: List[DomainEventHandler] = [] @classmethod def for_aggregate_type(cls, aggregate_type: str) -> "DomainEventHandlersBuilder": return cls(aggregate_type) def for_queue(self, queue: str) -> "DomainEventHandlersBuilder": self._queue = queue return self def for_tenant(self, tenant: str) -> "DomainEventHandlersBuilder": self._aggregate_type = ".".join((tenant, self._aggregate_type)) return self def on_event( self, event_class: Type[T], handler: Callable[[DomainEventEnvelope[DomainEvent]], None], ) -> "DomainEventHandlersBuilder": self._handlers.append( DomainEventHandler(self._aggregate_type, event_class, handler) ) return self def and_for_aggregate_type( self, aggregate_type: str ) -> "DomainEventHandlersBuilder": self._aggregate_type = aggregate_type return self def build(self) -> DomainEventHandlers: return DomainEventHandlers(self._handlers, queue=self._queue)