# syndicate/core/build/meta_processor.py
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import shutil
from json import load
from typing import Any
from urllib.parse import urlparse
from syndicate.commons.log_helper import get_logger, get_user_logger
from syndicate.core.build.helper import (build_py_package_name,
resolve_bundle_directory)
from syndicate.core.build.validator.mapping import (VALIDATOR_BY_TYPE_MAPPING,
ALL_TYPES)
from syndicate.core.conf.processor import GLOBAL_AWS_SERVICES
from syndicate.core.constants import (API_GATEWAY_TYPE, ARTIFACTS_FOLDER,
BUILD_META_FILE_NAME, EBS_TYPE,
LAMBDA_CONFIG_FILE_NAME, LAMBDA_TYPE,
RESOURCES_FILE_NAME, RESOURCE_LIST,
IAM_ROLE, LAMBDA_LAYER_TYPE,
S3_PATH_NAME, APPSYNC_CONFIG_FILE_NAME,
LAMBDA_LAYER_CONFIG_FILE_NAME,
WEB_SOCKET_API_GATEWAY_TYPE,
OAS_V3_FILE_NAME,
API_GATEWAY_OAS_V3_TYPE, SWAGGER_UI_TYPE,
SWAGGER_UI_CONFIG_FILE_NAME,
TAGS_RESOURCE_TYPE_CONFIG,
MVN_TARGET_DIR_NAME)
from syndicate.core.helper import (build_path, prettify_json,
resolve_aliases_for_string,
write_content_to_file, validate_tags)
from syndicate.core.resources.helper import resolve_dynamic_identifier
DEFAULT_IAM_SUFFIX_LENGTH = 5
NAME_RESOLVING_BLACKLISTED_KEYS = ['prefix', 'suffix', 'resource_type',
                                   'principal_service', 'integration_type',
                                   'authorization_type']
_LOG = get_logger(__name__)
USER_LOG = get_user_logger()
def validate_deployment_packages(bundle_path, meta_resources):
package_paths = artifact_paths(meta_resources)
nonexistent_packages = []
for package in package_paths:
package_path = build_path(bundle_path, package)
if not os.path.exists(package_path):
nonexistent_packages.append(package_path)
if nonexistent_packages:
raise AssertionError('Bundle is not properly configured.'
' Nonexistent deployment packages: '
'{0}'.format(prettify_json(nonexistent_packages)))
def artifact_paths(meta_resources):
return [i for i in
[_retrieve_package(v) for v in list(meta_resources.values())] if i]
def _retrieve_package(meta):
s3_path = meta.get(S3_PATH_NAME)
if s3_path:
return s3_path
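# A minimal usage sketch (editor-added, illustrative values): artifact_paths
# keeps only the meta entries that carry an S3 path. The 's3_path' key below
# assumes S3_PATH_NAME resolves to that literal.
#
#   >>> artifact_paths({
#   ...     'handler': {'resource_type': 'lambda',
#   ...                 's3_path': 'bundle_1/handler-1.0.zip'},
#   ...     'table': {'resource_type': 'dynamodb_table'}})
#   ['bundle_1/handler-1.0.zip']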
def _check_duplicated_resources(initial_meta_dict, additional_item_name,
additional_item):
""" Match two meta dicts (overall and separated) for duplicates.
:type initial_meta_dict: dict
:type additional_item_name: str
:type additional_item: dict
"""
if additional_item_name in initial_meta_dict:
additional_type = additional_item['resource_type']
initial_item = initial_meta_dict.get(additional_item_name)
if not initial_item:
return
initial_type = initial_item['resource_type']
if additional_type == initial_type and initial_type in \
{API_GATEWAY_TYPE, WEB_SOCKET_API_GATEWAY_TYPE}:
# check if APIs have same resources
for each in list(initial_item['resources'].keys()):
if each in list(additional_item['resources'].keys()):
                    raise AssertionError(
                        "API '{0}' has duplicated resource '{1}'! Please "
                        "change the name of one resource or remove "
                        "one.".format(additional_item_name, each))
# check if APIs have duplicated cluster configurations
for config in ['cluster_cache_configuration',
'cluster_throttling_configuration']:
initial_config = initial_item.get(config)
additional_config = additional_item.get(config)
if initial_config and additional_config:
                    raise AssertionError(
                        "API '{0}' has duplicated {1}. Please remove one "
                        "configuration.".format(additional_item_name, config)
                    )
if initial_config:
additional_item[config] = initial_config
# handle responses
initial_responses = initial_item.get(
'api_method_responses')
additional_responses = additional_item.get(
'api_method_responses')
if initial_responses and additional_responses:
                raise AssertionError(
                    "API '{0}' has duplicated api method responses "
                    "configurations. Please remove one "
                    "api method responses configuration.".format(
                        additional_item_name)
                )
if initial_responses:
additional_item[
'api_method_responses'] = initial_responses
# handle integration responses
initial_integration_resp = initial_item.get(
'api_method_integration_responses')
additional_integration_resp = additional_item.get(
'api_method_integration_responses')
if initial_integration_resp and additional_integration_resp:
                raise AssertionError(
                    "API '{0}' has duplicated api method integration "
                    "responses configurations. Please remove one api method "
                    "integration responses configuration.".format(
                        additional_item_name)
                )
if initial_integration_resp:
additional_item[
'api_method_integration_responses'] = initial_integration_resp
            # join item dependencies; tolerate a missing 'dependencies' key
            dependencies_dict = {each['resource_name']: each
                                 for each in
                                 additional_item.get('dependencies') or []}
            for each in initial_item.get('dependencies') or []:
                if each['resource_name'] not in dependencies_dict:
                    additional_item.setdefault(
                        'dependencies', []).append(each)
# join items resources
additional_item['resources'].update(initial_item['resources'])
# return aggregated API description
init_deploy_stage = initial_item.get('deploy_stage')
if init_deploy_stage:
additional_item['deploy_stage'] = init_deploy_stage
init_compression = initial_item.get("minimum_compression_size")
if init_compression:
additional_comp_size = \
additional_item.get('minimum_compression_size')
if additional_comp_size:
                    _LOG.warning(f"Found 'minimum_compression_size': "
                                 f"{init_compression} inside root "
                                 f"deployment_resources. The value "
                                 f"'{additional_comp_size}' from: "
                                 f"{additional_item} will be overwritten")
additional_item['minimum_compression_size'] = init_compression
# join authorizers
initial_authorizers = initial_item.get('authorizers') or {}
additional_authorizers = additional_item.get('authorizers') or {}
additional_item['authorizers'] = {**initial_authorizers,
**additional_authorizers}
# join models
initial_models = initial_item.get('models') or {}
additional_models = additional_item.get('models') or {}
additional_item['models'] = {**initial_models, **additional_models}
# policy statement singleton
_pst = initial_item.get('policy_statement_singleton')
if 'policy_statement_singleton' not in additional_item and _pst:
additional_item['policy_statement_singleton'] = _pst
            # preserve the additional item's own expression when the initial
            # one is absent
            init_route_sel_expr = initial_item.get(
                'route_selection_expression')
            if init_route_sel_expr:
                additional_item['route_selection_expression'] = \
                    init_route_sel_expr
additional_item = _merge_api_gw_list_typed_configurations(
initial_item,
additional_item,
['binary_media_types', 'apply_changes']
)
return additional_item
else:
initial_item_type = initial_item.get("resource_type")
additional_item_type = additional_item.get("resource_type")
            raise AssertionError(
                f"Two resources with equal names were found! "
                f"Name: '{additional_item_name}', first resource type: "
                f"'{initial_item_type}', second resource type: "
                f"'{additional_item_type}'.\nPlease rename one of them!"
            )
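# A behaviour sketch (editor-added, hypothetical names; assumes
# API_GATEWAY_TYPE resolves to 'api_gateway'): two API fragments with the
# same name are merged, while duplicated inner resources would raise.
#
#   >>> overall = {'api': {'resource_type': 'api_gateway',
#   ...                    'resources': {'/a': {}}, 'dependencies': []}}
#   >>> merged = _check_duplicated_resources(
#   ...     overall, 'api', {'resource_type': 'api_gateway',
#   ...                      'resources': {'/b': {}}, 'dependencies': []})
#   >>> sorted(merged['resources'])
#   ['/a', '/b']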
def _merge_api_gw_list_typed_configurations(initial_resource,
additional_resource,
property_names_list):
for property_name in property_names_list:
initial_property_value = initial_resource.get(property_name, [])
additional_resource_value = additional_resource.get(property_name, [])
additional_resource[
property_name] = initial_property_value + additional_resource_value
return additional_resource
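# For example (editor-added): list-typed API GW properties are concatenated,
# initial values first.
#
#   >>> _merge_api_gw_list_typed_configurations(
#   ...     {'binary_media_types': ['image/png']},
#   ...     {'binary_media_types': ['application/pdf']},
#   ...     ['binary_media_types'])['binary_media_types']
#   ['image/png', 'application/pdf']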
def _populate_s3_path_python_node_dotnet(meta, bundle_name):
    name = meta.get('name')
    version = meta.get('version')
    prefix = meta.pop('prefix', None)
    suffix = meta.pop('suffix', None)
    if not name or not version:
        raise AssertionError('Lambda config must contain name and version. '
                             'Existing configuration'
                             ': {0}'.format(prettify_json(meta)))
    # strip the resolved prefix/suffix to get the original artifact name back
    if prefix:
        name = name[len(prefix):]
    if suffix:
        name = name[:-len(suffix)]
    meta[S3_PATH_NAME] = build_path(bundle_name,
                                    build_py_package_name(name, version))
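# A rough sketch of the effect (editor-added): the resolved prefix/suffix are
# stripped from the lambda name before the package name is built. Assuming
# S3_PATH_NAME is 's3_path' and build_py_package_name yields something like
# '<name>-<version>.zip':
#
#   >>> meta = {'name': 'dev-handler-stage', 'version': '1.0',
#   ...         'prefix': 'dev-', 'suffix': '-stage'}
#   >>> _populate_s3_path_python_node_dotnet(meta, 'bundle_1')
#   >>> meta['s3_path']   # roughly 'bundle_1/handler-1.0.zip'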
def _populate_s3_path_java(meta, bundle_name):
    deployment_package = meta.get('deployment_package')
    if not deployment_package:
        raise AssertionError('Lambda config must contain deployment_package. '
                             'Existing configuration'
                             ': {0}'.format(prettify_json(meta)))
    meta[S3_PATH_NAME] = build_path(bundle_name, deployment_package)
def _populate_s3_path_lambda(meta, bundle_name):
runtime = meta.get('runtime')
if not runtime:
raise AssertionError(
'Lambda config must contain runtime. '
'Existing configuration: {0}'.format(prettify_json(meta)))
resolver_func = RUNTIME_PATH_RESOLVER.get(runtime.lower())
if resolver_func:
resolver_func(meta, bundle_name)
else:
raise AssertionError(
'Specified runtime {0} in {1} is not supported. '
'Supported runtimes: {2}'.format(
runtime.lower(), meta.get('name'),
list(RUNTIME_PATH_RESOLVER.keys())))
def _populate_s3_path_lambda_layer(meta, bundle_name):
    deployment_package = meta.get('deployment_package')
    if not deployment_package:
        raise AssertionError(
            'Lambda Layer config must contain deployment_package. '
            'Existing configuration'
            ': {0}'.format(prettify_json(meta)))
    meta[S3_PATH_NAME] = build_path(bundle_name, deployment_package)
def _populate_s3_path_ebs(meta, bundle_name):
    deployment_package = meta.get('deployment_package')
    if not deployment_package:
        raise AssertionError('Beanstalk_app config must contain '
                             'deployment_package. Existing configuration'
                             ': {0}'.format(prettify_json(meta)))
    meta[S3_PATH_NAME] = build_path(bundle_name, deployment_package)
def _populate_s3_path_swagger_ui(meta, bundle_name):
    deployment_package = meta.get('deployment_package')
    if not deployment_package:
        raise AssertionError('Swagger UI config must contain '
                             'deployment_package. Existing configuration'
                             ': {0}'.format(prettify_json(meta)))
    meta[S3_PATH_NAME] = build_path(bundle_name, deployment_package)
def populate_s3_paths(overall_meta, bundle_name):
for name, meta in overall_meta.items():
resource_type = meta.get('resource_type')
mapping_func = S3_PATH_MAPPING.get(resource_type)
if mapping_func:
mapping_func(meta, bundle_name)
return overall_meta
def extract_deploy_stage_from_openapi_spec(openapi_spec: dict) -> str:
"""
Extract the first path segment from the server URL in an API specification.
If no server URL is found, or there is no path segment, raise an exception.
"""
servers = openapi_spec.get('servers', [])
if not servers:
raise ValueError("No server information found in API specification.")
server_url = servers[0].get('url', '')
variables = servers[0].get('variables', {})
# Substitute variables in the URL template with their default values, if any
for var_name, var_details in variables.items():
default_value = var_details.get('default', '')
server_url = server_url.replace(f'{{{var_name}}}', default_value)
# Extract the first path segment
path_segments = [segment for segment in
urlparse(server_url).path.split('/')
if segment]
if not path_segments:
raise ValueError("No path segments found in server URL.")
return path_segments[0]
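# For example (editor-added): the deploy stage is the first path segment of
# the first server URL, after substituting variable defaults.
#
#   >>> extract_deploy_stage_from_openapi_spec({
#   ...     'servers': [{'url': 'https://{host}/dev/v1',
#   ...                  'variables': {'host': {'default': 'api.example.com'}}
#   ...                  }]})
#   'dev'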
RUNTIME_PATH_RESOLVER = {
'python3.8': _populate_s3_path_python_node_dotnet,
'python3.9': _populate_s3_path_python_node_dotnet,
'python3.10': _populate_s3_path_python_node_dotnet,
'python3.11': _populate_s3_path_python_node_dotnet,
'python3.12': _populate_s3_path_python_node_dotnet,
'java11': _populate_s3_path_java,
'java17': _populate_s3_path_java,
'java21': _populate_s3_path_java,
'nodejs16.x': _populate_s3_path_python_node_dotnet,
'nodejs18.x': _populate_s3_path_python_node_dotnet,
'nodejs20.x': _populate_s3_path_python_node_dotnet,
'dotnet8': _populate_s3_path_python_node_dotnet
}
S3_PATH_MAPPING = {
LAMBDA_TYPE: _populate_s3_path_lambda,
EBS_TYPE: _populate_s3_path_ebs,
LAMBDA_LAYER_TYPE: _populate_s3_path_lambda_layer,
SWAGGER_UI_TYPE: _populate_s3_path_swagger_ui
}
def _look_for_configs(nested_files: list[str], resources_meta: dict[str, Any],
path: str, bundle_name: str) -> None:
""" Look for all config files in project structure. Read content and add
all meta to overall meta if there is no duplicates. If duplicates found -
raise AssertionError.
:param nested_files: A list of files in the project
:param resources_meta: A dictionary of resources metadata
:param path: A string path to the project
:param bundle_name: A string name of the bundle
"""
for each in nested_files:
if each.endswith(LAMBDA_CONFIG_FILE_NAME) or \
each.endswith(LAMBDA_LAYER_CONFIG_FILE_NAME) or \
each.endswith(SWAGGER_UI_CONFIG_FILE_NAME) or \
each.endswith(APPSYNC_CONFIG_FILE_NAME):
resource_config_path = os.path.join(path, each)
_LOG.debug(f'Processing file: {resource_config_path}')
with open(resource_config_path) as data_file:
resource_conf = load(data_file)
resource_name = resource_conf['name']
resource_type = resource_conf['resource_type']
_LOG.debug(f'Found {resource_type}: {resource_name}')
res = _check_duplicated_resources(resources_meta, resource_name,
resource_conf)
if res:
resource_conf = res
resources_meta[resource_name] = resource_conf
if each.endswith(OAS_V3_FILE_NAME):
openapi_spec_path = os.path.join(path, each)
_LOG.debug(f'Processing file: {openapi_spec_path}')
with open(openapi_spec_path) as data_file:
openapi_spec = load(data_file)
api_gateway_name = openapi_spec['info']['title']
_LOG.debug(f'Found API Gateway: {api_gateway_name}')
deploy_stage = extract_deploy_stage_from_openapi_spec(openapi_spec)
resource = {
"definition": openapi_spec,
"resource_type": API_GATEWAY_OAS_V3_TYPE,
"deploy_stage": deploy_stage,
}
tags = openapi_spec.get("x-syndicate-openapi-tags")
if tags:
resource["tags"] = tags
res = _check_duplicated_resources(
resources_meta, api_gateway_name, resource
)
if res:
resource = res
resources_meta[api_gateway_name] = resource
if each == RESOURCES_FILE_NAME:
additional_config_path = os.path.join(path, RESOURCES_FILE_NAME)
_LOG.debug('Processing file: {0}'.format(additional_config_path))
with open(additional_config_path, encoding='utf-8') as json_file:
deployment_resources = load(json_file)
for resource_name in deployment_resources:
_LOG.debug('Found resource ' + resource_name)
resource = deployment_resources[resource_name]
# check if resource type exists in deployment framework and
# has resource_type field
try:
resource_type = resource['resource_type']
except KeyError:
error_message = \
f"There is no 'resource_type' in {resource_name}"
_LOG.error(error_message)
raise AssertionError(error_message)
if resource_type not in RESOURCE_LIST:
                    error_message = (
                        f'Unsupported resource type found: "{resource_type}". '
                        f'Please double-check the correctness of the '
                        f'specified resource type. To add a new resource '
                        f'type, please contact the support team.')
_LOG.error(error_message)
raise KeyError(error_message)
res = _check_duplicated_resources(resources_meta,
resource_name, resource)
if res:
resource = res
resources_meta[resource_name] = resource
# todo validate all required configs
def create_resource_json(project_path: str,
                         bundle_name: str) -> dict[str, Any]:
    """ Create a resource catalog json with all resource metadata in the
    project.
    :param project_path: path to the project
    :param bundle_name: name of the bucket subdir
    """
resources_meta = {}
# Walking through every folder in the project
for path, _, nested_items in os.walk(project_path):
        # there are no duplicates within a single json, because json is a dict
_look_for_configs(nested_items, resources_meta, path, bundle_name)
meta_for_validation = _resolve_aliases(resources_meta)
# check if all dependencies were described
common_validator = VALIDATOR_BY_TYPE_MAPPING[ALL_TYPES]
for name, meta in meta_for_validation.items():
common_validator(resource_name=name,
resource_meta=meta, all_meta=meta_for_validation)
resource_type = meta['resource_type']
type_validator = VALIDATOR_BY_TYPE_MAPPING.get(resource_type)
if type_validator:
type_validator(name, meta)
return resources_meta
def _resolve_names_in_meta(resources_dict, old_value, new_value):
if isinstance(resources_dict, dict):
for k, v in resources_dict.items():
if k in NAME_RESOLVING_BLACKLISTED_KEYS:
continue
if isinstance(v, str) and old_value == v:
resources_dict[k] = v.replace(old_value, new_value)
elif isinstance(v, str) and old_value in v and v.startswith('arn'):
resources_dict[k] = _resolve_name_in_arn(v, old_value, new_value)
else:
_resolve_names_in_meta(v, old_value, new_value)
    elif isinstance(resources_dict, list):
        # iterate by index and replace in place: mutating the list while
        # iterating over it (the original del/append) may skip elements
        for index, item in enumerate(resources_dict):
            if isinstance(item, dict):
                _resolve_names_in_meta(item, old_value, new_value)
            elif (isinstance(item, str) and old_value in item and
                    item.startswith('arn')):
                resources_dict[index] = _resolve_name_in_arn(
                    item, old_value, new_value)
            elif isinstance(item, str) and item == old_value:
                resources_dict[index] = new_value
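# A small illustration (editor-added, hypothetical names): plain string
# values equal to the old name are replaced, and colon-separated parts of
# ARN-like strings are rewritten when they match.
#
#   >>> meta = {'function': 'handler',
#   ...         'arn': 'arn:aws:lambda:eu-west-1:123:function:handler'}
#   >>> _resolve_names_in_meta(meta, 'handler', 'dev-handler')
#   >>> meta['arn']
#   'arn:aws:lambda:eu-west-1:123:function:dev-handler'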
def _resolve_name_in_arn(arn, old_value, new_value):
    arn_parts = arn.split(':')
    # use enumerate: list.index() would return the first matching part and
    # could update the wrong occurrence when parts repeat
    for index, part in enumerate(arn_parts):
        if part == old_value:
            arn_parts[index] = new_value
        elif part.startswith(old_value) and part[len(old_value)] == '/':
            arn_parts[index] = part.replace(old_value, new_value)
    return ':'.join(arn_parts)
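# For example (editor-added): only ARN parts that equal the old name, or
# start with it followed by '/', are rewritten.
#
#   >>> _resolve_name_in_arn(
#   ...     'arn:aws:execute-api:*:*:api-id/stage/*', 'api-id', 'dev-api-id')
#   'arn:aws:execute-api:*:*:dev-api-id/stage/*'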
def create_meta(project_path: str, bundle_name: str) -> None:
# create overall meta.json with all resource meta info
meta_path = build_path(project_path, ARTIFACTS_FOLDER,
bundle_name)
_LOG.info(f'Bundle path: {meta_path}')
overall_meta = create_resource_json(project_path=project_path,
bundle_name=bundle_name)
bundle_dir = resolve_bundle_directory(bundle_name=bundle_name)
write_content_to_file(bundle_dir, BUILD_META_FILE_NAME, overall_meta)
# remove Java runtime temporary files
target_path = build_path(project_path, MVN_TARGET_DIR_NAME)
if os.path.exists(target_path):
_LOG.debug(f'Removing temporary directory \'{target_path}\'')
shutil.rmtree(target_path, ignore_errors=True)
def resolve_meta(overall_meta):
from syndicate.core import CONFIG
iam_suffix = CONFIG.iam_suffix
extended_prefix_mode = CONFIG.extended_prefix_mode
overall_meta = _resolve_aliases(overall_meta)
_LOG.debug('Resolved meta was created')
_LOG.debug(prettify_json(overall_meta))
    _resolve_permissions_boundary(overall_meta)
    _LOG.debug('Permissions boundary was resolved')
# get dict with resolved prefix and suffix in meta resources
# key: current_name, value: resolved_name
resolved_names = {}
for name, res_meta in overall_meta.items():
resource_type = res_meta['resource_type']
if resource_type in GLOBAL_AWS_SERVICES or extended_prefix_mode:
resolved_name = resolve_resource_name(
resource_name=name,
prefix=CONFIG.resources_prefix,
suffix=CONFIG.resources_suffix)
if resource_type == LAMBDA_TYPE:
res_meta['prefix'] = CONFIG.resources_prefix
res_meta['suffix'] = CONFIG.resources_suffix
            # add iam_suffix to the IAM role only if it is specified in the
            # config file
if resource_type == IAM_ROLE and iam_suffix:
resolved_name = resolved_name + iam_suffix
if name != resolved_name:
resolved_names[name] = resolved_name
_LOG.debug('Going to resolve names in meta')
_LOG.debug('Resolved names mapping: {0}'.format(str(resolved_names)))
    for current_name, resolved_name in resolved_names.items():
        # skip empty names before touching the meta
        if not all([current_name, resolved_name]):
            continue
        overall_meta[resolved_name] = overall_meta.pop(current_name)
        _resolve_names_in_meta(overall_meta, current_name, resolved_name)
return overall_meta
def resolve_tags(meta: dict) -> None:
_LOG.debug('Going to resolve resources tags.')
from syndicate.core import CONFIG
common_tags = CONFIG.tags
for res_name, res_meta in meta.items():
res_tags = res_meta.get('tags', {})
_LOG.debug(f'The resource {res_name} tags: {res_tags}')
errors = validate_tags('tags', res_tags)
if errors:
            USER_LOG.warning(
                f'The resource {res_name} tags don\'t pass validation and '
                f'will be removed from the resource meta. Details: "{errors}"')
            res_meta.pop('tags')
continue
overall_tags = _format_tags(res_meta['resource_type'],
{**common_tags, **res_tags})
_LOG.debug(f'Resolved resource {res_name} tags {overall_tags}')
res_meta['tags'] = overall_tags
def preprocess_tags(output: dict):
    for item in output.values():
        res_meta = item['resource_meta']
        tags = res_meta.get('tags')
        match tags:
            case dict():
                continue
            case list():
                res_meta['tags'] = _tags_to_dict(tags)
            case _:
                res_meta.pop('tags', None)
def _tags_to_dict(tags: list) -> dict:
    result = {}
    for tag in tags:
        tag_key = None
        tag_value = ''
        for k, v in tag.items():
            if k.lower() == 'key':
                tag_key = v
            elif k.lower() == 'value':
                tag_value = v
        if tag_key is not None:
            result[tag_key] = tag_value
    return result
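# For example (editor-added): tags given as a list of key/value objects, in
# either capitalisation, collapse into a plain dict.
#
#   >>> _tags_to_dict([{'Key': 'env', 'Value': 'dev'},
#   ...                {'key': 'team', 'value': 'core'}])
#   {'env': 'dev', 'team': 'core'}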
def _format_tags(res_type: str, tags: dict) -> dict | list:
    match res_type:
        case _ if res_type in TAGS_RESOURCE_TYPE_CONFIG[
                'capitalised_keys_list']:
            return [{'Key': k, 'Value': v} for k, v in tags.items()]
        # note: 'lover_case_keys_list' is kept as spelled in the
        # TAGS_RESOURCE_TYPE_CONFIG constant
        case _ if res_type in TAGS_RESOURCE_TYPE_CONFIG[
                'lover_case_keys_list']:
            return [{'key': k, 'value': v} for k, v in tags.items()]
        case _ if res_type in TAGS_RESOURCE_TYPE_CONFIG['untaggable']:
            _LOG.debug(f'The resource type {res_type} cannot be tagged')
            return {}
        case _:
            return tags
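# A sketch of the shapes produced (editor-added; the concrete resource types
# live in TAGS_RESOURCE_TYPE_CONFIG in syndicate.core.constants): the
# 'capitalised_keys_list' bucket yields [{'Key': ..., 'Value': ...}], the
# 'lover_case_keys_list' bucket yields [{'key': ..., 'value': ...}],
# untaggable types yield {}, and anything else keeps the plain tags dict.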
def _resolve_aliases(overall_meta):
"""
:type overall_meta: dict
"""
from syndicate.core import CONFIG
if CONFIG.aliases:
aliases = {'${' + key + '}': str(value) for key, value in
CONFIG.aliases.items()}
overall_meta = resolve_dynamic_identifier(aliases, overall_meta)
return overall_meta
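# For example (editor-added): with CONFIG.aliases == {'region': 'eu-west-1'},
# every '${region}' occurrence in the meta should be substituted with
# 'eu-west-1' by resolve_dynamic_identifier.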
def _resolve_permissions_boundary(overall_meta):
    """Adds the permissions boundary from the config to every resource
    with resource_type IAM_ROLE."""
from syndicate.core import CONFIG
if CONFIG.iam_permissions_boundary:
for name, meta in overall_meta.items():
if meta.get('resource_type') == IAM_ROLE:
meta['permissions_boundary'] = CONFIG.iam_permissions_boundary
def resolve_resource_name(resource_name, prefix=None, suffix=None):
return _resolve_suffix_name(
_resolve_prefix_name(resource_name, prefix), suffix)
def resolve_resource_name_by_data(resource_name, resource_prefix,
resource_suffix):
return _resolve_suffix_name(
_resolve_prefix_name(resource_name, resource_prefix), resource_suffix)
def _resolve_prefix_name(resource_name, resource_prefix):
if resource_prefix:
return resolve_aliases_for_string(resource_prefix) + resource_name
return resource_name
def _resolve_suffix_name(resource_name, resource_suffix):
if resource_suffix:
return resource_name + resolve_aliases_for_string(resource_suffix)
return resource_name
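# For example (editor-added, assuming the prefix and suffix contain no
# '${...}' aliases to resolve):
#
#   >>> resolve_resource_name('handler', prefix='dev-', suffix='-eu')
#   'dev-handler-eu'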
def get_meta_from_output(output: dict):
from syndicate.core import CONFIG
meta = {}
for arn, data in output.items():
resource_meta = data.get('resource_meta')
resource_name = data.get('resource_name')
suffix_index = resource_name.rfind(CONFIG.resources_suffix)
if suffix_index != -1: # if found
resource_name = \
resource_name[:suffix_index] + \
resource_name[suffix_index + len(CONFIG.resources_suffix):]
prefix_index = resource_name.find(CONFIG.resources_prefix)
if prefix_index != -1:
resource_name = \
resource_name[:prefix_index] + \
resource_name[prefix_index + len(CONFIG.resources_prefix):]
meta.update({resource_name: resource_meta})
return meta
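# A rough illustration (editor-added): the configured prefix and suffix are
# cut back out of the deployed names. With CONFIG.resources_prefix == 'dev-'
# and CONFIG.resources_suffix == '-eu', a deployed resource named
# 'dev-handler-eu' maps back to the key 'handler' in the returned meta.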