docker/services/clients/standalone_key_management.py (235 lines of code) (raw):
from services.clients.abstract_key_management import \
AbstractKeyManagementClient, IKey, \
KEY_TYPE_ATTR, KEY_STD_ATTR, HASH_TYPE_ATTR, HASH_STD_ATTR, SIG_SCHEME_ATTR
from typing import Union, Callable, Optional, Dict
from services.clients.ssm import SSMClient
from Crypto import Hash
from Crypto.PublicKey import ECC
from Crypto.Signature import DSS
from commons.log_helper import get_logger
# Module-level logger.
_LOG = get_logger(__name__)

# Type labels of the supported key, hash and signature-scheme families.
ECC_KEY_ATTR = 'ECC'
SHA_HASH_ATTR = 'SHA'
DSS_SIGN_ATTR = 'DSS'

# Generic attribute names and the delimiter of algorithm segments.
VALUE_ATTR = 'value'  # entry of persisted key data holding the raw key
KEY_ATTR = 'key'
HASH_ATTR = 'hash'
ATTR_DELIMITER = '_'

# SHA attributes.
BIT_MODE_ATTR = 'bit_mode'
TFS_SHA_BIT_MODE = 256

# DSS attributes.
DSS_MODE_ATTR = 'mode'
DSS_FIPS_MODE = 'fips-186-3'
DSS_RFC6979_MODE = 'deterministic-rfc6979'

# Key standard labels - elliptic curves.
ECC_NIST_CURVES = ('p521', 'p384', 'p256', 'p224')
ECDSA_NIST_CURVES = ('p521', 'p384', 'p256', 'p224')

