syndicate/core/conf/validator.py (386 lines of code) (raw):

"""
    Copyright 2020 EPAM Systems, Inc.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
import datetime
import os
import re
from typing import Any

from syndicate.core.conf.bucket_view import NAMED_S3_URI_PATTERN
from syndicate.commons.log_helper import get_user_logger

MIN_BUCKET_NAME_LEN = 3
MAX_BUCKET_NAME_LEN = 63
ALL_REGIONS = ['us-east-1', 'us-east-2', 'us-west-1', 'us-west-2',
               'sa-east-1', 'ca-central-1', 'eu-west-1', 'eu-central-1',
               'eu-west-2', 'eu-west-3', 'ap-northeast-1',
               'ap-northeast-2', 'ap-east-1', 'ap-southeast-1',
               'ap-southeast-2', 'ap-south-1', 'eu-north-1', 'eu-south-1',
               'ap-northeast-3', 'ap-southeast-3', 'af-south-1']

# Keys of the per-field rule dictionaries built in ConfigValidator.__init__
REQUIRED = 'required'
VALIDATOR = 'validator'

# Names of the configuration parameters
PROJECT_PATH_CFG = 'project_path'
ACCOUNT_ID_CFG = 'account_id'
REGION_CFG = 'region'
LAMBDAS_ALIASES_NAME_CFG = 'lambdas_alias_name'
LOGS_EXPIRATION = 'logs_expiration'

AWS_ACCESS_KEY_ID_CFG = 'aws_access_key_id'
AWS_SECRET_ACCESS_KEY_CFG = 'aws_secret_access_key'
AWS_SESSION_TOKEN_CFG = 'aws_session_token'
DEPLOY_TARGET_BUCKET_CFG = 'deploy_target_bucket'
PROJECTS_MAPPING_CFG = 'build_projects_mapping'
RESOURCES_SUFFIX_CFG = 'resources_suffix'
RESOURCES_PREFIX_CFG = 'resources_prefix'
IAM_SUFFIX_CFG = 'iam_suffix'
EXTENDED_PREFIX_MODE_CFG = 'extended_prefix_mode'
EXTENDED_PREFIX_PATTERN = '^[a-z0-9-]+$'

USE_TEMP_CREDS_CFG = 'use_temp_creds'
SERIAL_NUMBER_CFG = 'serial_number'
TEMP_AWS_ACCESS_KEY_ID_CFG = 'temp_aws_access_key_id'
TEMP_AWS_SECRET_ACCESS_KEY_CFG = 'temp_aws_secret_access_key'
TEMP_AWS_SESSION_TOKEN_CFG = 'temp_aws_session_token'
EXPIRATION_CFG = 'expiration'
SESSION_DURATION_CFG = 'session_duration'
ACCESS_ROLE_CFG = 'access_role'
IAM_PERMISSIONS_BOUNDARY_CFG = 'iam_permissions_boundary'
LOCK_LIFETIME_MINUTES_CFG = 'lock_lifetime_minutes'

TAGS_CFG = 'tags'

PYTHON_LANGUAGE_NAME = 'python'
NODEJS_LANGUAGE_NAME = 'nodejs'
JAVA_LANGUAGE_NAME = 'java'
DOTNET_LANGUAGE_NAME = 'dotnet'
SWAGGER_UI_NAME = 'swagger_ui'
APPSYNC_NAME = 'appsync'

ALLOWED_RUNTIME_LANGUAGES = [PYTHON_LANGUAGE_NAME,
                             JAVA_LANGUAGE_NAME,
                             NODEJS_LANGUAGE_NAME,
                             SWAGGER_UI_NAME]

REQUIRED_PARAM_ERROR = 'The required key {} is missing'
UNKNOWN_PARAM_MESSAGE = 'Unknown parameter(s) in the configuration file: {}'

USER_LOG = get_user_logger()


class ConfigValidator:
    """Validate a configuration dictionary against per-key rules.

    Every known configuration key is mapped to a rule dictionary holding
    a ``REQUIRED`` flag and a ``VALIDATOR`` callable. A validator is
    called as ``validator(key, value)`` and returns a list of error
    messages; an empty list or ``None`` means the value is accepted.
    """

    def __init__(self, config_dict) -> None:
        """Store the configuration and build the key -> rules mapping.

        :param config_dict: mapping of configuration parameter names to
            their values; it is read with ``.get`` and ``.keys``.
        """
        self._config_dict = config_dict
        self._extended_prefix_mode = config_dict.get(EXTENDED_PREFIX_MODE_CFG)
        self._fields_validators_mapping = {
            PROJECT_PATH_CFG: {
                REQUIRED: True,
                VALIDATOR: self._validate_project_path},
            ACCOUNT_ID_CFG: {
                REQUIRED: True,
                VALIDATOR: self._validate_account_id},
            REGION_CFG: {
                REQUIRED: True,
                VALIDATOR: self._validate_region},
            DEPLOY_TARGET_BUCKET_CFG: {
                REQUIRED: True,
                VALIDATOR: self._validate_bundle_bucket_name},
            PROJECTS_MAPPING_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_project_mapping},
            AWS_ACCESS_KEY_ID_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_aws_access_key},
            AWS_SECRET_ACCESS_KEY_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_aws_secret_access_key},
            AWS_SESSION_TOKEN_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_aws_session_token},
            RESOURCES_PREFIX_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_resource_prefix},
            RESOURCES_SUFFIX_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_resources_prefix_suffix},
            IAM_SUFFIX_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_resources_prefix_suffix
            },
            EXTENDED_PREFIX_MODE_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_extended_prefix_mode
            },
            USE_TEMP_CREDS_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_use_temp_creds
            },
            SERIAL_NUMBER_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_serial_number
            },
            SESSION_DURATION_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_session_duration,
            },
            # The temporary credentials use the same validators as the
            # permanent ones: the secret key is only type-checked, the
            # access key id is additionally length-checked.
            TEMP_AWS_SECRET_ACCESS_KEY_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_aws_secret_access_key
            },
            TEMP_AWS_ACCESS_KEY_ID_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_aws_access_key
            },
            TEMP_AWS_SESSION_TOKEN_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_aws_session_token
            },
            EXPIRATION_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_expiration
            },
            ACCESS_ROLE_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_access_role
            },
            TAGS_CFG: {
                REQUIRED: False,
                VALIDATOR: self.validate_tags
            },
            IAM_PERMISSIONS_BOUNDARY_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_iam_permissions_boundary
            },
            LOCK_LIFETIME_MINUTES_CFG: {
                REQUIRED: False,
                VALIDATOR: self._validate_lock_lifetime_minutes
            }
        }

    def validate(self):
        """Validate every known key of the stored configuration.

        Keys that are present in the configuration but have no rules are
        reported through ``USER_LOG`` as a warning and are not treated as
        errors. Falsy values are not passed to validators: a falsy value
        of a required key is reported as missing, a falsy value of an
        optional key is skipped.

        :return: dict mapping a configuration key to either the
            "required key is missing" message (a string) or to the
            errors returned by the key's validator.
        """
        error_messages = {}

        unknown_params = (set(self._config_dict)
                          - set(self._fields_validators_mapping))
        if unknown_params:
            # ``warn`` is a deprecated alias of ``warning``
            USER_LOG.warning(UNKNOWN_PARAM_MESSAGE.format(unknown_params))

        for key, validation_rules in self._fields_validators_mapping.items():
            value = self._config_dict.get(key)
            if not value:
                if validation_rules.get(REQUIRED):
                    error_messages[key] = REQUIRED_PARAM_ERROR.format(key)
                continue

            validator_func = validation_rules.get(VALIDATOR)
            validation_errors = validator_func(key, value)
            if validation_errors:
                error_messages[key] = validation_errors
        return error_messages

    def _validate_project_path(self, key, value):
        """Check that the value is a non-empty string naming an existing
        path.

        :return: list of error messages, empty when the value is valid.
        """
        str_error = self._assert_value_is_str(key, value)
        if str_error:
            return [str_error]
        errors = []
        if len(value) == 0:
            errors.append(f'{key} must not be empty')
        if not os.path.exists(value):
            errors.append(f'The path {value} specified in {key} must exist')
        return errors

    @staticmethod
    def _validate_account_id(key, value):
        """Check that the value converts to ``int`` and is 12 characters
        long when rendered with ``str``.

        :return: list of error messages, empty when the value is valid.
        """
        errors = []
        try:
            int(value)
        except (TypeError, ValueError):
            # TypeError: unsupported type (e.g. a list);
            # ValueError: a string that is not a number (e.g. 'abc')
            errors.append(f'{key} must be int, not {type(value)}')
            return errors
        if len(str(value)) != 12:
            errors.append(f'{key} must be a 12-digit number')
        return errors

    def _validate_region(self, key, value):
        """Check that the value is a string listed in ``ALL_REGIONS``.

        :return: list with one error message, or ``None`` when valid.
        """
        str_error = self._assert_value_is_str(key, value)
        if str_error:
            return [str_error]
        if value not in ALL_REGIONS:
            return [
                f'{key} value must be one of {ALL_REGIONS}, but is {value}'
            ]

    def _validate_bundle_bucket_name(
            self,
            key: str,
            value: str,
    ) -> list:
        """Check that the value matches ``NAMED_S3_URI_PATTERN`` and that
        the length of its ``name`` group is within the allowed bounds.

        :return: list of error messages, empty when the value is valid.
        """
        str_error = self._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]
        errors = []

        match = re.compile(NAMED_S3_URI_PATTERN).match(value)
        if not match:
            errors.append(
                f"The value '{value}' does not match the expected S3 URI format"
            )
            return errors

        name = match.groupdict().get('name')
        if len(name) < MIN_BUCKET_NAME_LEN or len(name) > MAX_BUCKET_NAME_LEN:
            errors.append(
                f'The length of {key} must be between {MIN_BUCKET_NAME_LEN} and'
                f' {MAX_BUCKET_NAME_LEN} characters long but not {len(name)}'
            )
        return errors

    def _validate_project_mapping(self, key, value):
        """Check the ``language -> paths`` build mapping.

        Every language must be listed in ``ALLOWED_RUNTIME_LANGUAGES``,
        every language must have at least one path, and every path must
        exist relative to the configured project path.

        :return: list of error messages, empty when the value is valid.
        """
        errors = []
        if type(value) is not dict:
            errors.append(f'{key} must be type of dict')
            return errors

        # Unsupported languages are reported first, then the path issues
        errors.extend(
            f'{language} is not supported to be built'
            for language in value
            if language not in ALLOWED_RUNTIME_LANGUAGES
        )

        project_path = self._config_dict.get(PROJECT_PATH_CFG)
        # Without a usable project path the relative paths cannot be
        # resolved (os.path.join would raise on None); the project path
        # has its own validator, so only the existence check is skipped.
        can_check_paths = isinstance(project_path, str)

        for build_key, paths in value.items():
            if not paths:
                errors.append(f'The path in {build_key} project '
                              f'mapping not specified')
                continue
            if not can_check_paths:
                continue
            for path in paths:
                if not os.path.exists(os.path.join(project_path, path)):
                    errors.append(
                        f'The path in {key}:{build_key} project '
                        f'mapping does not exists: {path}')
        return errors

    def _validate_aws_access_key(self, key, value):
        """Check that the value is a string of 16 to 128 characters.

        :return: list with one error message, or ``None`` when valid.
        """
        str_error = self._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]
        if len(value) < 16 or len(value) > 128:
            return [
                f'The length of {key} must be in a '
                f'range between 16 and 128 characters']

    def _validate_aws_secret_access_key(self, key, value):
        """Check that the value is a string.

        :return: list with one error message, or ``None`` when valid.
        """
        # the only constraint found
        str_error = self._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]

    def _validate_aws_session_token(self, key, value):
        """Check that the value is a string.

        :return: list with one error message, or ``None`` when valid.
        """
        str_error = self._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]

    def _validate_use_temp_creds(self, key, value):
        """Check that the value is a bool.

        :return: list with one error message, or ``None`` when valid.
        """
        bool_error = self._assert_value_is_bool(
            key=key,
            value=value
        )
        if bool_error:
            return [bool_error]

    def _validate_serial_number(self, key, value):
        """Check that the value is a string.

        :return: list with one error message, or ``None`` when valid.
        """
        str_error = self._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]

    def _validate_access_role(self, key, value):
        """Check that the value is a string.

        :return: list with one error message, or ``None`` when valid.
        """
        str_error = self._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]

    @staticmethod
    def validate_tags(key, value):
        """Check the tags mapping.

        A falsy value is accepted as is. Otherwise the value must be a
        dict with at most 50 items; tag keys must be strings of 1 to 128
        characters that do not start with ``aws:``; tag values must be
        strings of at most 256 characters.

        :return: list of error messages, empty when the value is valid.
        """
        errors = []
        if not value:
            return errors
        if not isinstance(value, dict):
            errors.append(f'\'{key}\' param must be a dictionary but '
                          f'not a \'{type(value).__name__}\'')
            return errors
        if len(value) > 50:
            errors.append(f'Each resource can have up to 50 user created tags.'
                          f' You have specified: {len(value)}')
        for tag_name, tag_value in value.items():
            if not isinstance(tag_name, str):
                errors.append(
                    f'The tag key \'{tag_name}\' has type '
                    f'"{type(tag_name).__name__}". Supported type is "string".')
            else:
                if tag_name.startswith('aws:'):
                    errors.append(f'\'{tag_name}\': you can\'t create, edit '
                                  f'or delete a tag that begins with the '
                                  f'\'aws:\' prefix.')
                if not 1 <= len(tag_name) <= 128:
                    errors.append(f'\'{tag_name}\': the tag key must be a '
                                  f'minimum of 1 and a maximum of 128 Unicode '
                                  f'characters')
            if not isinstance(tag_value, str):
                errors.append(
                    f'The tag key \'{tag_name}\' value \'{tag_value}\'has type '
                    f'"{type(tag_value).__name__}". Supported type is "string".')
            else:
                if len(tag_value) > 256:
                    errors.append(f'\'{tag_value}\': the tag value must be a '
                                  f'minimum of 0 and a maximum of 256 Unicode '
                                  f'characters')

        return errors

    @staticmethod
    def _validate_iam_permissions_boundary(key, value):
        """Check that the value is a string.

        :return: list of error messages, empty when the value is valid.
        """
        errors = []
        if not isinstance(value, str):
            return [f'\'{key}\' must have a string type']
        return errors

    @staticmethod
    def _validate_expiration(key, value):
        """Check that the value is a ``datetime.datetime`` instance.

        :return: list of error messages, empty when the value is valid.
        """
        if not isinstance(value, datetime.datetime):
            return [f'\'{key}\' must be a valid ISO 8601 format string']
        return []

    @staticmethod
    def _validate_resources_prefix_suffix(key, value):
        """Check that the value is a string of at most 5 characters.

        :return: list with one error message, or ``None`` when valid.
        """
        str_error = ConfigValidator._assert_value_is_str(key=key, value=value)
        if str_error:
            return [str_error]
        if len(value) > 5:
            return [
                f'The length of {key} must be less or equal to 5 character']

    def _validate_resource_prefix(self, key, value):
        """Validate the resources prefix with the extended-mode rules when
        ``extended_prefix_mode`` is truthy in the configuration, and with
        the regular prefix/suffix rules otherwise.
        """
        if self._extended_prefix_mode:
            return self._validate_resources_prefix_extended_mode(key, value)
        return self._validate_resources_prefix_suffix(key, value)

    @staticmethod
    def _validate_extended_prefix_mode(key, value):
        """Check that the value is a bool.

        :return: list with one error message, or ``None`` when valid.
        """
        bool_error = ConfigValidator._assert_value_is_bool(
            key=key,
            value=value
        )
        if bool_error:
            return [bool_error]

    @staticmethod
    def _validate_resources_prefix_extended_mode(key, value):
        """Check a prefix against the extended-mode rules.

        The value must be a string of at most 14 characters that starts
        with a letter, has no two consecutive hyphens and matches
        ``EXTENDED_PREFIX_PATTERN``.

        :return: list of error messages, empty when the value is valid.
        """
        str_error = ConfigValidator._assert_value_is_str(key=key, value=value)
        if str_error:
            # None of the checks below is applicable to a non-string
            return [str_error]

        result = []
        if len(value) > 14:
            result.append(f'The length of {key} must be less or equal to 14 '
                          f'character')
        if '--' in value:
            result.append(f'The {key} must not contain two consecutive '
                          f'hyphens')
        # Slicing instead of indexing keeps an empty string from raising
        if not value[:1].isalpha():
            result.append(f'The first character of the {key} must be a letter')
        if not re.match(EXTENDED_PREFIX_PATTERN, value):
            result.append(f'The {key} must contain only lowercase letters, '
                          f'numbers, and hyphens')
        return result

    @staticmethod
    def validate_prefix_suffix(key, value):
        """Validate a regular prefix/suffix value.

        :return: the first error message, or ``None`` when valid.
        """
        result = ConfigValidator._validate_resources_prefix_suffix(key, value)
        if result:
            return result[0]

    @staticmethod
    def validate_extended_prefix(key, value):
        """Validate a prefix against the extended-mode rules.

        :return: non-empty list of error messages, or ``None`` when
            valid.
        """
        result = ConfigValidator._validate_resources_prefix_extended_mode(
            key, value)
        if result:
            return result

    @staticmethod
    def _validate_session_duration(key, value):
        """Check that the value is an int not lower than 900.

        :return: list with one error message, or ``None`` when valid.
        """
        if not isinstance(value, int):
            return [f'\'{key}\' must a an integer']
        if value < 900:
            return [f'\'{key}\' must begin from 900 seconds']

    @staticmethod
    def _validate_lock_lifetime_minutes(key, value):
        """Check that the value is an int between 0 and 300 inclusive.

        :return: list with one error message, or ``None`` when valid.
        """
        if not isinstance(value, int):
            return [f'\'{key}\' must a an integer']
        if not 0 <= value <= 300:
            return [f'\'{key}\' value must be between 0 and 300 minutes']

    @staticmethod
    def _assert_value_is_str(
            key: str,
            value: Any,
    ) -> str | None:
        """Return an error message unless the value is exactly ``str``
        (subclasses are rejected), otherwise ``None``.
        """
        if type(value) is not str:
            return f'{key} must be type of string'

    @staticmethod
    def _assert_value_is_bool(key, value):
        """Return an error message unless the value is exactly ``bool``,
        otherwise ``None``.
        """
        if type(value) is not bool:
            return f'{key} must be type of bool'