def message_handler()

in message_flow/events/subscriber/domain_event_dispatcher.py [0:0]


    def message_handler(self, message: IMessage) -> None:
        """Dispatch an incoming event message to its registered handler.

        The aggregate type and event type are read from the message headers
        and logged. A handler is then looked up through
        ``self._domain_event_handlers.find_target_method``. When no handler
        is found, the message is logged at debug level and dropped.
        Otherwise the payload is deserialized into the handler's
        ``event_class`` and passed to ``handler.invoke`` wrapped in a
        ``DomainEventEnvelope``.

        Args:
            message: The message to dispatch. Its ``AGGREGATE_TYPE`` and
                ``EVENT_TYPE`` headers are always read; ``AGGREGATE_ID`` and
                ``IMessage.ID`` are read only when a handler exists.

        Returns:
            None. The event is delivered to the handler as a side effect.
        """
        aggregate_type: str = message.get_required_header(
            EventMessageHeaders.AGGREGATE_TYPE
        )
        # Read the event type once; every log statement below reuses it
        # instead of repeating the header lookup.
        event_type = message.get_required_header(EventMessageHeaders.EVENT_TYPE)

        _logger.debug(
            "Got event %s, with payload %s for aggregate %s.",
            event_type,
            message.payload,
            aggregate_type,
        )  # noqa: WPS323

        handler: Optional[
            DomainEventHandler
        ] = self._domain_event_handlers.find_target_method(message)

        if handler is None:
            # No handler registered for this message: nothing to do.
            _logger.debug(
                "Event %s, for aggregate %s doesn't have a handler.",
                event_type,
                aggregate_type,
            )  # noqa: WPS323
            return

        # Deserialize before reading the remaining headers so the order in
        # which failures can surface stays the same as before.
        event = self._deserializer.deserialize(handler.event_class, message.payload)

        handler.invoke(
            DomainEventEnvelope(
                message,
                aggregate_type,
                message.get_required_header(EventMessageHeaders.AGGREGATE_ID),
                message.get_required_header(IMessage.ID),
                event,
            )
        )
        _logger.info(
            "Event %s, with payload %s, for aggregate %s processed.",
            event_type,
            message.payload,
            aggregate_type,
        )  # noqa: WPS323