modular_sdk/connections/rabbit_connection.py (178 lines of code) (raw):

import pika import pika.exceptions from modular_sdk.commons import ModularException from modular_sdk.commons.log_helper import get_logger RABBIT_DEFAULT_RESPONSE_TIMEOUT = 30 _LOG = get_logger(__name__) class RabbitMqConnection: def __init__(self, connection_url: str, timeout: int | None = RABBIT_DEFAULT_RESPONSE_TIMEOUT): self.connection_url = connection_url if timeout is None: # fix of None timeout. There is some code that passes None here self.timeout = RABBIT_DEFAULT_RESPONSE_TIMEOUT else: self.timeout = timeout self.responses = {} def _open_channel(self): try: parameters = pika.URLParameters(self.connection_url) self.conn = pika.BlockingConnection(parameters) return self.conn.channel() except pika.exceptions.AMQPConnectionError as e: error_msg = str(e) or "Bad credentials" _LOG.error(f'Connection to RabbitMQ refused: {error_msg}') raise ModularException( code=502, content=f'Connection to RabbitMQ refused: {error_msg}' ) def _close(self) -> None: try: if self.conn.is_open: self.conn.close() except Exception as e: _LOG.error(f"Failed to close RabbitMQ connection: {e}") def publish( self, message: str, routing_key: str, exchange: str = '', headers: dict | None = None, content_type: str | None = None, ) -> None: _LOG.debug(f'Request queue: {routing_key}') channel = self._open_channel() try: channel.confirm_delivery() if not self.__basic_publish( channel=channel, exchange=exchange, routing_key=routing_key, properties=pika.BasicProperties( headers=headers, content_type=content_type, ), body=message, mandatory=True, ): _LOG.error( f'Message was not sent: routing_key={routing_key}, ' f'exchange={exchange}, content_type={content_type}' ) raise ModularException( code=504, content='Message was not sent. Check RabbitMQ configuration' ) _LOG.info('Message pushed') finally: self._close() @staticmethod def __basic_publish( channel: pika.adapters.blocking_connection.BlockingChannel, **kwargs, ) -> bool: try: channel.basic_publish(**kwargs) return True except (pika.exceptions.NackError, pika.exceptions.UnroutableError): _LOG.exception('Pika exception occurred') return False def publish_sync( self, message: str | bytes, routing_key: str, correlation_id: str, callback_queue: str, exchange: str = '', headers: dict = None, content_type: str = None, ) -> None: _LOG.debug( f'Request queue: {routing_key}; Response queue: {callback_queue}' ) channel = self._open_channel() try: channel.confirm_delivery() properties = pika.BasicProperties( headers=headers, reply_to=callback_queue, correlation_id=correlation_id, content_type=content_type, ) if not self.__basic_publish( channel=channel, exchange=exchange, routing_key=routing_key, properties=properties, body=message, ): error_msg = ( f"Message was not sent: routing_key={routing_key}, " f"correlation_id={correlation_id}, " f"callback_queue={callback_queue}, " f"exchange={exchange}, content_type={content_type}" ) _LOG.error(error_msg) raise ModularException( code=504, content='Message was not sent. Check RabbitMQ configuration' ) _LOG.info('Message pushed') finally: self._close() def consume_sync(self, queue: str, correlation_id: str) -> bytes | None: def _consumer_callback( ch: pika.adapters.blocking_connection.BlockingChannel, method: pika.spec.Basic.Deliver, props: pika.spec.BasicProperties, body: bytes, ) -> None: if props.correlation_id == correlation_id: _LOG.debug( f'Message retrieved successfully with ID: ' f'{props.correlation_id}' ) self.responses[props.correlation_id] = body ch.basic_ack(delivery_tag=method.delivery_tag) ch.stop_consuming() else: _LOG.warning( f'Received message with mismatched Correlation ID:' f'{props.correlation_id} (expected: {correlation_id})' ) ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) def _close_on_timeout(): _LOG.warning('Timeout exceeded. Close connection') self._close() channel = self._open_channel() if channel.basic_consume( queue=queue, on_message_callback=_consumer_callback, consumer_tag=correlation_id, ): _LOG.debug( f'Waiting for message. Queue: {queue}, ' f'Correlation id: {correlation_id}' ) else: _LOG.error(f"Failed to consume message from queue '{queue}'") return None self.conn.call_later(self.timeout, _close_on_timeout) # blocking method channel.start_consuming() self._close() response = self.responses.pop(correlation_id, None) if response: _LOG.debug('Response successfully received and processed') return response _LOG.error(f"Response wasn't received. Timeout: {self.timeout} seconds") return None def check_queue_exists(self, queue_name: str) -> bool: channel = self._open_channel() try: channel.queue_declare(queue=queue_name, durable=True, passive=True) except pika.exceptions.ChannelClosedByBroker as e: if e.reply_code == 404: return False self._close() return True def declare_queue(self, queue_name: str) -> None: channel = self._open_channel() declare_resp = channel.queue_declare(queue=queue_name, durable=True) _LOG.info('Queue declaration response: {0}'.format(declare_resp))