message_flow/events/subscriber/domain_event_dispatcher.py (66 lines of code) (raw):
import logging
from typing import List, Optional
from message_flow.events.common import EventMessageHeaders
from message_flow.events.mappers import IDeserializer, JsonMapper
from message_flow.events.subscriber.domain_event_envelope import DomainEventEnvelope
from message_flow.events.subscriber.domain_event_handler import DomainEventHandler
from message_flow.events.subscriber.domain_event_handlers import DomainEventHandlers
from message_flow.messaging.common import IMessage
from message_flow.messaging.consumer import IMessageConsumer
_logger = logging.getLogger(__name__)
class DomainEventDispatcher:
def __init__(
self,
domain_event_handlers: DomainEventHandlers,
message_consumer: IMessageConsumer,
*,
deserializer: IDeserializer = JsonMapper()
) -> None:
self._domain_event_handlers = domain_event_handlers
self._message_consumer = message_consumer
self._deserializer = deserializer
@property
def handlers(self) -> List[DomainEventHandler]:
return self._domain_event_handlers.handlers
def initialize(self) -> None:
self._message_consumer.subscribe(
self._domain_event_handlers.aggregate_types,
self.message_handler,
queue=self._domain_event_handlers.queue,
)
def message_handler(self, message: IMessage) -> None:
aggregate_type: str = message.get_required_header(
EventMessageHeaders.AGGREGATE_TYPE
)
_logger.debug(
"Got event %s, with payload %s for aggregate %s.",
message.get_required_header(EventMessageHeaders.EVENT_TYPE),
message.payload,
aggregate_type,
) # noqa: WPS323
handler: Optional[
DomainEventHandler
] = self._domain_event_handlers.find_target_method(message)
if handler is None:
_logger.debug(
"Event %s, for aggregate %s doesn't have a handler.",
message.get_required_header(EventMessageHeaders.EVENT_TYPE),
aggregate_type,
) # noqa: WPS323
return
event = self._deserializer.deserialize(handler.event_class, message.payload)
handler.invoke(
DomainEventEnvelope(
message,
aggregate_type,
message.get_required_header(EventMessageHeaders.AGGREGATE_ID),
message.get_required_header(IMessage.ID),
event,
)
)
_logger.info(
"Event %s, with payload %s, for aggregate %s processed.",
message.get_required_header(EventMessageHeaders.EVENT_TYPE),
message.payload,
aggregate_type,
) # noqa: WPS323