syndicate/core/conf/processor.py (326 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import os from configparser import ConfigParser from datetime import datetime from typing import Union import yaml from syndicate.commons.log_helper import get_logger from syndicate.core.conf.bucket_view import \ AbstractBucketView, AbstractViewDigest from syndicate.core.conf.validator import \ (PROJECT_PATH_CFG, REGION_CFG, DEPLOY_TARGET_BUCKET_CFG, ACCOUNT_ID_CFG, PROJECTS_MAPPING_CFG, AWS_ACCESS_KEY_ID_CFG, RESOURCES_PREFIX_CFG, RESOURCES_SUFFIX_CFG, AWS_SECRET_ACCESS_KEY_CFG, ConfigValidator, USE_TEMP_CREDS_CFG, SERIAL_NUMBER_CFG, TEMP_AWS_ACCESS_KEY_ID_CFG, TEMP_AWS_SECRET_ACCESS_KEY_CFG, TEMP_AWS_SESSION_TOKEN_CFG, EXPIRATION_CFG, TAGS_CFG, IAM_PERMISSIONS_BOUNDARY_CFG, LAMBDAS_ALIASES_NAME_CFG, AWS_SESSION_TOKEN_CFG, EXTENDED_PREFIX_MODE_CFG, LOCK_LIFETIME_MINUTES_CFG) from syndicate.core.constants import (DEFAULT_SEP, IAM_POLICY, IAM_ROLE, S3_BUCKET_TYPE) CONFIG_FILE_NAME = 'syndicate.yml' ALIASES_FILE_NAME = 'syndicate_aliases.yml' LEGACY_CONFIG_FILE_NAME = 'sdct.conf' LEGACY_ALIASES_FILE_NAME = 'sdct_aliases.conf' _LOG = get_logger(__name__) GLOBAL_AWS_SERVICES = {IAM_ROLE, IAM_POLICY, S3_BUCKET_TYPE} DEFAULT_LOCK_TIME_IN_MINUTES = 20 class ConfigHolder: def __init__(self, dir_path): con_path_yml = os.path.join(dir_path, CONFIG_FILE_NAME) con_path_yaml = os.path.join(dir_path, CONFIG_FILE_NAME.replace('yml', 'yaml')) con_path = con_path_yml if \ os.path.exists(con_path_yml) else con_path_yaml self._config_path = con_path if os.path.isfile(con_path): self._init_yaml_config(dir_path=dir_path, con_path=con_path) else: self._init_conf_config(dir_path=dir_path) def _assert_no_errors(self, errors: list): if errors: raise AssertionError(f'The following error occurred ' f'while {self._config_path} ' f'parsing: {errors}') def _init_yaml_config(self, dir_path, con_path): config_content = load_yaml_file_content(file_path=con_path) if config_content: validator = ConfigValidator(config_content) errors = validator.validate() self._assert_no_errors(errors) self._config_dict = config_content aliases_path_yml = os.path.join(dir_path, ALIASES_FILE_NAME) aliases_path_yaml = os.path.join( dir_path, ALIASES_FILE_NAME.replace('yml', 'yaml')) aliases_path = aliases_path_yml \ if os.path.exists(aliases_path_yml) else aliases_path_yaml aliases_content = load_yaml_file_content(file_path=aliases_path) self._aliases = aliases_content self._aliases.update(self.default_aliases) def _init_conf_config(self, dir_path): con_path = os.path.join(dir_path, LEGACY_CONFIG_FILE_NAME) if not os.path.isfile(con_path): raise AssertionError( 'sdct.conf does not exist inside %s folder' % dir_path) self._config_path = con_path self._config_dict = load_conf_file_content(self._config_path) validator = ConfigValidator(self._config_dict) errors = validator.validate() self._assert_no_errors(errors) alias_path = os.path.join(dir_path, LEGACY_ALIASES_FILE_NAME) if not os.path.exists(alias_path): _LOG.warn('sdct_aliases.conf does not exist ' 'inside %s folder' % dir_path) self._aliases = load_conf_file_content(alias_path) self._aliases.update(self.default_aliases) def set_temp_credentials_to_config(self, temp_aws_access_key_id, temp_aws_secret_access_key, temp_aws_session_token, expiration): content_to_update = { TEMP_AWS_ACCESS_KEY_ID_CFG: temp_aws_access_key_id, TEMP_AWS_SECRET_ACCESS_KEY_CFG: temp_aws_secret_access_key, TEMP_AWS_SESSION_TOKEN_CFG: temp_aws_session_token, EXPIRATION_CFG: expiration } update_file_content( file_path=self._config_path, content=content_to_update ) def _resolve_variable(self, variable_name): return self._config_dict.get(variable_name) def _resolve_aliases(self, variable_name): return self._aliases.get(variable_name) def _prepare_bucket_view(self) -> Union[None, AbstractBucketView]: """ Prepares assigned bucket view instance, by providing the raw config payload. Under circumstances of an error, deletes the previously installed view, which defaults to using the raw format. :return: [None, AbstractBucketView] """ view = self.deploy_target_bucket_view raw = self._resolve_variable(DEPLOY_TARGET_BUCKET_CFG) try: view.raw = raw _LOG.info(f'Viewing complement, {view.__class__.__name__},' ' has been found, setting up the raw data.') return view except AttributeError: _LOG.warn('No viewing complement has been found.') except AbstractBucketView.BucketViewRuntimeError: _LOG.warn('Viewing complement set-up has failed.') del self.deploy_target_bucket_view return None def _resolve_bucket_view_attribute(self, attribute_name: str, default=None): """ Retrieves bucket view value respectively to a provided attribute name. """ if not isinstance(attribute_name, str): raise KeyError('Name of an attribute must be a string.') view = self.deploy_target_bucket_view if view and not view.raw: view = self._prepare_bucket_view() return getattr(view, attribute_name, default) @property def default_aliases(self): return { ACCOUNT_ID_CFG: self.account_id, REGION_CFG: self.region } @property def project_path(self): return path_resolver(self._resolve_variable(PROJECT_PATH_CFG)) @property def account_id(self): return str(self._resolve_variable(ACCOUNT_ID_CFG)) @property def lambdas_alias_name(self): return str(self._resolve_aliases(LAMBDAS_ALIASES_NAME_CFG)) @property def access_role(self): return self._resolve_variable('access_role') @property def session_duration(self): duration = self._resolve_variable('session_duration') if duration: return int(self._resolve_variable('session_duration')) @property def aws_access_key_id(self): return self._resolve_variable(AWS_ACCESS_KEY_ID_CFG) @property def aws_session_token(self): return self._resolve_variable(AWS_SESSION_TOKEN_CFG) @property def aws_secret_access_key(self): return self._resolve_variable(AWS_SECRET_ACCESS_KEY_CFG) @property def region(self): return self._resolve_variable(REGION_CFG) @property def deploy_target_bucket(self) -> str: return self._resolve_bucket_view_attribute( 'name', self._resolve_variable(DEPLOY_TARGET_BUCKET_CFG) ) @property def deploy_target_bucket_key_compound(self) -> str: return self._resolve_bucket_view_attribute('key', '') @property def deploy_target_bucket_view(self) -> Union[AbstractBucketView, None]: return getattr(self, '_deploy_target_bucket_view', None) @deploy_target_bucket_view.setter def deploy_target_bucket_view(self, view: AbstractBucketView): if not isinstance(view, AbstractBucketView): _LOG.error('Bucket view couldn\'t have been set, ' 'due to improper type.') elif not isinstance(view.digest, AbstractViewDigest): _LOG.error('Bucket view couldn\'t have been set,' ' due to unassigned digest-parser property.') else: setattr(self, '_deploy_target_bucket_view', view) @deploy_target_bucket_view.deleter def deploy_target_bucket_view(self): delattr(self, '_deploy_target_bucket_view') @property def iam_permissions_boundary(self): return self._resolve_variable(IAM_PERMISSIONS_BOUNDARY_CFG) # mapping build tool : paths to project @property def build_projects_mapping(self): mapping_value = self._resolve_variable(PROJECTS_MAPPING_CFG) if type(mapping_value) == dict: return mapping_value if mapping_value: mapping_dict = {} for i in mapping_value.split(';'): key = i.split(':')[0] value = i.split(':')[1] list_values = mapping_dict.get(key) if list_values: list_values.append(path_resolver(value)) else: mapping_dict[key] = [path_resolver(value)] return mapping_dict @property def resources_prefix(self): prefix = self._resolve_variable(RESOURCES_PREFIX_CFG) if prefix is None: return '' else: return prefix @property def resources_suffix(self): suffix = self._resolve_variable(RESOURCES_SUFFIX_CFG) if suffix is None: return '' else: return suffix @property def iam_suffix(self): """ Optional property. It will be included as a ending of names for iam_roles. :return: """ return self._resolve_variable('iam_suffix') @property def extended_prefix_mode(self): prefix_mode = self._resolve_variable(EXTENDED_PREFIX_MODE_CFG) return self._resolve_bool_param(prefix_mode) @property def aliases(self): return self._aliases @property def use_temp_creds(self): var = self._resolve_variable(USE_TEMP_CREDS_CFG) return self._resolve_bool_param(var) @property def serial_number(self): return self._resolve_variable(SERIAL_NUMBER_CFG) @property def temp_aws_access_key_id(self): return self._resolve_variable(TEMP_AWS_ACCESS_KEY_ID_CFG) @property def temp_aws_secret_access_key(self): return self._resolve_variable(TEMP_AWS_SECRET_ACCESS_KEY_CFG) @property def temp_aws_session_token(self): return self._resolve_variable(TEMP_AWS_SESSION_TOKEN_CFG) @property def expiration(self): return self._resolve_variable(EXPIRATION_CFG) @property def tags(self) -> dict: tags = self._resolve_variable(TAGS_CFG) or {} tags = {k: str(v) for k, v in tags.items()} return tags @property def lock_lifetime_minutes(self) -> int: return self._resolve_variable(LOCK_LIFETIME_MINUTES_CFG) or \ DEFAULT_LOCK_TIME_IN_MINUTES def resolve_alias(self, name): if self._aliases.get(name): return self._aliases[name] @staticmethod def _resolve_bool_param(parameter): if isinstance(parameter, bool): return parameter elif isinstance(parameter, str): return parameter.lower() in ("yes", "true", "t", "1") return False def path_resolver(path): return path.replace('\\', DEFAULT_SEP).replace('//', DEFAULT_SEP) def str_to_bool(val): if isinstance(val, str): if val.lower() == 'true': return True elif val.lower() == 'false': return False return val def str_to_datetime(val): if isinstance(val, str): if ' ' in val: val = val.replace(' ', 'T') return datetime.fromisoformat(val) return val def add_default_section(file_path): with open(file_path, 'r+') as f: lines = f.readlines() lines = [line for line in lines if line.strip() != ''] first_line = lines[0] f.seek(0) if '[default]' not in first_line: rest_of_file = f.read() f.seek(0) f.write('[default]\n' + rest_of_file) def load_yaml_file_content(file_path): if not os.path.isfile(file_path): raise AssertionError(f'There is no file by path: {file_path}') with open(file_path, 'r') as yaml_file: return yaml.load(yaml_file, Loader=yaml.FullLoader) def load_conf_file_content(file_path): if not os.path.isfile(file_path): raise AssertionError(f'There is no file by path: {file_path}') add_default_section(file_path) config = ConfigParser() config.read(file_path) config_dict = {} for section in config.sections(): if section == 'tags': config_dict[section] = dict(config[section]) else: section_dict = { k: str_to_bool(v) if k != 'expiration' else str_to_datetime(v) for k, v in dict(config[section]).items()} config_dict.update(section_dict) return config_dict def update_file_content(file_path, content): if file_path.endswith('.yaml') or file_path.endswith('.yml'): update_yaml_file_content(file_path=file_path, content=content) elif file_path.endswith('.conf'): update_conf_file_content(file_path=file_path, content=content) def update_yaml_file_content(file_path, content): file_content = load_yaml_file_content(file_path=file_path) file_content.update(content) with open(file_path, 'w') as yaml_file: yaml.dump(file_content, yaml_file, default_flow_style=False) def update_conf_file_content(file_path, content): file_content = load_conf_file_content(file_path) file_content.update(content) config = ConfigParser() for key, val in file_content.items(): if type(val) is dict: config.add_section(key) for sub_key, sub_val in val.items(): config.set(key, sub_key, str(sub_val)) else: if not config.has_section('default'): config.add_section('default') config.set('default', key, str(val)) with open(file_path, 'w') as f: config.write(f)