message_flow/events/subscriber/domain_event_handler.py (28 lines of code) (raw):

from typing import Callable, Type from message_flow.events.common import DomainEvent, EventMessageHeaders from message_flow.events.subscriber.domain_event_envelope import DomainEventEnvelope from message_flow.messaging.common import IMessage class DomainEventHandler: def __init__( self, aggregate_type: str, event_class: Type[DomainEvent], handler: Callable[[DomainEventEnvelope[DomainEvent]], None], ) -> None: self._aggregate_type = aggregate_type self._event_class = event_class self._handler = handler @property def event_class(self) -> Type[DomainEvent]: return self._event_class @property def aggregate_type(self) -> str: return self._aggregate_type def handles(self, message: IMessage) -> bool: return self._aggregate_type == message.get_required_header( EventMessageHeaders.AGGREGATE_TYPE ) and self._event_class.__name__ == message.get_required_header( EventMessageHeaders.EVENT_TYPE ) def invoke(self, dee: DomainEventEnvelope[DomainEvent]) -> None: self._handler(dee)