docker/services/clients/abstract_key_management.py (57 lines of code) (raw):

"""Abstract interfaces for key-management clients and algorithm parsing."""
from abc import ABC, abstractmethod
from typing import Union, Optional, TypeVar, Generic, Dict
from re import match

from commons.log_helper import get_logger

K = TypeVar('K')

# Names of the regex groups in ALG_PATTERN; they are also the keys of the
# dict returned by `AbstractKeyManagementClient.dissect_alg`.
KEY_TYPE_ATTR = 'key_type'
KEY_STD_ATTR = 'key_std'
SIG_SCHEME_ATTR = 'sig_scheme'
HASH_TYPE_ATTR = 'hash_type'
HASH_STD_ATTR = 'hash_std'

# Expected layout of an algorithm string:
#   <key_type>:<key_std>_<sig_scheme>_<hash_type>:<hash_std>
# e.g. 'ECC:p521_DSS_SHA:256'.
ALG_PATTERN = f'(?P<{KEY_TYPE_ATTR}>.+):(?P<{KEY_STD_ATTR}>.+)_' \
              f'(?P<{SIG_SCHEME_ATTR}>.+)_' \
              f'(?P<{HASH_TYPE_ATTR}>.+):(?P<{HASH_STD_ATTR}>.+)'

_LOG = get_logger(__name__)


class IKey(Generic[K]):
    """
    Typing-only description of a key object, generic over the concrete
    key type `K`. Every method body is a stub (`...`); this class only
    declares the shape that key implementations are expected to have.
    """

    @classmethod
    def import_key(cls, encoded: Union[str, bytes], **kwargs) -> K:
        """
        Build a key from its encoded representation.
        :param encoded: Union[str, bytes]
        :return: K
        """
        ...

    def export_key(self, format: str, **kwargs) -> Union[str, bytes]:
        """
        Serialize the key into the requested format.
        :param format: str
        :return: Union[str, bytes]
        """
        ...

    def public_key(self) -> K:
        """
        Return the public counterpart of the key.
        :return: K
        """
        ...


class AbstractKeyManagementClient(ABC):
    """
    Abstract base of key-management clients.

    Subclasses must override every abstract method. The default bodies
    raise `NotImplementedError`, so an accidental `super()` delegation
    fails with a meaningful exception.
    """

    @abstractmethod
    def sign(self, key_id, message: Union[str, bytes], algorithm: str,
             encoding='utf-8') -> Optional[bytes]:
        """
        Presumably signs `message` with the key referenced by `key_id`,
        using `algorithm`; exact semantics are defined by subclasses.
        :param key_id: identifier of the key
        :param message: Union[str, bytes]
        :param algorithm: str
        :param encoding: str, defaults to 'utf-8'
        :return: Optional[bytes]
        """
        raise NotImplementedError

    @abstractmethod
    def generate(self, key_type: str, key_std: str, **data) -> IKey:
        """
        Presumably produces a new key of the given type and standard;
        exact semantics are defined by subclasses.
        :param key_type: str
        :param key_std: str
        :return: IKey
        """
        raise NotImplementedError

    @abstractmethod
    def get_key(self, key_type: str, key_std: str, key_data: dict) -> \
            Optional[IKey]:
        """
        Presumably resolves a key object out of `key_data`; exact
        semantics are defined by subclasses.
        :param key_type: str
        :param key_std: str
        :param key_data: dict
        :return: Optional[IKey]
        """
        raise NotImplementedError

    @abstractmethod
    def get_key_data(self, key_id: str) -> Optional[dict]:
        """
        Presumably retrieves the data stored for `key_id`; exact
        semantics are defined by subclasses.
        :param key_id: str
        :return: Optional[dict]
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def construct(cls, key_type: str, key_std: str, key_value: str, **data):
        """
        Presumably builds a key out of its raw value; exact semantics
        and the return type are defined by subclasses.
        :param key_type: str
        :param key_std: str
        :param key_value: str
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def is_signature_scheme_accessible(
        cls, sig_scheme: str, key_type: str, key_std: str, hash_type: str,
        hash_std: str
    ):
        """
        Presumably tells whether the described signature scheme can be
        used by the client; exact semantics and the return type are
        defined by subclasses.
        :param sig_scheme: str
        :param key_type: str
        :param key_std: str
        :param hash_type: str
        :param hash_std: str
        """
        raise NotImplementedError

    @staticmethod
    def dissect_alg(alg: str) -> Optional[Dict[str, str]]:
        """
        Returns `alg` value dissected dict-object, with the following keys:
         - key_type: str
         - key_std: str
         - sig_scheme: str
         - hash_type: str
         - hash_std: str
        A warning is logged and None is returned when `alg` does not
        follow `ALG_PATTERN` or is not a string.
        :param alg: str
        :return: Optional[Dict[str, str]]
        """
        reason = None
        try:
            matched = match(pattern=ALG_PATTERN, string=alg)
        except TypeError as e:
            # Raised by `re.match` for anything that is not a str.
            matched, reason = None, e

        if matched is None:
            # `re.match` yields None (it does not raise) on a mismatch.
            if reason is None:
                reason = 'value does not match the expected pattern'
            _LOG.warning(
                f'Alg:\'{alg}\' could not be dissected due to "{reason}".'
            )
            return None

        return matched.groupdict()