modular_sdk/services/rabbit_transport_service.py (91 lines of code) (raw):
from abc import abstractmethod
from typing import TYPE_CHECKING, Any
from pika import exceptions
from modular_sdk.commons import ModularException, generate_id_hex
from modular_sdk.commons.constants import (
PLAIN_CONTENT_TYPE,
SUCCESS_STATUS,
ERROR_STATUS,
RESULTS,
DATA
) # todo remove these imports with major release. They can be used from outside
from modular_sdk.commons.log_helper import get_logger
if TYPE_CHECKING:
from modular_sdk.connections.rabbit_connection import RabbitMqConnection
_LOG = get_logger(__name__)
class AbstractTransport:
    """
    Base interface for transports.

    Both methods are no-op placeholders here; concrete transports are
    expected to override them.
    """

    def send_sync(self, *args, **kwargs) -> tuple[int, str, Any]:
        """
        Send a request and wait for its result.

        The base implementation does nothing and returns None.
        Implementations can raise ModularException.
        """
        return None

    def send_async(self, *args, **kwargs) -> None:
        """
        Send a request without waiting for a result.

        The base implementation does nothing.
        """
        return None
class RabbitConfig:
    """
    Holder for the RabbitMQ routing options used by a transport:
    the request queue, the response queue and the exchange name.
    """

    def __init__(self, request_queue: str, response_queue: str,
                 rabbit_exchange: str):
        # Values are stored as given; no validation or normalisation is done.
        self.request_queue: str = request_queue
        self.response_queue: str = response_queue
        self.rabbit_exchange: str = rabbit_exchange
class RabbitMQTransport(AbstractTransport):
    """
    Transport that publishes requests through a RabbitMQ connection.

    Default routing options (request queue, response queue, exchange) come
    from the ``RabbitConfig`` passed to the constructor. ``send_sync`` and
    ``send_async`` accept an optional ``config`` keyword argument (an object
    exposing ``request_queue``, ``response_queue`` and ``rabbit_exchange``)
    that overrides those defaults for a single call.

    Subclasses provide ``pre_process_request`` and ``post_process_request``.
    """

    def __init__(self, rabbit_connection: 'RabbitMqConnection',
                 config: RabbitConfig):
        """
        :param rabbit_connection: connection used to publish and consume
        :param config: default routing options for this transport
        """
        self.rabbit = rabbit_connection
        self.request_queue = config.request_queue
        self.response_queue = config.response_queue
        self.exchange = config.rabbit_exchange

    # NOTE: the base class in this module does not use ABCMeta, so the
    # ``@abstractmethod`` markers below document intent but are not enforced
    # when the class is instantiated.
    @abstractmethod
    def pre_process_request(self, *args, **kwargs) -> tuple[str | bytes, dict]:
        """
        Must return tuple that contains message and headers
        """

    @abstractmethod
    def post_process_request(self, *args, **kwargs) -> tuple[int, str, str]:
        """
        Must return a tuple that contains code, status and response.
        ``send_sync`` calls it with the consumed item as ``response=...``.
        """

    def __resolve_rabbit_options(self, exchange, request_queue,
                                 response_queue) -> tuple[str, str, str]:
        """
        Combine per-call options with the defaults stored on the instance.

        When an exchange is available (per-call or default) the message is
        routed through it with an empty routing key. Otherwise the request
        queue name is used as the routing key and the exchange is ''.

        :return: tuple (routing_key, exchange, response_queue)
        """
        exchange = exchange or self.exchange
        if exchange:
            routing_key = ''
        else:
            routing_key = request_queue or self.request_queue
            exchange = ''
        response_queue = response_queue or self.response_queue
        return routing_key, exchange, response_queue

    def __resolve_from_config(self, rabbit_config) -> tuple[str, str, str]:
        """
        Resolve routing options from an optional per-call config.

        Shared by ``send_sync`` and ``send_async`` so both read the same
        attributes of the config object.

        :param rabbit_config: per-call config or a falsy value for defaults
        :return: tuple (routing_key, exchange, response_queue)
        """
        if not rabbit_config:
            return self.__resolve_rabbit_options(
                exchange=None, request_queue=None, response_queue=None
            )
        return self.__resolve_rabbit_options(
            exchange=rabbit_config.rabbit_exchange,
            request_queue=rabbit_config.request_queue,
            response_queue=rabbit_config.response_queue,
        )

    def send_sync(self, *args, **kwargs) -> tuple[int, str, Any]:
        """
        Publish a request and wait for the correlated response.

        All arguments are forwarded to ``pre_process_request``; the optional
        ``config`` keyword additionally overrides routing for this call.

        :return: whatever ``post_process_request`` returns for the response
        :raises ModularException: with code 502 when the connection is in a
            wrong state while consuming, or when no response is received
        """
        message, headers = self.pre_process_request(*args, **kwargs)
        routing_key, exchange, response_queue = \
            self.__resolve_from_config(kwargs.get('config'))

        # The correlation id ties the published request to its response.
        correlation_id = generate_id_hex()
        self.rabbit.publish_sync(routing_key=routing_key,
                                 exchange=exchange,
                                 callback_queue=response_queue,
                                 correlation_id=correlation_id,
                                 message=message,
                                 headers=headers,
                                 content_type=PLAIN_CONTENT_TYPE)
        try:
            response_item = self.rabbit.consume_sync(
                queue=response_queue, correlation_id=correlation_id,
            )
        except exceptions.ConnectionWrongStateError as e:
            # Keep the original pika error as the cause for debugging.
            raise ModularException(code=502, content=str(e)) from e
        if not response_item:
            raise ModularException(
                code=502,
                content=f'Response was not received. '
                        f'Timeout: {self.rabbit.timeout} seconds.'
            )
        return self.post_process_request(response=response_item)

    def send_async(self, *args, **kwargs) -> None:
        """
        Publish a request without waiting for a response.

        All arguments are forwarded to ``pre_process_request``; the optional
        ``config`` keyword additionally overrides routing for this call.
        The result of the connection's ``publish`` call is passed through.
        """
        message, headers = self.pre_process_request(*args, **kwargs)
        # The response queue is resolved but not needed for a one-way publish.
        routing_key, exchange, _ = \
            self.__resolve_from_config(kwargs.get('config'))
        return self.rabbit.publish(
            routing_key=routing_key,
            exchange=exchange,
            message=message,
            headers=headers,
            content_type=PLAIN_CONTENT_TYPE)