in modular_sdk/services/rabbit_transport_service.py [0:0]
def send_sync(self, *args, **kwargs) -> tuple[int, str, Any]:
message, headers = self.pre_process_request(*args, **kwargs)
rabbit_config = kwargs.get('config')
request_queue, exchange, response_queue = \
self.__resolve_rabbit_options(
exchange=rabbit_config.rabbit_exchange if rabbit_config else None,
request_queue=rabbit_config.request_queue if rabbit_config else None,
response_queue=rabbit_config.response_queue if rabbit_config else None
)
correlation_id = generate_id_hex()
self.rabbit.publish_sync(routing_key=request_queue,
exchange=exchange,
callback_queue=response_queue,
correlation_id=correlation_id,
message=message,
headers=headers,
content_type=PLAIN_CONTENT_TYPE)
try:
response_item = self.rabbit.consume_sync(
queue=response_queue, correlation_id=correlation_id,
)
except exceptions.ConnectionWrongStateError as e:
raise ModularException(code=502, content=str(e))
if not response_item:
raise ModularException(
code=502,
content=f'Response was not received. '
f'Timeout: {self.rabbit.timeout} seconds.'
)
return self.post_process_request(response=response_item)