in syndicate/core/build/meta_processor.py [0:0]
def _look_for_configs(nested_files: list[str], resources_meta: dict[str, Any],
path: str, bundle_name: str) -> None:
""" Look for all config files in project structure. Read content and add
all meta to overall meta if there is no duplicates. If duplicates found -
raise AssertionError.
:param nested_files: A list of files in the project
:param resources_meta: A dictionary of resources metadata
:param path: A string path to the project
:param bundle_name: A string name of the bundle
"""
for each in nested_files:
if each.endswith(LAMBDA_CONFIG_FILE_NAME) or \
each.endswith(LAMBDA_LAYER_CONFIG_FILE_NAME) or \
each.endswith(SWAGGER_UI_CONFIG_FILE_NAME) or \
each.endswith(APPSYNC_CONFIG_FILE_NAME):
resource_config_path = os.path.join(path, each)
_LOG.debug(f'Processing file: {resource_config_path}')
with open(resource_config_path) as data_file:
resource_conf = load(data_file)
resource_name = resource_conf['name']
resource_type = resource_conf['resource_type']
_LOG.debug(f'Found {resource_type}: {resource_name}')
res = _check_duplicated_resources(resources_meta, resource_name,
resource_conf)
if res:
resource_conf = res
resources_meta[resource_name] = resource_conf
if each.endswith(OAS_V3_FILE_NAME):
openapi_spec_path = os.path.join(path, each)
_LOG.debug(f'Processing file: {openapi_spec_path}')
with open(openapi_spec_path) as data_file:
openapi_spec = load(data_file)
api_gateway_name = openapi_spec['info']['title']
_LOG.debug(f'Found API Gateway: {api_gateway_name}')
deploy_stage = extract_deploy_stage_from_openapi_spec(openapi_spec)
resource = {
"definition": openapi_spec,
"resource_type": API_GATEWAY_OAS_V3_TYPE,
"deploy_stage": deploy_stage,
}
tags = openapi_spec.get("x-syndicate-openapi-tags")
if tags:
resource["tags"] = tags
res = _check_duplicated_resources(
resources_meta, api_gateway_name, resource
)
if res:
resource = res
resources_meta[api_gateway_name] = resource
if each == RESOURCES_FILE_NAME:
additional_config_path = os.path.join(path, RESOURCES_FILE_NAME)
_LOG.debug('Processing file: {0}'.format(additional_config_path))
with open(additional_config_path, encoding='utf-8') as json_file:
deployment_resources = load(json_file)
for resource_name in deployment_resources:
_LOG.debug('Found resource ' + resource_name)
resource = deployment_resources[resource_name]
# check if resource type exists in deployment framework and
# has resource_type field
try:
resource_type = resource['resource_type']
except KeyError:
error_message = \
f"There is no 'resource_type' in {resource_name}"
_LOG.error(error_message)
raise AssertionError(error_message)
if resource_type not in RESOURCE_LIST:
error_message = (
f'Unsupported resource type found: "{resource_type}". '
f'Please double-check the correctness of the specified '
f'resource type. To add a new resource type please '
f'request the support team.')
_LOG.error(error_message)
raise KeyError(error_message)
res = _check_duplicated_resources(resources_meta,
resource_name, resource)
if res:
resource = res
resources_meta[resource_name] = resource