docker/services/key_management_service.py (142 lines of code) (raw):

from services.clients.abstract_key_management import \ AbstractKeyManagementClient, IKey, KEY_TYPE_ATTR, KEY_STD_ATTR, \ HASH_TYPE_ATTR, HASH_STD_ATTR, SIG_SCHEME_ATTR from commons.constants import KEY_ID_ATTR, ALGORITHM_ATTR, VALUE_ATTR, \ B64ENCODED_ATTR from commons.log_helper import get_logger from base64 import standard_b64encode from typing import Optional PUBLIC_KEY_ATTR = 'puk' PRIVATE_KEY_ATTR = 'prk' ALG_ATTR = 'alg' KID_ATTR = 'kid' FORMAT_ATTR = 'format' _LOG = get_logger(__name__) class ManagedKey: def __init__(self, kid: str, alg: str, key: IKey): self.kid = kid self.alg = alg self.key = key def export_key(self, frmt: str, base64encode: bool = False): try: value = self.key.export_key(format=frmt) except (TypeError, Exception) as e: _LOG.warning(f'Key:\'{self.kid}\' could not be exported into ' f'{frmt} format, due to: "{e}".') value = None base = { KEY_ID_ATTR: self.kid, ALGORITHM_ATTR: self.alg } pending = {} if value: pending[FORMAT_ATTR] = frmt pending[VALUE_ATTR] = value if pending[VALUE_ATTR] and base64encode: pending[B64ENCODED_ATTR] = True pending[VALUE_ATTR] = standard_b64encode( value if isinstance(value, bytes) else value.encode('utf-8') ) elif pending[VALUE_ATTR] and not base64encode: pending[B64ENCODED_ATTR] = False if pending[VALUE_ATTR] and isinstance(value, bytes): try: pending[VALUE_ATTR] = value.decode() except (TypeError, Exception) as e: _LOG.warning(f'Key:\'{self.kid}\' could not be decoded into' f' a string, due to: "{e}".') pending = {} base.update(pending) return base class KeyPair: def __init__(self, prk: IKey, typ: str, std: str): self.prk: IKey = prk self.puk: IKey = prk.public_key() self.typ = typ self.std = std class KeyManagementService: def __init__( self, key_management_client: AbstractKeyManagementClient ): self._key_management_client = key_management_client def get_key(self, kid: str, alg: str) -> Optional[ManagedKey]: _alg = alg alg = self._key_management_client.dissect_alg(alg=alg) if not alg: return # Retrieve type and standard data of a key, hash and signature scheme. key_type, key_std = map( alg.get, (KEY_TYPE_ATTR, KEY_STD_ATTR) ) data = self._key_management_client.get_key_data( key_id=kid ) if not data: return key = self._key_management_client.get_key( key_type=key_type, key_std=key_std, key_data=data ) if key: return self.instantiate_managed_key(kid=kid, alg=_alg, key=key) def import_key(self, alg: str, key_value: str) -> Optional[IKey]: alg = self._key_management_client.dissect_alg(alg=alg) if not alg: return # Retrieve type and standard data of a key, hash and signature scheme. key_type, key_std, hash_type, hash_std, sig_scheme = map( alg.get, ( KEY_TYPE_ATTR, KEY_STD_ATTR, HASH_TYPE_ATTR, HASH_STD_ATTR, SIG_SCHEME_ATTR ) ) if not self._key_management_client.is_signature_scheme_accessible( sig_scheme=sig_scheme, key_type=key_type, key_std=key_std, hash_type=hash_type, hash_std=hash_std ): return return self._key_management_client.construct( key_type=key_type, key_std=key_std, key_value=key_value ) def import_key_pair(self, alg: str, private_key: str) -> Optional[KeyPair]: alg = self._key_management_client.dissect_alg(alg=alg) if not alg: return # Retrieve type and standard data of a key, hash and signature scheme. key_type, key_std, hash_type, hash_std, sig_scheme = map( alg.get, ( KEY_TYPE_ATTR, KEY_STD_ATTR, HASH_TYPE_ATTR, HASH_STD_ATTR, SIG_SCHEME_ATTR ) ) if not self._key_management_client.is_signature_scheme_accessible( sig_scheme=sig_scheme, key_type=key_type, key_std=key_std, hash_type=hash_type, hash_std=hash_std ): return prk = self._key_management_client.construct( key_type=key_type, key_std=key_std, key_value=private_key ) if prk: return KeyPair(prk=prk, typ=key_type, std=key_std) def create_key_pair(self, key_type: str, key_std: str) -> \ Optional[KeyPair]: prk = self._key_management_client.generate( key_type=key_type, key_std=key_std ) if not prk: return try: return KeyPair(prk=prk, typ=key_type, std=key_std) except (TypeError, Exception) as e: _LOG.warning(f'KeyPair of {key_type}:{key_std} standard' f' could not be instantiated, due to "{e}".') return def derive_alg( self, key_type: str, key_std: str, hash_type: str, hash_std: str, sig_scheme: str ) -> Optional[str]: if self._key_management_client.is_signature_scheme_accessible( sig_scheme=sig_scheme, hash_type=hash_type, hash_std=hash_std, key_type=key_type, key_std=key_std ): return f'{key_type}:{key_std}_{sig_scheme}_{hash_type}:{hash_std}' @staticmethod def instantiate_managed_key(kid: str, alg: str, key: IKey): return ManagedKey(kid=kid, alg=alg, key=key)