message_flow/sagas/participant/plugin_reply_builder.py (51 lines of code) (raw):

from typing import Any, Dict, Tuple
from uuid import uuid4

from message_flow.commands.common import (
    CommandMessageHeaders,
    CommandReplyOutcome,
    Failure,
    ReplyMessageHeaders,
    Success,
)
from message_flow.commands.consumer.command_message import CommandMessage
from message_flow.events.mappers import JsonMapper
from message_flow.messaging.common import IMessage
from message_flow.messaging.producer import MessageBuilder

__all__ = ["PluginReplyBuilder"]


class PluginReplyBuilder:
    """Build reply messages for a received command.

    The routing info and the reply destination are read from the command
    message headers once, at construction time, and reused for every reply
    produced by this builder.
    """

    def __init__(self, command_message: CommandMessage) -> None:
        """Remember the command and extract its routing info.

        Args:
            command_message: The command being replied to.

        Raises:
            RuntimeError: If the message headers contain no ``routing_info``.
        """
        self._command_message = command_message
        self._routing_info, self._destination = self._extract_routing_info()

    @classmethod
    def for_command(cls, command_message: CommandMessage) -> "PluginReplyBuilder":
        """Alternate constructor; equivalent to ``cls(command_message)``."""
        return cls(command_message)

    # NOTE: the default ``Success()`` / ``Failure()`` instances below are
    # created once, when the methods are defined, and shared by every call
    # that omits ``reply``. This class only serializes them and reads their
    # class name; it never mutates them itself.

    def with_success(self, reply: Any = Success()) -> Tuple[str, IMessage]:
        """Build a reply flagged with the SUCCESS outcome.

        Args:
            reply: Object serialized as the reply payload.

        Returns:
            A ``(destination, message)`` pair.
        """
        return self._destination, self._build_reply(
            reply, CommandReplyOutcome.SUCCESS
        )

    def with_failure(self, reply: Any = Failure()) -> Tuple[str, IMessage]:
        """Build a reply flagged with the FAILURE outcome.

        Args:
            reply: Object serialized as the reply payload.

        Returns:
            A ``(destination, message)`` pair.
        """
        return self._destination, self._build_reply(
            reply, CommandReplyOutcome.FAILURE
        )

    def _build_reply(self, reply: Any, outcome: CommandReplyOutcome) -> IMessage:
        """Assemble the reply message shared by the success and failure paths.

        The payload is ``reply`` serialized with ``JsonMapper``. The headers
        carry the outcome value, the reply's class name, a freshly generated
        hex message id and the routing info taken from the command.
        """
        return (
            MessageBuilder.with_payload(JsonMapper().serialize(reply))
            .with_header(ReplyMessageHeaders.REPLY_OUTCOME, outcome.value)
            .with_header(ReplyMessageHeaders.REPLY_TYPE, reply.__class__.__name__)
            .with_header(IMessage.ID, uuid4().hex)
            .with_extra_headers("", self._routing_info)
            .build()
        )

    def _extract_routing_info(self) -> Tuple[Dict[str, Any], str]:
        """Read the routing info and reply destination from the command headers.

        Returns:
            A ``(routing_info, destination)`` pair.

        Raises:
            RuntimeError: If the headers have no ``routing_info`` entry.
        """
        headers = self._command_message.message.headers
        routing_info = headers.get("routing_info")
        if routing_info is None:
            raise RuntimeError("Please provide routing info in the message.")

        # NOTE(review): presumably ``routing_info`` is a dict; if so, a missing
        # reply-to entry yields ``None`` here rather than an error - confirm
        # that callers tolerate a ``None`` destination.
        destination = routing_info.get(
            CommandMessageHeaders.in_reply(CommandMessageHeaders.REPLY_TO)
        )
        return routing_info, destination