osci/config/base.py (132 lines of code) (raw):

"""Copyright since 2019, EPAM Systems This file is part of OSCI. OSCI is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OSCI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OSCI. If not, see <http://www.gnu.org/licenses/>.""" import logging import os import json from typing import Mapping, Any, Dict, NamedTuple from .reader import BaseYmlConfigReader from osci.utils import MetaSingleton log = logging.getLogger(__name__) class BaseConfig(metaclass=MetaSingleton): """Abstract class for all configurations""" @classmethod def tear_down(cls): """Delete all instances of the class""" cls._instances = {} log.debug(f'Delete all instances of {cls}') class FileSystemConfig(BaseConfig): def __init__(self, cfg): self.file_system_cfg = cfg.get('file_system', dict()) self.areas_cfg = cfg.get('areas', dict()) @property def landing_container(self) -> str: return self.areas_cfg.get('landing', dict()).get('container') @property def staging_container(self) -> str: return self.areas_cfg.get('staging', dict()).get('container') @property def public_container(self) -> str: return self.areas_cfg.get('public', dict()).get('container') @property def landing_props(self) -> Dict[str, Any]: raise NotImplementedError() @property def staging_props(self) -> Dict[str, Any]: raise NotImplementedError() @property def public_props(self) -> Dict[str, Any]: raise NotImplementedError() class LocalFileSystemConfig(FileSystemConfig): @property def base_path(self) -> str: return self.file_system_cfg.get('base_path') @property def landing_props(self) -> Dict[str, Any]: return dict(base_path=self.base_path, base_area_dir=self.landing_container) @property def staging_props(self) -> Dict[str, Any]: return dict(base_path=self.base_path, base_area_dir=self.staging_container) @property def public_props(self) -> Dict[str, Any]: return dict(base_path=self.base_path, base_area_dir=self.public_container) class AzureBlobFileSystemConfig(FileSystemConfig): @property def account_name(self) -> str: return self.file_system_cfg.get('account_name') @property def account_key(self) -> str: return self.file_system_cfg.get('account_key') @property def landing_props(self) -> Dict[str, Any]: return dict(storage_account_name=self.account_name, storage_account_key=self.account_key, area_container=self.landing_container) @property def staging_props(self) -> Dict[str, Any]: return dict(storage_account_name=self.account_name, storage_account_key=self.account_key, area_container=self.staging_container) @property def public_props(self) -> Dict[str, Any]: return dict(storage_account_name=self.account_name, storage_account_key=self.account_key, area_container=self.public_container) class FileSystemType: blob = 'blob' local = 'local' class WebConfig(NamedTuple): fs: str attrs: dict def parse_web_config(web_cfg) -> WebConfig: log.debug(web_cfg) fs = web_cfg['fs'] attrs_map = { FileSystemType.local: dict(base_path=web_cfg['base_path'], base_area_dir=web_cfg['container']), FileSystemType.blob: dict(storage_account_name=web_cfg['account_name'], storage_account_key=web_cfg['account_key'], area_container=web_cfg['container']) } if fs not in attrs_map: raise ValueError(f'Unsupported filesystem type `{fs}`. ' f'Available options: {",".join(attrs_map.keys())}') return WebConfig(fs=fs, attrs=attrs_map[fs]) class Config(BaseConfig): """Configuration is a Singleton and will be using just one configuration instance Priorities of how to get the configuration: 1. Passed env during initialization of the class 2. Environment variable `ENV` 4. Using `local.yml` config if exists 3. Using `default.yml` config """ def __init__(self, env: str = None, dbutils=None): """Init configuration :param env: environment on which the database configuration should be obtained """ log.info(f'ENV: {os.environ.get("ENV")}') self.env = env or os.environ.get('ENV') or ('local' if BaseYmlConfigReader(env='local').exists() else None) or 'default' self.__cfg = BaseYmlConfigReader(self.env, dbutils=dbutils).config log.info(f"Full config: {self.__cfg}") log.info(f'Configuration loaded for env: {self.env}') file_system_type_map: Mapping[str, type(FileSystemConfig)] = { FileSystemType.blob: AzureBlobFileSystemConfig, FileSystemType.local: LocalFileSystemConfig } if self.file_system_type not in file_system_type_map: raise ValueError(f'Unsupported file system type {self.file_system_type}') self.file_system: FileSystemConfig = file_system_type_map[self.file_system_type](self.__cfg) self.web_config: WebConfig = parse_web_config(self.__cfg['web']) def __getitem__(self, key): """Get item from configuration dict :param key: key :return: item """ return self.__cfg.get(key) @property def file_system_type(self) -> str: return self.__cfg.get('file_system', dict()).get('type', '') @property def bq(self) -> dict: return self.__cfg.get('bq', dict()) @property def bq_secret(self) -> dict: secret = self.bq.get('secret') if secret: return json.loads(secret) return dict() @property def bq_project(self) -> str: return self.bq.get('project', '') @property def github_token(self) -> str: return self.__cfg.get('github', dict()).get('token', '') @property def default_company(self) -> str: return self.__cfg.get('company', dict()).get('default', '') @property def spark_conf(self) -> Dict[str, str]: return self.__cfg.get('spark', dict())