syndicate/core/build/validator/mapping.py (51 lines of code) (raw):

"""
    Copyright 2018 EPAM Systems, Inc.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
from syndicate.commons.log_helper import get_logger
from syndicate.core.build.validator.batch_compenv_validator import validate_batch_compenv
from syndicate.core.build.validator.dynamodb_validator import \
    validate_dynamodb, validate_dax_cluster
from syndicate.core.build.validator.batch_jobdef_validator import validate_batch_jobdef
from syndicate.core.build.validator.lambda_validator import validate_lambda
from syndicate.core.build.validator.ec2_launch_template_validator import \
    validate_launch_template
from syndicate.core.constants import \
    (LAMBDA_CONFIG_FILE_NAME, LAMBDA_TYPE, DYNAMO_TABLE_TYPE,
     BATCH_COMPENV_TYPE, BATCH_JOBDEF_TYPE, DAX_CLUSTER_TYPE,
     EC2_LAUNCH_TEMPLATE_TYPE, RESOURCE_LIST)

ALL_TYPES = 'all_types'

_LOG = get_logger(__name__)


def common_validate(resource_name, resource_meta, all_meta):
    """Validate the 'dependencies' entries of a single resource.

    Each dependency must carry a non-empty 'resource_name' that is one of
    the keys of ``all_meta`` and a non-empty 'resource_type' that is listed
    in ``RESOURCE_LIST``. A resource without (or with empty) 'dependencies'
    passes without any checks.

    :param resource_name: name of the resource being validated; used only
        in the error messages
    :param resource_meta: mapping with the resource description; only its
        'dependencies' entry is read here
    :param all_meta: mapping whose keys are the known resource names
    :raises AssertionError: for the first dependency that has at least one
        problem; the message is the ``str()`` of the list of problems found
        for that dependency
    """
    declared = resource_meta.get('dependencies')
    if not declared:
        return

    for dependency in declared:
        problems = []
        dep_name = dependency.get('resource_name')
        dep_type = dependency.get('resource_type')

        # NOTE: membership is tested against a list, which compares by
        # equality, so an unhashable name is reported as a missing resource
        # instead of raising TypeError.
        if not dep_name:
            problems.append(
                f"There is no 'resource_name' in resource "
                f"'{resource_name}' dependency {dependency}")
        elif dep_name not in list(all_meta.keys()):
            problems.append(
                f"The resource '{resource_name}' depends on resource "
                f"'{dependency.get('resource_name')}' that is not a part "
                f"of the project. \nPlease double-check the project "
                f"resources description add it to the project or remove "
                f"it from the resource '{resource_name}' dependencies.")

        if not dep_type:
            problems.append(
                f"There is no 'resource_type' in resource "
                f"'{resource_name}' dependency {dependency}")
        elif dep_type not in RESOURCE_LIST:
            problems.append(
                f"Unsupported resource type "
                f"'{dependency.get('resource_type')}' found in "
                f"the resource '{resource_name}' dependency "
                f"'{dependency}'.")

        if problems:
            raise AssertionError(str(problems))


# Validation customization: resource type -> validator callable.
# The ALL_TYPES key holds the type-independent dependency check
# (presumably run for every resource by the caller -- verify at call site).
VALIDATOR_BY_TYPE_MAPPING = {
    ALL_TYPES: common_validate,
    DYNAMO_TABLE_TYPE: validate_dynamodb,
    DAX_CLUSTER_TYPE: validate_dax_cluster,
    BATCH_COMPENV_TYPE: validate_batch_compenv,
    BATCH_JOBDEF_TYPE: validate_batch_jobdef,
    LAMBDA_TYPE: validate_lambda,
    EC2_LAUNCH_TEMPLATE_TYPE: validate_launch_template
}