# Hash standard labels - SHA bit-modes.
SHA_BIT_MODES = ('256', '512')
SHA2_MODES = ('256', '512')
class StandaloneKeyManagementClient(AbstractKeyManagementClient):
    """
    Provides cryptographic behaviour on top of keys kept in a secret store.

    The methods defined in this class cover:
     - signature production (`sign`)
     - key generation (`generate`)
     - key construction (`get_key`, `construct`)
     - raw key-data retrieval (`get_key_data`)

    NOTE(review): signature verification and key persistence are not
    defined in this class body - presumably they are provided by
    `AbstractKeyManagementClient`; confirm against the base class.

    Every operation adheres to a self-declared algorithm string, which is
    bound to the following format:
     `$key-type:$key-standard-label`_`$scheme`_`$hash-type:$hash-standard-label`
    e.g. a key of type `ECC` on curve `p521`, the `DSS` scheme and a `SHA`
    hash in `256` bit-mode.

    Note:
     Such an extended format makes the algorithm string self-describing,
     which allows stateless verification, if required.
    """

    def __init__(self, ssm_client: SSMClient):
        """
        :parameter ssm_client: SSMClient, store the raw key data is read from
        """
        self._ssm_client = ssm_client

    def sign(self, key_id: str, message: Union[str, bytes], algorithm: str,
             encoding='utf-8') -> Optional[bytes]:
        """
        Mandates signature production computed using a private-key, retrieved
        from a manager store, and an algorithm string, segments of which,
        split by `_`, explicitly state:
         - key-type standard`:`standard-data-label
         - signature-scheme standard
         - hashing mode`:`standard-data-label
        Note: standard-data-label is meant to provide stateless
        configuration of standards, denoting labels, which are supported
        i.e. key data - ECC:p521.

        Any unsupported standard, missing key data or construction failure
        results in `None` instead of an exception.

        :parameter key_id: str, name of the secret holding the key data
        :parameter message: Union[str, bytes], `str` is encoded beforehand
        :parameter algorithm: str
        :parameter encoding: str, used only when `message` is a `str`
        :return: Optional[bytes]
        """
        if not isinstance(message, bytes):
            message = bytes(message, encoding)

        _LOG.debug(f'Going to split \'{algorithm}\' algorithm into standards.')
        # NOTE(review): `dissect_alg` is not defined in this class -
        # presumably inherited from AbstractKeyManagementClient.
        alg: Optional[Dict[str, str]] = self.dissect_alg(alg=algorithm)
        if not alg:
            # The dissected mapping is what gets consumed below, so it is
            # the value to guard on, rather than the raw algorithm string.
            _LOG.warning(f'\'{algorithm}\' algorithm could not be dissected.')
            return None

        # Retrieve type and standard data of a key, hash and signature scheme.
        key_type, key_std, hash_type, hash_std, sig_scheme = map(
            alg.get, (
                KEY_TYPE_ATTR, KEY_STD_ATTR, HASH_TYPE_ATTR, HASH_STD_ATTR,
                SIG_SCHEME_ATTR
            )
        )

        _LOG.debug(f'Checking \'{algorithm}\' signature protocol support.')
        if key_type not in self._key_construction_map():
            _LOG.warning(f'\'{key_type}\' construction is not supported')
            return None

        if not self.is_signature_scheme_accessible(
            sig_scheme=sig_scheme, key_type=key_type, key_std=key_std,
            hash_type=hash_type, hash_std=hash_std
        ):
            _LOG.warning(
                f'\'{algorithm}\' signature-protocol is not supported.'
            )
            return None

        _LOG.debug(f'Going to retrieve raw \'{key_id}\' key data.')
        key_data: Optional[dict] = self.get_key_data(key_id=key_id)
        if not key_data:
            return None

        key = self.get_key(
            key_type=key_type, key_std=key_std, key_data=key_data
        )
        if not key:
            return None

        # Hash / scheme specific options may be persisted next to the key,
        # under the respective type label.
        hash_obj = self._get_hash_client(
            message=message, hash_type=hash_type, hash_std=hash_std,
            **(key_data.get(hash_type) or {})
        )
        if not hash_obj:
            return None

        signer = self._get_signature_client(
            key=key, sig_scheme=sig_scheme, **(key_data.get(sig_scheme) or {})
        )
        if not signer:
            return None

        return signer.sign(hash_obj)

    def generate(self, key_type: str, key_std: str, **data):
        """
        Produces a random key, based on a given key-type and respective
        standard-label.

        :param key_type: str
        :param key_std: str
        :param data: dict, additional data forwarded to the generator
        :return: Optional[IKey], `None` if the type/standard pair is not
         supported or the generator has failed
        """
        generator = self._key_generation_map().get(key_type, {}).get(key_std)
        if not generator:
            _LOG.warning(f'\'{key_type}\':{key_std} generator is not'
                         f' supported')
            return None
        try:
            return generator(key_std, **data)
        except Exception as e:
            # Best-effort: a failed generation is reported, not raised.
            _LOG.warning(f'\'{key_type}\' generator could not be invoked, '
                         f'due to: "{e}".')
            return None

    def get_key(self, key_type: str, key_std: str, key_data: dict):
        """
        Mediates cryptographic key instance derivation, based on a key_type
        and a respective key_data.

        The given `key_data` is left untouched - the raw value is taken out
        of a shallow copy, the rest of which is passed to the constructor.

        :parameter key_type: str
        :parameter key_std: str, type-respective standard data label
        :parameter key_data: dict, any store-persisted key data
        :return: Optional[IKey], `None` if the raw value is missing or the
         key could not be instantiated
        """
        extras = dict(key_data)
        if VALUE_ATTR not in extras:
            _LOG.error(
                f'Could not instantiate {key_type}:{key_std} due to '
                f'a missing \'{VALUE_ATTR}\' of the key data.'
            )
            return None
        key_value = extras.pop(VALUE_ATTR)
        try:
            return self.construct(
                key_type=key_type, key_std=key_std, key_value=key_value,
                **extras
            )
        except Exception as e:
            # i.e. persisted data which collides with explicit keywords.
            _LOG.error(
                f'Could not instantiate {key_type}:{key_std} due to "{e}".')
            return None

    def get_key_data(self, key_id: str) -> Optional[dict]:
        """
        Mandates raw cryptographic-key retrieval referencing management store.

        A string payload is JSON-decoded first. The outcome is accepted
        only if it is a dictionary that contains the `value` entry.

        :parameter key_id: str
        :return: Optional[dict]
        """
        item = self._ssm_client.get_secret_value(secret_name=key_id)
        if isinstance(item, str):
            item = _load_json(item)

        # The payload itself is deliberately kept out of the log messages,
        # as it may carry secret key material.
        if not isinstance(item, dict):
            _LOG.error(f'\'{key_id}\' key is not a dictionary.')
            return None
        if VALUE_ATTR not in item:
            _LOG.error(f'\'{key_id}\' key does not contain a \'value\' key.')
            return None
        return item

    @classmethod
    def construct(
        cls, key_type: str, key_std: str, key_value: str, **data
    ):
        """
        Head cryptographic key construction mediator, which derives a
        key type - raw value type constructor, given one has been found.

        :parameter key_type: str, cryptographic key-type
        :parameter key_std: str, cryptographic key-type standard label
        :parameter key_value: str, raw key value
        :parameter data: dict, any store-persisted data, related to the key.
        :return: Optional[object], `None` if no constructor is declared for
         the type/standard pair or the construction has failed
        """
        constructors: dict = cls._key_construction_map().get(key_type, {})
        if not constructors:
            _LOG.warning(f'No {key_type} key constructor could be found.')
            return None

        mediator: Optional[Callable] = constructors.get(key_std)
        if not mediator:
            _LOG.warning(f'{key_type} key does not support {key_std} '
                         'construction.')
            return None

        try:
            return mediator(value=key_value, key_std=key_std, **data)
        except Exception as e:
            # Best-effort: a failed construction is reported, not raised.
            _LOG.warning(f'Key of {key_type}:{key_std} standard '
                         f'could not be constructed due to: "{e}".')
            return None

    @classmethod
    def is_signature_scheme_accessible(
        cls, sig_scheme: str, key_type: str, key_std: str, hash_type: str,
        hash_std: str
    ) -> bool:
        """
        Tells whether the given combination of a signature scheme, key and
        hash standards is declared within the signature scheme reference.

        :parameter sig_scheme: str
        :parameter key_type: str
        :parameter key_std: str
        :parameter hash_type: str
        :parameter hash_std: str
        :return: bool
        """
        ref = cls._signature_scheme_reference().get(sig_scheme, {})
        return hash_std in ref.get(key_type, {}).get(key_std, {}).get(
            hash_type, []
        )

    @classmethod
    def _get_hash_client(
        cls, message: bytes, hash_type: str, hash_std: str, **data
    ):
        """
        Mandates message-hash resolution based on provided type and
        optional standard data.

        :parameter message: bytes
        :parameter hash_type: str
        :parameter hash_std: str, cryptographic hash-type wise standard label
        :parameter data: dict, any store-persisted data, related to the hash.
        :return: Optional[object], `None` for an unsupported type/standard
        """
        # The default must be a mapping, so that an unknown hash type
        # resolves to `None`, rather than failing on the nested lookup.
        resolver = cls._hash_construction_map().get(hash_type, {}).get(
            hash_std
        )
        return resolver(message, hash_std, **data) if resolver else None

    @classmethod
    def _get_signature_client(cls, key, sig_scheme: str, **data):
        """
        Resolves key signature actor based on provided type and optional
        standard data.

        :parameter key: object
        :parameter sig_scheme: str, cryptographic signature scheme label
        :parameter data: dict, any persisted data, related to the scheme.
        :return: Optional[object], `None` for an unsupported scheme
        """
        resolver = cls._signature_construction_map().get(sig_scheme)
        return resolver(key, **data) if resolver else None

    @classmethod
    def _key_construction_map(cls):
        """
        Declares a construction key-map, which follows the structure:
        {
            $key_type: {
                $key_std: Callable[value: str, key_std: std, **kwargs]
            }
        }
        :return: Dict[str, Dict[str, Callable]]
        """
        return {
            ECC_KEY_ATTR: dict.fromkeys(ECC_NIST_CURVES, cls._import_ecc_key)
        }

    @classmethod
    def _key_generation_map(cls) -> Dict[str, Dict[str, Callable]]:
        """
        Declares a generation key-map, which follows the structure:
        {
            $key_type: {
                $key_std: Callable[key_std: str, **kwargs]
            }
        }
        :return: Dict[str, Dict[str, Callable]]
        """
        return {
            ECC_KEY_ATTR: dict.fromkeys(ECC_NIST_CURVES, cls._generate_ecc_key)
        }

    @classmethod
    def _hash_construction_map(cls):
        """
        Declares a hash construction map, which follows the structure:
        {
            $hash_type: {
                $hash_std: Callable[message: bytes, hash_std: str, **kwargs]
            }
        }
        :return: Dict[str, Dict[str, Callable]]
        """
        return {
            SHA_HASH_ATTR: dict.fromkeys(SHA_BIT_MODES, cls._get_sha)
        }

    @classmethod
    def _signature_construction_map(cls):
        """
        Declares a signer construction map, which follows the structure:
        {
            $signature_scheme: Callable[key: object, **kwargs]
        }
        :return: Dict[str, Callable]
        """
        return {
            DSS_SIGN_ATTR: cls._get_dss
        }

    @staticmethod
    def _signature_scheme_reference():
        """
        Returns a reference map of accessible signature schemes, based on
        the following structure:
        {
            $signature_scheme: {
                $key_type: {
                    $key_std: {
                        $hash_type: Iterable[$hash_standard]
                    }
                }
            }
        }
        :return: Dict[str, Dict[str, Dict[str, Dict[str, Iterable[str]]]]]
        """
        # Declares DSS scheme accessible protocols.
        dss = {
            ECC_KEY_ATTR: {
                curve: {SHA_HASH_ATTR: SHA2_MODES}
                for curve in ECDSA_NIST_CURVES
            }
        }
        return {
            DSS_SIGN_ATTR: dss
        }

    @staticmethod
    def _import_ecc_key(value: str, key_std: str, **kwargs):
        """
        Delegated to import an Elliptic Curve key.

        :parameter value: str, encoded key
        :parameter key_std: standard-label of an ECC key, used as curve name
        :parameter kwargs: dict, out of which only `passphrase` is forwarded
        :return: EccKey
        """
        # Declares optional parameters
        payload = _filter_items(source=kwargs, include_list=['passphrase'])
        payload.update(curve_name=key_std, encoded=value)
        return ECC.import_key(**payload)

    @staticmethod
    def _generate_ecc_key(key_std: str, **kwargs):
        """
        Delegated to construct an Elliptic Curve key, based on a
        given standard-curve.

        :param key_std: str, standard-label, which denotes curve on default
        :param kwargs: dict, any additionally allowed data to inject:
         - `curve`, used only when `key_std` is empty
         - `randfunc` (or its legacy spelling `rand_func`), random source
        :return: EccKey
        """
        payload = {'curve': key_std or kwargs.get('curve')}
        # `ECC.generate` names the parameter `randfunc`; the `rand_func`
        # spelling is still accepted here and mapped onto it.
        rand_func = kwargs.get('randfunc') or kwargs.get('rand_func')
        if rand_func is not None:
            payload['randfunc'] = rand_func
        return ECC.generate(**payload)

    @staticmethod
    def _get_sha(message: bytes, hash_std: str, **data):
        """
        Delegated to instantiate a hasher bound to the SHA standard,
        deriving a bit-mode-parameter, which by default is set to 256-bits.

        :parameter message: bytes
        :param hash_std: cryptographic type-wise standard label -
         denotes bit-mode
        :parameter data: dict, any store-persisted data, related to the hash.
        :return: Optional[object], `None` for an unsupported bit-mode
        """
        from importlib import import_module

        bit_mode = hash_std or data.get(BIT_MODE_ATTR) or TFS_SHA_BIT_MODE
        name = f'{SHA_HASH_ATTR}{bit_mode}'
        module = getattr(Hash, name, None)
        if module is None:
            # The sub-module is an attribute of the package only once it
            # has been imported somewhere - import it explicitly otherwise.
            try:
                module = import_module(f'{Hash.__name__}.{name}')
            except (ImportError, ValueError):
                module = None
        if not module:
            _LOG.warning(f'SHA does not support {bit_mode} mode.')
            return None
        return module.new(message)

    @classmethod
    def _get_dss(cls, key, **data):
        """
        Delegated to instantiate a signer bound to the Digital Signature
        standard, deriving an optional mode-attribute, which by default is
        set to deterministic-rfc6979.

        :parameter key: Union[DsaKey, EccKey]
        :parameter data: dict
        :return: Optional[object], `None` for an improper mode or a failed
         signer instantiation
        """
        raw_mode = data.get(DSS_MODE_ATTR, DSS_RFC6979_MODE)
        mode = str(raw_mode)
        if mode not in (DSS_FIPS_MODE, DSS_RFC6979_MODE):
            _LOG.warning(f'Improper DSS mode value: \'{raw_mode}\'.')
            return None
        try:
            return DSS.new(key=key, mode=mode)
        except Exception as e:
            # Best-effort: a failed instantiation is reported, not raised.
            _LOG.warning(f'Could not instantiate a DSS signer: {e}')
            return None
def _filter_items(source: dict, include_list: list) -> dict:
return {key: value for key, value in source.items() if key in include_list}
def _load_json(data: str):
from json import loads, JSONDecodeError
try:
loaded = loads(data)
except (ValueError, JSONDecodeError):
loaded = data
return loaded