modular_sdk/services/ssm_service.py (178 lines of code) (raw):

import json import os import re from abc import ABC, abstractmethod from typing import Union, Dict, Optional, List from botocore.credentials import JSONFileCache from botocore.exceptions import ClientError from cachetools import TTLCache from modular_sdk.commons.log_helper import get_logger from modular_sdk.commons.time_helper import utc_datetime from modular_sdk.services.aws_creds_provider import AWSCredentialsProvider, \ ModularAssumeRoleClient from modular_sdk.services.environment_service import EnvironmentService _LOG = get_logger(__name__) SSM_NOT_AVAILABLE = re.compile(r'[^a-zA-Z0-9\/_.-]') SecretValue = Union[Dict, List, str] class AbstractSSMClient(ABC): @staticmethod def allowed_name(name: str) -> str: """ Keeps only allowed symbols """ return str(re.sub(SSM_NOT_AVAILABLE, '-', name)) def safe_name(self, name: str, prefix: Optional[str] = None, date: Optional[bool] = True) -> str: if prefix: name = f'{prefix}.{name}' if date: name = f'{name}.{utc_datetime().strftime("%m.%d.%Y.%H.%M.%S")}' return self.allowed_name(name) @abstractmethod def get_parameter(self, name: str) -> Optional[SecretValue]: ... @abstractmethod def put_parameter(self, name: str, value: SecretValue, _type='SecureString') -> Optional[str]: ... @abstractmethod def delete_parameter(self, name: str) -> bool: ... class OnPremSSMClient(AbstractSSMClient): """ The purpose is only debug and local testing. It must not be used as prod environment because it's not secure at all. Here I just emulate some parameter store. In case we really need on-prem, we must use Vault """ path = os.path.expanduser(os.path.join('~', '.modular_sdk', 'on-prem', 'ssm')) def __init__(self): self._store = JSONFileCache(self.path) def put_parameter(self, name: str, value: SecretValue, _type='SecureString') -> Optional[str]: self._store[name] = value return name def get_parameter(self, name: str) -> Optional[SecretValue]: if name in self._store: return self._store[name] def delete_parameter(self, name: str) -> bool: if name in self._store: del self._store[name] return True return False class VaultSSMClient(AbstractSSMClient): mount_point = 'kv' key = 'data' def __init__(self): self._client = None # hvac.Client def _init_client(self): import hvac # TODO use some discussed constants. These I get from Custodian vault_token = os.getenv('VAULT_TOKEN') vault_host = os.getenv('VAULT_URL') vault_port = os.getenv('VAULT_SERVICE_SERVICE_PORT') _LOG.info('Initializing hvac client') self._client = hvac.Client( url=f'http://{vault_host}:{vault_port}', token=vault_token ) _LOG.info('Hvac client was initialized') @property def client(self): if not self._client: self._init_client() return self._client def get_parameter(self, name: str) -> Optional[SecretValue]: try: response = self.client.secrets.kv.v2.read_secret_version( path=name, mount_point=self.mount_point) or {} except Exception: # hvac.InvalidPath return val = response.get('data', {}).get('data', {}).get(self.key) if isinstance(val, str): try: val = json.loads(val) except json.JSONDecodeError: pass return val def put_parameter(self, name: str, value: SecretValue, _type='SecureString') -> Optional[str]: if isinstance(value, str): # probably Maestro does not dump string to json. to_save = value else: to_save = json.dumps(value, separators=(',', ':'), sort_keys=True) self.client.secrets.kv.v2.create_or_update_secret( path=name, secret={self.key: to_save}, mount_point=self.mount_point ) return name def delete_parameter(self, name: str) -> bool: return bool(self.client.secrets.kv.v2.delete_metadata_and_all_versions( path=name, mount_point=self.mount_point)) class SSMService(AWSCredentialsProvider, # actually it's a client AbstractSSMClient): def __init__(self, **kwargs): kwargs['service_name'] = 'ssm' super().__init__(**kwargs) def get_parameter(self, name: str) -> Optional[SecretValue]: try: response = self.client.get_parameter( Name=name, WithDecryption=True ) value_str = response['Parameter']['Value'] try: return json.loads(value_str) except json.JSONDecodeError: return value_str except ClientError as e: error_code = e.response['Error']['Code'] _LOG.error(f'Can\'t get secret for name \'{name}\', ' f'error code: \'{error_code}\'') return def put_parameter(self, name: str, value: SecretValue, _type='SecureString') -> Optional[str]: """ In case the secret was saved successfully, its real name is returned. (the name can differ from the given one). In case something went wrong, None is returned """ try: if isinstance(value, (list, dict)): value = json.dumps(value, separators=(",", ":"), sort_keys=True) self.client.put_parameter( Name=name, Value=value, Overwrite=True, Type=_type) return name except ClientError as e: error_code = e.response['Error']['Code'] _LOG.error(f'Can\'t put secret for name \'{name}\', ' f'error code: \'{error_code}\'') return def delete_parameter(self, name: str) -> bool: try: self.client.delete_parameter(Name=name) except ClientError as e: error_code = e.response['Error']['Code'] _LOG.error(f'Can\'t delete secret name \'{name}\', ' f'error code: \'{error_code}\'') return False return True class ModularAssumeRoleSSMService(SSMService): client = ModularAssumeRoleClient('ssm') def __init__(self, *args, **kwargs): """ No need """ class SSMClientCachingWrapper(AbstractSSMClient): def __init__(self, client: AbstractSSMClient, environment_service: EnvironmentService): self._client = client self._cache = TTLCache( maxsize=50, ttl=environment_service.inner_cache_ttl_seconds() ) @property def client(self) -> AbstractSSMClient: return self._client def get_parameter(self, name: str) -> Optional[SecretValue]: if name in self._cache: return self._cache[name] value = self.client.get_parameter(name) if value: self._cache[name] = value return value def put_parameter(self, name: str, value: SecretValue, _type='SecureString') -> Optional[str]: name = self.client.put_parameter(name, value, _type) if name: self._cache[name] = value return name def delete_parameter(self, name: str) -> bool: self._cache.pop(name, None) return self.client.delete_parameter(name)