def consume_sync()

in modular_sdk/connections/rabbit_connection.py [0:0]


    def consume_sync(self, queue: str, correlation_id: str) -> bytes | None:
        """Block until the message tagged ``correlation_id`` arrives on ``queue``.

        Opens a channel, registers a consumer (using ``correlation_id`` as
        the consumer tag) and runs the blocking consume loop. A timer armed
        for ``self.timeout`` seconds closes the connection when no matching
        message shows up in time. The connection is closed before this
        method returns, and also when the consume loop raises.

        :param queue: name of the queue to consume from
        :param correlation_id: correlation id the expected message carries
        :return: body of the matching message (may be empty bytes), or
            ``None`` when the consumer could not be registered or no
            matching message was received
        """

        def _consumer_callback(
                ch: pika.adapters.blocking_connection.BlockingChannel,
                method: pika.spec.Basic.Deliver,
                props: pika.spec.BasicProperties,
                body: bytes,
        ) -> None:
            if props.correlation_id == correlation_id:
                _LOG.debug(
                    f'Message retrieved successfully with ID: '
                    f'{props.correlation_id}'
                )
                # Stash the payload for the outer function, confirm the
                # delivery and break out of the blocking consume loop.
                self.responses[props.correlation_id] = body
                ch.basic_ack(delivery_tag=method.delivery_tag)
                ch.stop_consuming()
            else:
                _LOG.warning(
                    f'Received message with mismatched Correlation ID:'
                    f'{props.correlation_id} (expected: {correlation_id})'
                )
                # Not ours: reject it and ask the broker to requeue it.
                # NOTE(review): presumably another consumer is waiting for
                # that id; if none is, the broker may keep redelivering the
                # same message to us until the timeout fires - confirm.
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        def _close_on_timeout():
            _LOG.warning('Timeout exceeded. Close connection')
            self._close()

        channel = self._open_channel()
        if not channel.basic_consume(
                queue=queue,
                on_message_callback=_consumer_callback,
                consumer_tag=correlation_id,
        ):
            _LOG.error(f"Failed to consume message from queue '{queue}'")
            # Release the connection before bailing out instead of leaking
            # it (assumes _open_channel() opened self.conn, as the code
            # below relies on).
            self._close()
            return None

        _LOG.debug(
            f'Waiting for message. Queue: {queue}, '
            f'Correlation id: {correlation_id}'
        )

        # Arm the timeout; the callback is dispatched from inside the
        # blocking consume loop below.
        self.conn.call_later(self.timeout, _close_on_timeout)

        try:
            # blocking method
            channel.start_consuming()
        finally:
            # Always release the connection, even if consuming raised.
            # _close() was already invoked twice on the timeout path (timer
            # callback + after the loop), so a repeated call is expected to
            # be harmless.
            self._close()

        response = self.responses.pop(correlation_id, None)
        # Compare against None explicitly: an empty body (b'') is still a
        # valid, received response and must not be reported as a timeout.
        if response is not None:
            _LOG.debug('Response successfully received and processed')
            return response
        _LOG.error(f"Response wasn't received. Timeout: {self.timeout} seconds")
        return None