docker/services/license_manager_service.py (152 lines of code) (raw):

from typing import List, Dict, Optional from commons.constants import RESPONSE_OK_CODE, \ ITEMS_PARAM, MESSAGE_PARAM, CLIENT_TOKEN_ATTR, \ KID_ATTR, ALG_ATTR, RESPONSE_FORBIDDEN_CODE,\ RESPONSE_RESOURCE_NOT_FOUND_CODE from commons.log_helper import get_logger from services.clients.license_manager import LicenseManagerClient from services.token_service import TokenService from commons.time_helper import utc_datetime from datetime import timedelta _LOG = get_logger(__name__) GENERIC_JOB_LICENSING_ISSUE = 'Job:\'{id}\' could not be granted by the ' \ 'License Manager Service.' class BalanceExhaustion(Exception): def __init__(self, message: str): self.message = message def __str__(self): return self.message class InaccessibleAssets(Exception): def __init__( self, message: str, assets: Dict[str, List[str]], hr_sep: str, ei_sep: str, i_sep: str, i_wrap: Optional[str] = None ): self._assets = self._dissect( message=message, assets=assets, hr_sep=hr_sep, ei_sep=ei_sep, i_sep=i_sep, i_wrap=i_wrap ) @staticmethod def _dissect( message: str, assets: Dict[str, List[str]], hr_sep: str, ei_sep: str, i_sep: str, i_wrap: Optional[str] = None ): """ Dissects License Manager response of entity(ies)-not-found message. Such as: TenantLicense or Ruleset(s):$id(s) - $reason. param message: str - maintains the raw response message param assets: Dict[str, List[str]] - source of assets to param hr_sep: str - head-reason separator, within the response message param ei_sep: str - entity type - id(s) separator, within the head of the message param i_sep: str - separator of entity-identifier(s), within the raw id(s). param i_wrap: Optional[str] - quote-type wrapper of each identifier. """ each_template = 'Each of {} license-subscription' head, *_ = message.rsplit(hr_sep, maxsplit=1) head = head.strip(' ') if not head: _LOG.error(f'Response message is not separated by a \'{hr_sep}\'.') return entity, *ids = head.split(ei_sep, maxsplit=1) ids = ids[0] if len(ids) == 1 else '' if 's' in entity and entity.index('s') == len(entity)-1: ids = ids.split(i_sep) ids = [each.strip(i_wrap or '') for each in ids.split(i_sep)] if 'TenantLicense' in entity: ids = [ asset for tlk in ids if tlk in assets for asset in assets[tlk] or [each_template.format(tlk)] ] return ids def __str__(self): head = 'Ruleset' if len(self._assets) > 1: head += 's' scope = ', ' .join(f'"{each}"' for each in self._assets) reason = 'are' if len(self._assets) > 1 else 'is' reason += ' no longer accessible' return f'{head}:{scope} - {reason}.' def __iter__(self): return iter(self._assets) class LicenseManagerService: def __init__( self, license_manager_client: LicenseManagerClient, token_service: TokenService ): self.license_manager_client = license_manager_client self.token_service = token_service def update_job_in_license_manager( self, job_id: str, created_at: str = None, started_at: str = None, stopped_at: str = None, status: str = None, expires: dict = None ): auth = self._get_client_token(expires or dict(hours=1)) if not auth: _LOG.warning('Client authorization token could be established.') return None response = self.license_manager_client.patch_job( job_id=job_id, created_at=created_at, started_at=started_at, stopped_at=stopped_at, status=status, auth=auth ) if response and response.status_code == RESPONSE_OK_CODE: return self.license_manager_client.retrieve_json(response) return def instantiate_licensed_job_dto( self, job_id: str, customer: str, tenant: str, algorithm_map: Dict[str, List[str]], expires: dict = None ): """ Mandates licensed Job data transfer object retrieval, by successfully interacting with LicenseManager providing the following parameters. :parameter job_id: str :parameter customer: str :parameter tenant: str :parameter algorithm_map: Union[Type[None], List[str]] :parameter expires: dict, denotes auth-token expiration :raises: InaccessibleAssets, given the requested content is not accessible :raises: BalanceExhaustion, given the job-balance has been exhausted :return: Optional[Dict] """ auth = self._get_client_token(expires or dict(hours=1)) if not auth: _LOG.warning('Client authorization token could be established.') return None response = self.license_manager_client.post_job( job_id=job_id, customer=customer, tenant=tenant, algorithm_map=algorithm_map, auth=auth ) if response is None: return decoded = self.license_manager_client.retrieve_json(response) or {} if response.status_code == RESPONSE_OK_CODE: items = decoded.get(ITEMS_PARAM, []) if len(items) != 1: _LOG.warning(f'Unexpected License Manager response: {items}.') item = None else: item = items.pop() return item else: message = decoded.get(MESSAGE_PARAM) if response.status_code == RESPONSE_RESOURCE_NOT_FOUND_CODE: raise InaccessibleAssets( message=message, assets=algorithm_map, hr_sep='-', ei_sep=':', i_sep=', ', i_wrap='\'' ) elif response.status_code == RESPONSE_FORBIDDEN_CODE: raise BalanceExhaustion(message) def _get_client_token(self, expires: dict, **payload): """ Delegated to derive a custodian-service-token, encoding any given payload key-value pairs into the claims. :parameter expires: dict, meant to store timedelta kwargs :parameter payload: dict :return: Union[str, Type[None]] """ token_type = CLIENT_TOKEN_ATTR key_data = self.license_manager_client.client_key_data kid, alg = key_data.get(KID_ATTR), key_data.get(ALG_ATTR) if not (kid and alg): _LOG.warning('LicenseManager Client-Key data is missing.') return t_head = f'\'{token_type}\'' encoder = self.token_service.derive_encoder( token_type=CLIENT_TOKEN_ATTR, **payload ) if not encoder: return None # Establish a kid reference to a key. encoder.prk_id = self.derive_client_private_key_id( kid=kid ) _LOG.info(f'{t_head} - {encoder.prk_id} private-key id has been ' f'assigned.') encoder.kid = kid _LOG.info(f'{t_head} - {encoder.kid} token \'kid\' has been assigned.') encoder.alg = alg _LOG.info(f'{t_head} - {encoder.alg} token \'alg\' has been assigned.') encoder.expire(utc_datetime() + timedelta(**expires)) try: token = encoder.product except (Exception, BaseException) as e: _LOG.error(f'{t_head} could not be encoded, due to: {e}.') token = None if not token: _LOG.warning(f'{t_head} token could not be encoded.') return token @staticmethod def derive_client_private_key_id(kid: str): return f'cs_lm_client_{kid}_prk' @staticmethod def _default_instance(value, _type: type, *args, **kwargs): return value if isinstance(value, _type) else _type(*args, **kwargs)