docker/services/clients/ssm.py (139 lines of code) (raw):

import json
import os
from abc import ABC, abstractmethod
from typing import Optional, List, Dict

import boto3
from botocore.client import ClientError
from modular_sdk.services.ssm_service import SecretValue

from commons.constants import ENV_SERVICE_MODE, DOCKER_SERVICE_MODE, \
    ENV_VAULT_TOKEN, ENV_VAULT_HOST, ENV_VAULT_PORT
from commons.log_helper import get_logger
from services.environment_service import EnvironmentService

_LOG = get_logger('ssmclient')


class AbstractSSMClient(ABC):
    """Common interface of the secret-storage clients defined in this module."""

    def __init__(self, environment_service: EnvironmentService):
        self._environment_service = environment_service

    @abstractmethod
    def get_secret_value(self, secret_name: str):
        """Return the value stored under ``secret_name``."""

    @abstractmethod
    def create_secret(self, secret_name: str, secret_value: SecretValue,
                      secret_type='SecureString') -> bool:
        """Store ``secret_value`` under ``secret_name``."""

    @abstractmethod
    def delete_parameter(self, secret_name: str) -> bool:
        """Remove the secret stored under ``secret_name``."""

    @abstractmethod
    def enable_secrets_engine(self, mount_point=None):
        """Enable the backing secrets engine, if the backend has one."""

    @abstractmethod
    def is_secrets_engine_enabled(self, mount_point=None) -> bool:
        """Tell whether the backing secrets engine is enabled."""


class SSMClient(AbstractSSMClient):
    """Secret client backed by the boto3 ``ssm`` client.

    ``ClientError`` raised by boto3 is logged and swallowed: the methods
    report failure through a falsy return value instead of raising.
    """

    def __init__(self, environment_service: EnvironmentService):
        super().__init__(environment_service)
        self._client = None  # boto3 client, created lazily by `client`

    @property
    def client(self):
        """Lazily created boto3 ``ssm`` client for the configured region."""
        if not self._client:
            self._client = boto3.client(
                'ssm', self._environment_service.aws_region())
        return self._client

    def get_secret_value(self, secret_name):
        """Read the decrypted parameter ``secret_name``.

        :return: the parameter value parsed as JSON when it is valid JSON,
            the raw string otherwise, or None if the request failed with
            a ``ClientError``.
        """
        try:
            response = self.client.get_parameter(
                Name=secret_name,
                WithDecryption=True
            )
            value_str = response['Parameter']['Value']
            try:
                return json.loads(value_str)
            except json.decoder.JSONDecodeError:
                # Not JSON: hand the plain string back unchanged.
                return value_str
        except ClientError as e:
            error_code = e.response['Error']['Code']
            _LOG.error(f'Can\'t get secret for name \'{secret_name}\', '
                       f'error code: \'{error_code}\'')
            return None

    def create_secret(self, secret_name: str, secret_value: SecretValue,
                      secret_type='SecureString') -> bool:
        """Create or overwrite the parameter ``secret_name``.

        Lists and dicts are serialized to JSON before being stored.

        :return: True if the parameter was written, False if the request
            failed with a ``ClientError`` (the error code is logged).
        """
        try:
            if isinstance(secret_value, (list, dict)):
                secret_value = json.dumps(secret_value)
            self.client.put_parameter(
                Name=secret_name,
                Value=secret_value,
                Overwrite=True,
                Type=secret_type)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            # The message used to say "get secret", which was misleading
            # for a failed write.
            _LOG.error(f'Can\'t create secret for name \'{secret_name}\', '
                       f'error code: \'{error_code}\'')
            return False
        return True

    def delete_parameter(self, secret_name: str) -> bool:
        """Delete the parameter ``secret_name``.

        :return: True if the parameter was deleted, False if the request
            failed with a ``ClientError`` (the error code is logged).
        """
        try:
            self.client.delete_parameter(Name=secret_name)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            _LOG.error(f'Can\'t delete secret name \'{secret_name}\', '
                       f'error code: \'{error_code}\'')
            return False
        return True

    def enable_secrets_engine(self, mount_point=None):
        """Do nothing; this client has no secrets-engine step."""

    def is_secrets_engine_enabled(self, mount_point=None) -> bool:
        """Do nothing and return None (falsy), despite the annotation."""


