docker/services/clients/license_manager.py (127 lines of code) (raw):
from functools import cached_property
from json import JSONDecodeError
from typing import Optional
from typing import Union, List, Type, Dict
from modular_sdk.services.impl.maestro_credentials_service import AccessMeta
from requests import request, Response
from requests.exceptions import RequestException
from commons.constants import POST_METHOD, PATCH_METHOD, \
STATUS_ATTR, TENANT_ATTR, AUTHORIZATION_PARAM, CUSTOMER_ATTR, \
SERVICE_TYPE_ATTR, SERVICE_TYPE_RIGHTSIZER, ALGORITHMS_ATTR
from commons.log_helper import get_logger
from services.setting_service import SettingsService
SET_TENANT_ACTIVATION_DATE_PATH = '/tenants/set-activation-date'
SET_CUSTOMER_ACTIVATION_DATE_PATH = '/customers/set-activation-date'
JOB_CHECK_PERMISSION_PATH = '/jobs/check-permission'
SYNC_LICENSE_PATH = '/license/sync'
JOBS_PATH = '/jobs'
HOST_KEY = 'host'
JOB_ID = 'job_id'
CREATED_AT_ATTR = 'created_at'
STARTED_AT_ATTR = 'started_at'
STOPPED_AT_ATTR = 'stopped_at'
_LOG = get_logger(__name__)
class LicenseManagerClient:
def __init__(self, setting_service: SettingsService):
self.setting_service = setting_service
self._access_data = None
self._client_key_data = None
@cached_property
def access_data(self) -> dict:
return self.setting_service.get_license_manager_access_data() or {}
@property
def host(self) -> Optional[str]:
return AccessMeta.from_dict(self.access_data).url
@property
def client_key_data(self):
if not self._client_key_data:
self._client_key_data = \
self.setting_service.get_license_manager_client_key_data()
self._client_key_data = self._client_key_data or {}
return self._client_key_data
def post_job(self, job_id: str, customer: str, tenant: str,
algorithm_map: Dict[str, List[str]], auth: str):
"""
Delegated to instantiate a licensed Job, bound to a tenant within a
customer utilizing rulesets which are grouped by tenant-license-keys,
allowing to request for a ruleset-content-source collection.
:parameter job_id: str
:parameter customer: str
:parameter tenant: str
:parameter auth: str, authorization token
:parameter algorithm_map: Dict[str, List[str]]
:return: Union[Response, Type[None]]
"""
host, method = self.host, POST_METHOD
if not host:
_LOG.error('CustodianLicenceManager access data has not been'
' provided.')
return None
host = host.strip('/')
url = host + JOBS_PATH
payload = {
SERVICE_TYPE_ATTR: SERVICE_TYPE_RIGHTSIZER,
JOB_ID: job_id,
CUSTOMER_ATTR: customer,
TENANT_ATTR: tenant,
ALGORITHMS_ATTR: algorithm_map
}
headers = {
AUTHORIZATION_PARAM: auth
}
return self._send_request(
url=url, method=method, payload=payload, headers=headers
)
def patch_job(self, job_id: str, auth: str, created_at: str = None,
started_at: str = None, stopped_at: str = None,
status: str = None):
host = self.host
if not any([created_at, started_at, stopped_at, status]):
_LOG.warning('No attributes to update provided. Skipping')
return
if not host:
_LOG.error('CustodianLicenceManager access data has not been'
' provided.')
return
url = host.strip('/') + JOBS_PATH
payload = {
JOB_ID: job_id,
CREATED_AT_ATTR: created_at,
STARTED_AT_ATTR: started_at,
STOPPED_AT_ATTR: stopped_at,
STATUS_ATTR: status
}
headers = {
AUTHORIZATION_PARAM: auth
}
payload = {k: v for k, v in payload.items()
if isinstance(v, (bool, int)) or v}
return self._send_request(
url=url, method=PATCH_METHOD, payload=payload, headers=headers
)
@classmethod
def _send_request(
cls, url: str, method: str, payload: dict,
headers: Optional[dict] = None
) -> Optional[Response]:
"""
Meant to commence a request to a given url, by deriving a
proper delegated handler. Apart from that, catches any risen
request related exception.
:parameter url: str
:parameter method:str
:parameter payload: dict
:return: Union[Response, Type[None]]
"""
_injectable_payload = cls._request_payload_injector(method, payload)
try:
_input = f'data - {_injectable_payload}'
if headers:
_input += f', headers: {headers}'
_LOG.debug(f'Going to send \'{method}\' request to \'{url}\''
f' with the following {_input}.')
response = request(
url=url, method=method, headers=headers, **_injectable_payload
)
_LOG.debug(f'Response from {url}: {response}')
return response
except (RequestException, Exception) as e:
_LOG.error(f'Error occurred while executing request. Error: {e}')
return
@classmethod
def _request_payload_injector(cls, method: str, payload: dict):
_map = cls._define_method_injection_map(payload)
return _map.get(method, payload) if method in _map else None
@staticmethod
def retrieve_json(response: Response) -> Union[Dict, Type[None]]:
_json = None
try:
_json = response.json()
except JSONDecodeError as je:
_LOG.warning(f'JSON response from \'{response.url}\' not be '
f'decoded. An exception has occurred: {je}')
return _json
@staticmethod
def _define_method_injection_map(payload):
return {POST_METHOD: dict(json=payload),
PATCH_METHOD: dict(json=payload)}