def _extract_routing_info()

in message_flow/sagas/participant/plugin_reply_builder.py [0:0]


    def _extract_routing_info(self) -> Tuple[Dict[str, Any], str]:
        headers = self._command_message.message.headers
        if (routing_info := headers.get("routing_info")) is None:
            raise RuntimeError("Please provide routing info in the message.")

        destination = routing_info.get(
            CommandMessageHeaders.in_reply(CommandMessageHeaders.REPLY_TO)
        )

        return routing_info, destination