modular_sdk/services/impl/maestro_http_transport_service.py (141 lines of code) (raw):
import binascii
import json
import urllib.request
import threading
import urllib.parse
import urllib.error
from typing import Any
from modular_sdk.commons import (
ModularException, generate_id, build_secure_message, build_message,
)
from modular_sdk.commons.constants import SUCCESS_STATUS
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.services.impl.maestro_signature_builder import (
MaestroSignatureBuilder,
)
from modular_sdk.services.rabbit_transport_service import AbstractTransport
HTTP_DEFAULT_RESPONSE_TIMEOUT = 30
_LOG = get_logger(__name__)
class MaestroHTTPConfig:
def __init__(
self,
sdk_access_key: str,
sdk_secret_key: str,
maestro_user: str,
):
self.sdk_access_key = sdk_access_key
self.sdk_secret_key = sdk_secret_key
self.maestro_user = maestro_user
class MaestroHTTPTransport(AbstractTransport):
def __init__(
self,
config: MaestroHTTPConfig,
api_link: str,
timeout: int | None = HTTP_DEFAULT_RESPONSE_TIMEOUT,
):
self.access_key = config.sdk_access_key
self.secret_key = config.sdk_secret_key
self.user = config.maestro_user
self.api_link = api_link
self.timeout = timeout or HTTP_DEFAULT_RESPONSE_TIMEOUT
def pre_process_request(self, command_name: str, parameters: list[dict] | dict,
secure_parameters: list | None = None,
is_flat_request: bool = False,
async_request: bool = False,
compressed: bool = False, config=None
) -> tuple[bytes, dict]:
request_id = generate_id()
_LOG.debug('Going to pre-process HTTP request')
message = build_message(
command_name=command_name,
parameters=parameters,
request_id=request_id,
is_flat_request=is_flat_request,
compressed=compressed,
)
secure_message = message
# todo that is strange because why uncompressed data
# should lack parameters?
if not compressed:
secure_message = build_secure_message(
command_name=command_name,
parameters_to_secure=parameters,
secure_parameters=secure_parameters,
request_id=request_id,
is_flat_request=is_flat_request,
)
_LOG.debug(
f'Prepared command: {command_name}\nCommand format: {secure_message}'
)
signer = MaestroSignatureBuilder(
access_key=config.sdk_access_key if config and config.sdk_access_key else self.access_key,
secret_key=config.sdk_secret_key if config and config.sdk_secret_key else self.secret_key,
user=config.maestro_user if config and config.maestro_user else self.user,
)
encrypted_body = signer.encrypt(data=message)
_LOG.debug('Message encrypted')
# sign headers
headers = signer.get_http_signed_headers(
async_request=async_request, compressed=compressed,
)
_LOG.debug('Signed headers prepared')
return encrypted_body, headers
def post_process_request(self, response: bytes) -> tuple[int, str, Any]:
signer = MaestroSignatureBuilder(
access_key=self.access_key,
secret_key=self.secret_key,
user=self.user,
)
try:
response_item = signer.decrypt(data=response)
_LOG.debug('Message from M3-server successfully decrypted')
except binascii.Error:
response_item = response.decode('utf-8')
try:
_LOG.debug(f'Raw decrypted message from server: {response_item}')
response_json = json.loads(response_item).get('results')[0]
except json.decoder.JSONDecodeError:
_LOG.error('Response cannot be decoded - invalid JSON string')
raise ModularException(code=502, content="Response can't be decoded")
status = response_json.get('status')
code = response_json.get('statusCode')
if status == SUCCESS_STATUS:
data = response_json.get('data')
else:
data = response_json.get('readableError') or response_json.get('error')
return code, status, data
def send_sync(self, command_name: str, parameters: list[dict] | dict,
secure_parameters: list | None = None,
is_flat_request: bool = False, async_request: bool = False,
compressed: bool = False, config=None
) -> tuple[int, str, Any]:
_LOG.debug('Making sync http request ')
message, headers = self.pre_process_request(
command_name=command_name,
parameters=parameters,
secure_parameters=secure_parameters,
is_flat_request=is_flat_request,
async_request=async_request,
compressed=compressed,
config=config
)
req = urllib.request.Request(
url=self.api_link, headers=headers, data=message, method='POST',
)
try:
with urllib.request.urlopen(req, timeout=self.timeout) as response:
_LOG.debug(
f'Response status code: {response.getcode()}, reason: {response.reason}')
return self.post_process_request(response.read())
except urllib.error.HTTPError as e:
raise ModularException(code=e.getcode(), content=e.read().decode())
except urllib.error.URLError as e:
_LOG.exception('Cannot make a request')
raise ModularException(code=502,
content='Could not make the request')
def send_async(self, *args, **kwargs) -> None:
message, headers = self.pre_process_request(*args, **kwargs)
req = urllib.request.Request(
url=self.api_link, headers=headers, data=message, method='POST',
)
def _send(r, t):
with urllib.request.urlopen(r, timeout=t) as resp:
_LOG.info('Async request sent. No response will be processed')
threading.Thread(target=_send, args=(req, self.timeout)).start()