modular_cli_sdk/client/ssm_client.py (156 lines of code) (raw):
import json
import os
import re
from abc import abstractmethod, ABC
from functools import cached_property
from typing import Optional, Union, List, Dict
import boto3
from botocore.client import ClientError
from botocore.credentials import JSONFileCache
from botocore.exceptions import NoCredentialsError, NoRegionError
from modular_cli_sdk.commons.constants import ENV_VAULT_ADDR, ENV_VAULT_TOKEN
from modular_cli_sdk.commons.logger import get_logger
_LOG = get_logger(__name__)
SSM_NOT_AVAILABLE = re.compile(r'[^a-zA-Z0-9\/_.-]')
SecretValue = Union[Dict, List, str]
class AbstractSecretsManager(ABC):
@staticmethod
def allowed_name(name: str) -> str:
"""
Keeps only allowed symbols
"""
return str(re.sub(SSM_NOT_AVAILABLE, '-', name))
@abstractmethod
def get_parameter(self, name: str) -> Optional[SecretValue]:
...
@abstractmethod
def put_parameter(self, name: str, value: SecretValue,
_type='SecureString') -> bool:
...
@abstractmethod
def delete_parameter(self, name: str) -> bool:
"""
Returns True in case the parameter was saved successfully
:param name:
:return:
"""
class OnPremSecretsManager(AbstractSecretsManager):
"""
The purpose is only debug and local testing. It must not be used as
prod environment because it's not secure at all. Here I just
emulate some parameter store. In case we really need on-prem,
we must use Vault
"""
path = os.path.expanduser(os.path.join('~', '.modular_cli', 'on-prem', 'ssm'))
def __init__(self):
self._store = JSONFileCache(self.path)
def put_parameter(self, name: str, value: SecretValue,
_type='SecureString') -> bool:
self._store[name] = value
return True
def get_parameter(self, name: str) -> Optional[SecretValue]:
if name in self._store:
return self._store[name]
def delete_parameter(self, name: str) -> bool:
if name in self._store:
del self._store[name]
return True
return False
class VaultSecretsManager(AbstractSecretsManager):
mount_point = 'kv'
key = 'data'
def __init__(self):
self._client = None # hvac.Client
def _init_client(self):
try:
import hvac
except ImportError:
raise RuntimeError('Install hvac to use Vault client. '
'"pip install hvac==0.11.2"')
_LOG.debug('Initializing hvac client')
self._client = hvac.Client(
url=os.getenv(ENV_VAULT_ADDR),
token=os.getenv(ENV_VAULT_TOKEN)
)
_LOG.debug('Hvac client was initialized')
@property
def client(self):
if not self._client:
self._init_client()
return self._client
def get_parameter(self, name: str) -> Optional[SecretValue]:
try:
response = self.client.secrets.kv.v2.read_secret_version(
path=name, mount_point=self.mount_point) or {}
except Exception: # hvac.InvalidPath
return
return response.get('data', {}).get('data', {}).get(self.key)
def put_parameter(self, name: str, value: SecretValue,
_type='SecureString') -> bool:
self.client.secrets.kv.v2.create_or_update_secret(
path=name,
secret={self.key: value},
mount_point=self.mount_point
)
return True
def delete_parameter(self, name: str) -> bool:
return bool(self.client.secrets.kv.v2.delete_metadata_and_all_versions(
path=name, mount_point=self.mount_point))
def enable_secrets_engine(self, mount_point=None) -> bool:
try:
self.client.sys.enable_secrets_engine(
backend_type='kv',
path=(mount_point or self.mount_point),
options={'version': 2}
)
return True
except Exception: # hvac.exceptions.InvalidRequest
return False # already exists
def is_secrets_engine_enabled(self, mount_point=None) -> bool:
mount_points = self.client.sys.list_mounted_secrets_engines()
target_point = mount_point or self.mount_point
return f'{target_point}/' in mount_points
class SSMSecretsManager(AbstractSecretsManager):
def __init__(self, region: Optional[str] = None):
self._region = region
@cached_property
def client(self):
_LOG.info('Initializing ssm boto3 client')
try:
return boto3.client('ssm', region_name=self._region)
except NoCredentialsError:
raise ValueError('No aws credentials could be found')
except NoRegionError:
raise ValueError('No aws region could be found. Set AWS_DEFAULT_REGION environment')
def get_parameter(self, name: str) -> Optional[SecretValue]:
try:
response = self.client.get_parameter(
Name=name,
WithDecryption=True
)
value_str = response['Parameter']['Value']
_LOG.debug(f'Configuration \'{name}\' from SSM received')
try:
return json.loads(value_str)
except json.JSONDecodeError:
_LOG.warning('Could not load json from SSM value. '
'Returning raw string')
return value_str
except ClientError as e:
error_code = e.response['Error']['Code']
_LOG.error(f'Can\'t get secret for name \'{name}\', '
f'error code: \'{error_code}\'')
return
def put_parameter(self, name: str, value: SecretValue,
_type='SecureString') -> bool:
try:
if isinstance(value, (list, dict)):
value = json.dumps(value)
_LOG.debug(f'Saving \'{name}\' to SSM')
self.client.put_parameter(
Name=name,
Value=value,
Overwrite=True,
Type=_type)
return True
except ClientError as e:
error_code = e.response['Error']['Code']
_LOG.error(f'Can\'t put secret for name \'{name}\', '
f'error code: \'{error_code}\'')
return False
def delete_parameter(self, name: str) -> bool:
try:
_LOG.info(f'Removing {name} from SSM')
self.client.delete_parameter(Name=name)
except ClientError as e:
error_code = e.response['Error']['Code']
_LOG.error(f'Can\'t delete secret name \'{name}\', '
f'error code: \'{error_code}\'')
return False
return True