in modular_sdk/connections/rabbit_connection.py [0:0]
def consume_sync(self, queue: str, correlation_id: str) -> bytes | None:
    """Block until the message tagged with ``correlation_id`` is consumed.

    Opens a channel, registers a consumer on ``queue`` and waits for a
    message whose ``correlation_id`` property equals the requested one.
    A timer scheduled for ``self.timeout`` seconds calls ``self._close()``
    so that the wait does not last forever. ``self._close()`` is always
    called before this method returns or raises once the channel has been
    opened.

    :param queue: name of the queue to consume from
    :param correlation_id: correlation ID of the expected message; it is
        also used as the consumer tag
    :return: body of the matching message (possibly empty bytes), or
        ``None`` when the consumer could not be registered or no response
        was stored for ``correlation_id``
    """

    def _consumer_callback(
        ch: pika.adapters.blocking_connection.BlockingChannel,
        method: pika.spec.Basic.Deliver,
        props: pika.spec.BasicProperties,
        body: bytes,
    ) -> None:
        """Store and ack the expected message; nack (requeue) any other."""
        if props.correlation_id == correlation_id:
            _LOG.debug(
                f'Message retrieved successfully with ID: '
                f'{props.correlation_id}'
            )
            self.responses[props.correlation_id] = body
            ch.basic_ack(delivery_tag=method.delivery_tag)
            # Ends the blocking start_consuming() call below.
            ch.stop_consuming()
        else:
            _LOG.warning(
                f'Received message with mismatched Correlation ID: '
                f'{props.correlation_id} (expected: {correlation_id})'
            )
            # NOTE(review): requeue=True puts the message back on the
            # queue; if this consumer is the only one, the broker may
            # redeliver it here repeatedly until the timeout fires --
            # confirm this is acceptable for shared queues.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def _close_on_timeout() -> None:
        """Timer callback: give up waiting by closing the connection."""
        _LOG.warning('Timeout exceeded. Close connection')
        self._close()

    channel = self._open_channel()
    try:
        consumer_registered = channel.basic_consume(
            queue=queue,
            on_message_callback=_consumer_callback,
            consumer_tag=correlation_id,
        )
        if not consumer_registered:
            _LOG.error(f"Failed to consume message from queue '{queue}'")
            return None
        _LOG.debug(
            f'Waiting for message. Queue: {queue}, '
            f'Correlation id: {correlation_id}'
        )

        self.conn.call_later(self.timeout, _close_on_timeout)
        # Blocking call: returns once the callback stops consuming or the
        # timeout callback has closed the connection.
        channel.start_consuming()
    finally:
        # Release the connection on every exit path (success, failed
        # registration, or an exception raised while consuming) so that it
        # is never leaked.
        self._close()

    response = self.responses.pop(correlation_id, None)
    # Compare against None explicitly: an empty body (b'') is still a
    # received response and must not be reported as a timeout.
    if response is not None:
        _LOG.debug('Response successfully received and processed')
        return response
    _LOG.error(f"Response wasn't received. Timeout: {self.timeout} seconds")
    return None