in message_flow/sagas/participant/plugin_reply_builder.py [0:0]
def _extract_routing_info(self) -> Tuple[Dict[str, Any], str]:
headers = self._command_message.message.headers
if (routing_info := headers.get("routing_info")) is None:
raise RuntimeError("Please provide routing info in the message.")
destination = routing_info.get(
CommandMessageHeaders.in_reply(CommandMessageHeaders.REPLY_TO)
)
return routing_info, destination