in message_flow/events/subscriber/domain_event_dispatcher.py [0:0]
def message_handler(self, message: IMessage) -> None:
    """Route an incoming event message to its registered domain event handler.

    The aggregate type and event type headers are read from the message and
    logged. When ``self._domain_event_handlers`` yields no target for the
    message, that fact is logged and the message is dropped. Otherwise the
    payload is deserialized into the handler's ``event_class``, wrapped in a
    ``DomainEventEnvelope`` together with the aggregate type, aggregate id
    and message id, and handed to the handler.

    Args:
        message: The message to dispatch. Its headers are read through
            ``get_required_header``; presumably that call raises when a
            header is absent (confirm against ``IMessage``).
    """
    # Alias the bound accessor; every header read below goes through it.
    read_header = message.get_required_header

    aggregate_type: str = read_header(EventMessageHeaders.AGGREGATE_TYPE)
    _logger.debug(
        "Got event %s, with payload %s for aggregate %s.",
        read_header(EventMessageHeaders.EVENT_TYPE),
        message.payload,
        aggregate_type,
    )  # noqa: WPS323

    target: Optional[DomainEventHandler] = (
        self._domain_event_handlers.find_target_method(message)
    )
    if target is None:
        # Nothing is registered for this event: note it and stop.
        _logger.debug(
            "Event %s, for aggregate %s doesn't have a handler.",
            read_header(EventMessageHeaders.EVENT_TYPE),
            aggregate_type,
        )  # noqa: WPS323
        return

    event = self._deserializer.deserialize(target.event_class, message.payload)
    aggregate_id = read_header(EventMessageHeaders.AGGREGATE_ID)
    message_id = read_header(IMessage.ID)
    envelope = DomainEventEnvelope(
        message,
        aggregate_type,
        aggregate_id,
        message_id,
        event,
    )
    target.invoke(envelope)

    _logger.info(
        "Event %s, with payload %s, for aggregate %s processed.",
        read_header(EventMessageHeaders.EVENT_TYPE),
        message.payload,
        aggregate_type,
    )  # noqa: WPS323