modular_cli_sdk/services/credentials_manager.py (136 lines of code) (raw):

import json import os import shutil from abc import ABC, abstractmethod from functools import cached_property from pathlib import Path from modular_cli_sdk.client.ssm_client import SSMSecretsManager, \ AbstractSecretsManager, VaultSecretsManager from modular_cli_sdk.commons.constants import CONTEXT_MODULAR_ADMIN_USERNAME, \ ENV_VAULT_TOKEN, ENV_VAULT_ADDR from modular_cli_sdk.commons.exception import \ ModularCliSdkConfigurationException from modular_cli_sdk.commons.logger import get_logger _LOG = get_logger(__name__) class AbstractCredentialsManager(ABC): @abstractmethod def store(self, config: dict) -> str: """ Store credentials. Works with file system in standalone installation or with AWS Parameter Store if module is a part of Modular-API """ ... @abstractmethod def extract(self) -> dict: """ Extract credentials. Works with file system in standalone installation or with AWS Parameter Store if module is a part of Modular-API """ ... @abstractmethod def clean_up(self) -> str: """ Delete credentials. Remove records from file system in case of standalone installation or remove parameter from AWS Parameter Store if module is a part of Modular-API """ class CredentialsProvider: def __init__(self, module_name, context): self.module_name = module_name self.context = context def is_modular_mode(self) -> bool: """ Tells whether this instance is in m3-modular-admin mode :return: """ obj = self.context.obj if not isinstance(obj, dict): return False return bool(obj.get(CONTEXT_MODULAR_ADMIN_USERNAME)) @property def credentials_manager(self): if self.is_modular_mode(): instance = SSMCredentialsManager(self.module_name, self.context) else: instance = FileSystemCredentialsManager(self.module_name) return instance class FileSystemCredentialsManager(AbstractCredentialsManager): def __init__(self, module_name: str): home = str(Path.home()) self.module_name = module_name self.creds_folder_path = os.path.join(home, f'.{module_name}') self.config_file_path = os.path.join(self.creds_folder_path, 'credentials') def store(self, config: dict) -> str: try: Path(self.creds_folder_path).mkdir(exist_ok=True, parents=True) except OSError as e: _LOG.error( f'Unable to create configuration folder ' f'{self.creds_folder_path}. Reason: {str(e)}') raise ModularCliSdkConfigurationException( f'Unable to create configuration folder {self.creds_folder_path}' ) with open(self.config_file_path, 'w+') as config_file: json.dump(config, config_file) _LOG.debug( f'Configuration created successfully. Stored by path: ' f'{self.config_file_path}') # todo review:fix # TODO, I think it's bad to return these obviously human strings here. # We should return bool or None or smt, and the user of this class # must decide what string to output based on the result. # But no - we simply imply our string which may or may not be # appropriate. # Also, it's not easily readable for PC: clean_up returns different # strings in case the config was or wasn't cleaned. And how is the # programmers supposed to know, whether the config was cleaned? # By using regex? return f'The configuration for {self.module_name} tool was ' \ f'successfully saved locally' def extract(self) -> dict: if not os.path.exists(self.config_file_path): _LOG.error( f'Can not find configuration file by path: ' f'{self.config_file_path}') raise ModularCliSdkConfigurationException( f'The {self.module_name} tool is not configured. ' f'Please execute the configuration command') with open(self.config_file_path, 'r') as config_file: config_dict = json.load(config_file) _LOG.debug('Configuration successfully loaded') return config_dict def clean_up(self) -> str: try: shutil.rmtree(self.creds_folder_path) except FileNotFoundError: return f'Configuration for {self.module_name} tool not found. ' \ f'Nothing to delete' except OSError: _LOG.error( f'Error occurred while cleaning {self.module_name} ' f'configuration by path: {self.creds_folder_path}.') return f'The {self.module_name} tool configuration has been deleted.' class SSMCredentialsManager(AbstractCredentialsManager): def __init__(self, module_name: str, context): """ :param module_name: str :param context: click.Context """ self.context = context user_name = context.obj[CONTEXT_MODULAR_ADMIN_USERNAME] user_name = AbstractSecretsManager.allowed_name(user_name) self.module_name = module_name self.ssm_secret_name = self.build_ssm_secret_name( module_name=module_name, user_name=user_name ) @staticmethod def build_ssm_secret_name(module_name: str, user_name: str) -> str: return f'modular-api.{module_name}.{user_name}.configuration' @cached_property def ssm_client(self) -> AbstractSecretsManager: """ Can possibly return any implemented client :return: """ if os.environ.get(ENV_VAULT_TOKEN) and os.environ.get(ENV_VAULT_ADDR): _LOG.debug('Returning vault secrets manager') return VaultSecretsManager() _LOG.debug('Returning SSM secrets manager') return SSMSecretsManager() def store(self, config: dict) -> str: saved = self.ssm_client.put_parameter( name=self.ssm_secret_name, value=config ) if saved: return f'The configuration for {self.module_name} tool was ' \ f'successfully saved remotely. Parameter name: ' \ f'{self.ssm_secret_name}' raise ModularCliSdkConfigurationException( f'Unable to save configuration for {self.module_name} to SSM' ) def extract(self) -> dict: result = self.ssm_client.get_parameter(name=self.ssm_secret_name) if not result: raise ModularCliSdkConfigurationException( f'The {self.module_name} tool is not configured. ' f'Please execute the configuration command') if isinstance(result, str): raise ModularCliSdkConfigurationException( 'Can not load configuration. For more information ' 'please check logs') # isinstance(result, (dict, list)) return result def clean_up(self) -> str: removed = self.ssm_client.delete_parameter(name=self.ssm_secret_name) if not removed: return f'Configuration for {self.module_name} tool not found. ' \ f'Nothing to delete' return f'Configuration for {self.module_name} tool was successfully ' \ f'deleted'