message_flow/commands/consumer/command_dispatcher.py (85 lines of code) (raw):
import logging
from typing import List, Optional
from message_flow.commands.common.command_message_headers import CommandMessageHeaders
from message_flow.commands.consumer.command_handler import CommandHandler, CommandResult
from message_flow.commands.consumer.command_handler_params import CommandHandlerParams
from message_flow.commands.consumer.command_handlers import CommandHandlers
from message_flow.commands.consumer.command_message import CommandMessage
from message_flow.events.mappers.interfaces import IDeserializer, ISerializer
from message_flow.events.mappers.json_mapper import JsonMapper
from message_flow.messaging.common.interfaces import IMessage
from message_flow.messaging.consumer import IMessageConsumer
from message_flow.messaging.producer import IMessageProducer
from message_flow.messaging.producer.message_builder import MessageBuilder
_logger = logging.getLogger(__name__)
class CommandDispatcher:
def __init__(
self,
command_handlers: CommandHandlers,
message_consumer: IMessageConsumer,
message_producer: IMessageProducer,
*,
serializer: ISerializer = JsonMapper(),
deserializer: IDeserializer = JsonMapper()
) -> None:
self._command_handlers = command_handlers
self._message_consumer = message_consumer
self._message_producer = message_producer
self._serializer = serializer
self._deserializer = deserializer
@property
def handlers(self) -> List[CommandHandler]:
return self._command_handlers.handlers
def initialize(self) -> None:
self._message_consumer.subscribe(
self._command_handlers.channels,
self.message_handler,
queue=self._command_handlers.queue,
)
def message_handler(self, message: IMessage) -> None:
_logger.debug(
"Got command %s, with payload %s.",
message.get_required_header(CommandMessageHeaders.COMMAND_TYPE),
message.payload,
) # noqa: WPS323
command_handler: Optional[
CommandHandler
] = self._command_handlers.find_target_method(message)
if command_handler is None:
_logger.debug(
"Command %s doesn't have a handler.",
message.get_required_header(CommandMessageHeaders.COMMAND_TYPE),
) # noqa: WPS323
return
command_handler_params = CommandHandlerParams(message)
command_message = CommandMessage(
message.get_id(),
self._deserializer.deserialize(
command_handler.command_class, message.payload
),
command_handler_params.correlation_headers,
message,
)
replies: CommandResult = self._invoke(command_handler, command_message)
if replies is None:
return
self._send_replies(replies, command_handler_params)
_logger.info(
"Command %s, with payload %s processed.",
message.get_required_header(CommandMessageHeaders.COMMAND_TYPE),
message.payload,
) # noqa: WPS323
def _invoke(
self, command_handler: CommandHandler, command_message: CommandMessage
) -> CommandResult:
return command_handler.invoke_method(command_message)
def _send_replies(
self, replies: List[IMessage], command_handler_params: CommandHandlerParams
) -> None:
for reply in replies:
message = (
MessageBuilder.with_message(reply)
.with_extra_headers("", command_handler_params.correlation_headers)
.build()
)
self._message_producer.send(command_handler_params.default_reply_channel, message) # type: ignore