class VaultSSMClient(AbstractSSMClient):
    """Secret client backed by a Vault KV v2 secrets engine via ``hvac``."""

    mount_point = 'kv'  # default mount point used for every KV call
    key = 'data'  # key under which the secret value is stored in the payload

    def __init__(self, environment_service: EnvironmentService):
        super().__init__(environment_service)
        self._client = None  # hvac.Client

    def _init_client(self):
        """Create the hvac client from the Vault environment variables.

        :raises AssertionError: if the service mode is not the docker one.
        """
        assert os.getenv(ENV_SERVICE_MODE) == DOCKER_SERVICE_MODE, \
            "You can init vault handler only if SERVICE_MODE=docker"
        # Imported here so that hvac is needed only when Vault is used.
        import hvac
        vault_token = os.getenv(ENV_VAULT_TOKEN)
        vault_host = os.getenv(ENV_VAULT_HOST)
        vault_port = os.getenv(ENV_VAULT_PORT)
        _LOG.info('Initializing hvac client')
        self._client = hvac.Client(
            url=f'http://{vault_host}:{vault_port}',
            token=vault_token
        )
        _LOG.info('Hvac client was initialized')

    @property
    def client(self):
        """Lazily initialized hvac client."""
        if not self._client:
            self._init_client()
        return self._client

    def get_secret_value(self, secret_name: str) -> Optional[SecretValue]:
        """Read the value stored at path ``secret_name``.

        :return: the value kept under ``key`` in the secret payload, or
            None if the read raised or the payload has no such key.
        """
        try:
            response = self.client.secrets.kv.v2.read_secret_version(
                path=secret_name, mount_point=self.mount_point) or {}
        except Exception:  # hvac.InvalidPath
            return None
        return response.get('data', {}).get('data', {}).get(self.key)

    def create_secret(self, secret_name: str, secret_value: SecretValue,
                      secret_type='SecureString') -> bool:
        """Create or update the secret at path ``secret_name``.

        ``secret_type`` is accepted for interface compatibility and is not
        used here.

        :return: the hvac call result, passed through unchanged.
            NOTE(review): presumably a response object rather than a
            strict bool - confirm against hvac before relying on the type.
        """
        return self.client.secrets.kv.v2.create_or_update_secret(
            path=secret_name,
            secret={self.key: secret_value},
            mount_point=self.mount_point
        )

    def delete_parameter(self, secret_name: str) -> bool:
        """Delete the secret at ``secret_name`` with all of its versions.

        :return: truthiness of the hvac call result.
        """
        return bool(self.client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=secret_name,
            mount_point=self.mount_point))

    def get_secret_values(self, secret_names: List[str]
                          ) -> Optional[Dict[str, SecretValue]]:
        """Read several secrets, one request per name.

        :return: mapping of each requested name to its value; a name maps
            to None when `get_secret_value` returned None for it.
        """
        return {name: self.get_secret_value(name) for name in secret_names}

    def enable_secrets_engine(self, mount_point=None):
        """Enable a KV version 2 engine at ``mount_point``.

        The class-level ``mount_point`` is used when none is given.

        :return: True if the engine was enabled, False if the call raised.
        """
        try:
            self.client.sys.enable_secrets_engine(
                backend_type='kv',
                path=(mount_point or self.mount_point),
                options={'version': 2}
            )
            return True
        except Exception:  # hvac.exceptions.InvalidRequest
            return False  # already exists

    def is_secrets_engine_enabled(self, mount_point=None) -> bool:
        """Tell whether an engine is mounted at ``mount_point``.

        The class-level ``mount_point`` is used when none is given.
        """
        mount_points = self.client.sys.list_mounted_secrets_engines()
        target_point = mount_point or self.mount_point
        # The lookup key carries a trailing slash.
        return f'{target_point}/' in mount_points