modular_sdk/services/impl/maestro_rabbit_transport_service.py (102 lines of code) (raw):
import binascii
import json
from typing import Any
from modular_sdk.commons import (
ModularException, generate_id, build_secure_message, build_message,
)
from modular_sdk.commons.constants import (
PLAIN_CONTENT_TYPE,
SUCCESS_STATUS,
ERROR_STATUS,
RESULTS,
DATA
) # todo remove these imports with major release. They can be used from outside
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.connections.rabbit_connection import RabbitMqConnection
from modular_sdk.services.impl.maestro_signature_builder import (
MaestroSignatureBuilder,
)
from modular_sdk.services.rabbit_transport_service import (
RabbitMQTransport, RabbitConfig,
)
_LOG = get_logger(__name__)
class MaestroRabbitConfig(RabbitConfig):
def __init__(self, request_queue: str, response_queue: str,
rabbit_exchange: str,
sdk_access_key: str, sdk_secret_key: str, maestro_user: str):
super(MaestroRabbitConfig, self).__init__(
request_queue=request_queue,
response_queue=response_queue,
rabbit_exchange=rabbit_exchange
)
self.sdk_access_key = sdk_access_key
self.sdk_secret_key = sdk_secret_key
self.maestro_user = maestro_user
class MaestroRabbitMQTransport(RabbitMQTransport):
def __init__(self, rabbit_connection: RabbitMqConnection,
config: MaestroRabbitConfig):
super().__init__(
rabbit_connection=rabbit_connection,
config=config
)
self.access_key = config.sdk_access_key
self.secret_key = config.sdk_secret_key
self.user = config.maestro_user
def pre_process_request(self, command_name, parameters, secure_parameters,
is_flat_request, async_request, compressed=False,
config=None) -> tuple[str | bytes, dict]:
request_id = generate_id()
_LOG.debug('Going to pre-process request')
message = build_message(
command_name=command_name,
parameters=parameters,
request_id=request_id,
is_flat_request=is_flat_request,
compressed=compressed
)
secure_message = message
if not compressed:
secure_message = build_secure_message(
command_name=command_name,
parameters_to_secure=parameters,
secure_parameters=secure_parameters,
request_id=request_id,
is_flat_request=is_flat_request
)
signer = MaestroSignatureBuilder(
access_key=config.sdk_access_key if config and config.sdk_access_key else self.access_key,
secret_key=config.sdk_secret_key if config and config.sdk_secret_key else self.secret_key,
user=config.maestro_user if config and config.maestro_user else self.user,
)
encrypted_body = signer.encrypt(data=message)
_LOG.debug('Message encrypted')
# sign headers
headers = signer.get_signed_headers(
async_request=async_request,
compressed=compressed
)
_LOG.debug('Signed headers prepared')
return encrypted_body, headers
def post_process_request(self, response: bytes) -> tuple[int, str, Any]:
# TODO post process does not accept config whereas pre process accepts
signer = MaestroSignatureBuilder(
access_key=self.access_key,
secret_key=self.secret_key,
user=self.user
)
try:
response_item = signer.decrypt(data=response)
_LOG.debug('Message from M3-server successfully decrypted')
except binascii.Error:
response_item = response.decode('utf-8')
try:
_LOG.debug('Received and decrypted message from server')
response_json = json.loads(response_item).get('results')[0]
except json.decoder.JSONDecodeError:
_LOG.error('Response can not be decoded - invalid Json string')
raise ModularException(code=502, content="Response can't be decoded")
status = response_json.get('status')
code = response_json.get('statusCode')
if status == SUCCESS_STATUS:
data = response_json.get('data')
else:
data = response_json.get('readableError') or response_json.get('error')
return code, status, data