osci/config/reader.py (103 lines of code) (raw):

"""Copyright since 2019, EPAM Systems This file is part of OSCI. OSCI is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OSCI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OSCI. If not, see <http://www.gnu.org/licenses/>.""" from typing import Dict, Any, Union, Callable, Optional from pathlib import Path from deepmerge import always_merger import logging import yaml import os log = logging.getLogger(__name__) META_CONFIG_FIELD = 'meta' CONFIG_SOURCE_TYPE_FIELD = 'config_source' CONFIG_SOURCE_TYPE_ENV = 'env' CONFIG_SOURCE_TYPE_DATABRICKS = 'dbutils' DATABRICKS_SECRETS_SCOPE_FIELD = 'databricks_scope' PARENT_CONFIG_FIELD = 'extends' class BaseConfigReader: """Abstract class for all configurations reader""" def __init__(self, env: str, *args, **kwargs): self.env = env self.__cfg: Optional[Dict[str, Any]] = None def read(self) -> Dict[str, Any]: """Read config by environment key""" raise NotImplementedError() @property def config(self): if self.__cfg is None: return self.read() return self.__cfg @config.setter def config(self, cfg): self.__cfg = cfg def exists(self) -> bool: """Check configuration exists""" raise NotImplementedError() def read_config_from_databricks_secrets(config: dict, dbutils=None) -> dict: log.debug('Check read config from databricks secrets variables') out_config = dict() if dbutils is None: log.error('`dbutils` is not defined') return out_config try: scope = config[META_CONFIG_FIELD][DATABRICKS_SECRETS_SCOPE_FIELD] def _load_variables_from_secrets(variable): if isinstance(variable, dict): return { k: _load_variables_from_secrets(v) for k, v in variable.items() } if isinstance(variable, list): return [_load_variables_from_secrets(v) for v in variable] return dbutils.secrets.get(scope=scope, key=str(variable)) out_config = {k: _load_variables_from_secrets(v) for k, v in config.items() if k != META_CONFIG_FIELD} except KeyError as ex: log.error(f'Databricks scope field (`{DATABRICKS_SECRETS_SCOPE_FIELD}`) not found ' f'in {config[META_CONFIG_FIELD]}: {ex}') return out_config def read_config_from_environ(config: dict, *args, **kwargs) -> dict: log.debug('Check read config from environ variables') def _load_variables_from_env(variable): if isinstance(variable, dict): return { k: _load_variables_from_env(v) for k, v in variable.items() } if isinstance(variable, list): return [_load_variables_from_env(v) for v in variable] print('key', variable, 'value', os.environ.get(str(variable))) return os.environ.get(str(variable)) return {k: _load_variables_from_env(v) for k, v in config.items() if k != META_CONFIG_FIELD} readers_types_map: Dict[str, Callable[[dict, Optional[Any]], dict]] = { CONFIG_SOURCE_TYPE_ENV: read_config_from_environ, CONFIG_SOURCE_TYPE_DATABRICKS: read_config_from_databricks_secrets } class BaseYmlConfigReader(BaseConfigReader): """YAML config file reader""" DEFAULT_DIRECTORY_NAME = 'files' DEFAULT_DIRECTORY_PATH = Path(__file__).parent.resolve() / DEFAULT_DIRECTORY_NAME DEFAULT_FILE_FORMAT = 'yml' def __init__(self, env: str, directory_path: Union[str, Path] = DEFAULT_DIRECTORY_PATH, file_format: str = DEFAULT_FILE_FORMAT, dbutils=None): """ :param env: environment key (ex. `local`, `dev`, `stage`, `prod`, etc) :param directory_path: path to config files dictionary :param file_format: config file format (default: `yml`) """ super().__init__(env) self.directory_path = directory_path self.file_format = file_format self.dbutils = dbutils @property def file_path(self): return Path(self.directory_path) / f"{self.env}.{self.file_format}" def read(self) -> Dict[str, Any]: """Read configuration from file Read and parse yaml configuration file by rule: `<directory_path>/<env>.<file_format>` :return: configuration dictionary """ try: log.debug(f'Read config from {self.file_path}') with open(self.file_path) as config_file: self.__cfg = yaml.load(config_file, Loader=yaml.FullLoader) log.debug(f"Prod yml load: {self.__cfg}") meta = self.__cfg[META_CONFIG_FIELD].copy() if meta[CONFIG_SOURCE_TYPE_FIELD] in readers_types_map: self.__cfg = readers_types_map[meta[CONFIG_SOURCE_TYPE_FIELD]](self.__cfg, self.dbutils) if meta.get(PARENT_CONFIG_FIELD) is not None: self.__cfg = always_merger.merge( BaseYmlConfigReader(env=meta.get(PARENT_CONFIG_FIELD), directory_path=self.directory_path, file_format=self.file_format).config, self.__cfg ) log.debug(f"Prod yml res: {self.__cfg}") return self.__cfg except FileNotFoundError as ex: log.error(ex) raise ex def exists(self) -> bool: """Check configuration exists in `directory_path` :return: is exist file """ log.debug(f'Check config file for env {self.env} exists') return self.file_path.is_file()