modular_sdk/services/impl/maestro_signature_builder.py (75 lines of code) (raw):

import base64 import hashlib import hmac import json import os import time from modular_sdk.commons.constants import PLAIN_CONTENT_TYPE class MaestroSignatureBuilder: __slots__ = '_access_key', '_secret_key', '_user' def __init__(self, access_key: str, secret_key: str, user: str): self._access_key = access_key self._secret_key = secret_key self._user = user def decrypt(self, data: bytes | str) -> bytes: """ Decode received message from Base64 format, cut initialization vector ("iv") from beginning of the message, decrypt message """ from cryptography.hazmat.primitives.ciphers import ( Cipher, algorithms, modes, ) decoded_data = base64.b64decode(data) iv = decoded_data[:12] encrypted_data = decoded_data[12:] cipher = Cipher( algorithms.AES(key=self._secret_key.encode('utf-8')), modes.GCM(initialization_vector=iv) ).decryptor() origin_data_with_iv = cipher.update(encrypted_data) # Due to Initialization vector in encrypting method # there is need to split useful and useless parts of the # server response. response = origin_data_with_iv[:-16] return response def encrypt(self, data: str | dict | list) -> bytes: """ Encrypt data, add initialization vector ("iv") at beginning of encrypted message and encode entire data in Base64 format """ from cryptography.hazmat.primitives.ciphers.aead import AESGCM iv = os.urandom(12) plain_text = data if isinstance(data, str) else json.dumps( data, separators=(',', ':') ) data_in_bytes = plain_text.encode('utf-8') try: cipher = AESGCM(key=self._secret_key.encode('utf-8')) except ValueError as e: raise ValueError(str(e).replace('AESGCM key', 'Secret Key')) encrypted_data = cipher.encrypt( nonce=iv, data=data_in_bytes, associated_data=None) encrypted_data_with_iv = bytes(iv) + encrypted_data return base64.b64encode(encrypted_data_with_iv) def get_signed_headers(self, async_request: bool = False, compressed: bool = False) -> dict: """ Create and sign necessary headers for interaction with Maestro API """ date = int(time.time() * 1000) signature = hmac.new( key=bytearray(f'{self._secret_key}{date}'.encode('utf-8')), msg=bytearray( f'M3-POST:{self._access_key}:{date}:{self._user}'.encode( 'utf-8') ), digestmod=hashlib.sha256 ).hexdigest() n = 2 resolved_signature = '' for each in [signature[i:i + n] for i in range(0, len(signature), n)]: resolved_signature += '1' + each return { "maestro-authentication": resolved_signature, "maestro-request-identifier": "api-server", "maestro-user-identifier": self._user, "maestro-date": str(date), "maestro-accesskey": str(self._access_key), "maestro-sdk-version": "3.2.80", "maestro-sdk-async": 'true' if async_request else 'false', "compressed": True if compressed else False, } def get_http_signed_headers(self, async_request: bool = False, compressed: bool = False) -> dict: base = self.get_signed_headers(async_request=async_request, compressed=compressed) base['compressed'] = 'true' if base['compressed'] else 'false' base['Content-Type'] = PLAIN_CONTENT_TYPE return base