def _check_duplicated_resources()

in syndicate/core/build/meta_processor.py [0:0]


def _adopt_exclusive_api_setting(initial_item, additional_item, key,
                                 duplicate_message):
    """ Copy a setting that only one of two API descriptions may define.

    Does nothing when the initial item holds no truthy value under ``key``.

    :type initial_item: dict
    :type additional_item: dict
    :type key: str
    :type duplicate_message: str
    :raises AssertionError: with ``duplicate_message`` when both items
        hold a truthy value under ``key``
    """
    initial_value = initial_item.get(key)
    if not initial_value:
        return
    if additional_item.get(key):
        raise AssertionError(duplicate_message)
    additional_item[key] = initial_value


def _check_duplicated_resources(initial_meta_dict, additional_item_name,
                                additional_item):
    """ Match two meta dicts (overall and separated) for duplicates.

    When ``additional_item_name`` is already present in
    ``initial_meta_dict`` and both descriptions are API Gateway (or
    web socket API Gateway) resources, the stored description is merged
    into ``additional_item`` (which is modified in place). Any other
    name collision is rejected.

    :type initial_meta_dict: dict
    :type additional_item_name: str
    :type additional_item: dict
    :return: the merged ``additional_item``, or None when the name is
        not in ``initial_meta_dict`` or the stored description is empty
    :raises AssertionError: when the two resources have different types
        or a type that is not one of the API Gateway types, when both
        APIs declare the same resource key, or when both declare a
        cluster cache/throttling configuration, api method responses or
        api method integration responses
    """
    if additional_item_name not in initial_meta_dict:
        return
    additional_type = additional_item['resource_type']
    initial_item = initial_meta_dict.get(additional_item_name)
    if not initial_item:
        return
    initial_type = initial_item['resource_type']
    # only API descriptions of the same type may share a name
    if additional_type != initial_type or initial_type not in \
            {API_GATEWAY_TYPE, WEB_SOCKET_API_GATEWAY_TYPE}:
        raise AssertionError(
            f"Two resources with equal names were found! "
            f"Name: '{additional_item_name}', first resource type: "
            f"'{initial_type}', second resource type: "
            f"'{additional_type}'. \nPlease, rename one of them!"
        )

    # check if APIs have same resources
    for each in initial_item['resources']:
        if each in additional_item['resources']:
            raise AssertionError(
                "API '{0}' has duplicated resource '{1}'! Please, "
                "change name of one resource or remove one.".format(
                    additional_item_name, each))
    # check if APIs have duplicated cluster configurations
    for config in ['cluster_cache_configuration',
                   'cluster_throttling_configuration']:
        _adopt_exclusive_api_setting(
            initial_item, additional_item, config,
            "API '{0}' has duplicated {1}. Please, remove one "
            "configuration.".format(additional_item_name, config)
        )
    # handle responses
    _adopt_exclusive_api_setting(
        initial_item, additional_item, 'api_method_responses',
        "API '{0}' has duplicated api method responses "
        "configurations. Please, remove one "
        "api method responses configuration.".format(
            additional_item_name)
    )
    # handle integration responses
    _adopt_exclusive_api_setting(
        initial_item, additional_item, 'api_method_integration_responses',
        "API '{0}' has duplicated api method integration "
        "responses configurations. Please, remove one "
        "api method integration responses configuration.".format(
            additional_item_name)
    )
    # join items dependencies; names are compared against the additional
    # item's own dependencies only
    known_names = {each['resource_name']
                   for each in additional_item.get('dependencies') or []}
    missing_dependencies = [
        each for each in initial_item.get('dependencies') or []
        if each['resource_name'] not in known_names
    ]
    if missing_dependencies:
        # the additional item may have no 'dependencies' at all (or None);
        # create the list instead of failing on the append
        if additional_item.get('dependencies') is None:
            additional_item['dependencies'] = []
        additional_item['dependencies'].extend(missing_dependencies)
    # join items resources
    additional_item['resources'].update(initial_item['resources'])

    init_deploy_stage = initial_item.get('deploy_stage')
    if init_deploy_stage:
        additional_item['deploy_stage'] = init_deploy_stage

    init_compression = initial_item.get("minimum_compression_size")
    if init_compression:
        additional_comp_size = \
            additional_item.get('minimum_compression_size')
        if additional_comp_size:
            _LOG.warning(f"Found 'minimum_compression_size': "
                         f"{init_compression} inside root "
                         f"deployment_resources. The value "
                         f"'{additional_comp_size}' from: "
                         f"{additional_item} will be overwritten")
        additional_item['minimum_compression_size'] = init_compression

    # join authorizers; on a key clash the additional item's entry wins
    additional_item['authorizers'] = {
        **(initial_item.get('authorizers') or {}),
        **(additional_item.get('authorizers') or {}),
    }
    # join models; on a key clash the additional item's entry wins
    additional_item['models'] = {
        **(initial_item.get('models') or {}),
        **(additional_item.get('models') or {}),
    }
    # policy statement singleton
    _pst = initial_item.get('policy_statement_singleton')
    if 'policy_statement_singleton' not in additional_item and _pst:
        additional_item['policy_statement_singleton'] = _pst

    # the initial value takes precedence, but a value defined only by the
    # additional item must not be wiped out; the key is always set
    additional_item['route_selection_expression'] = (
        initial_item.get('route_selection_expression')
        or additional_item.get('route_selection_expression')
    )

    additional_item = _merge_api_gw_list_typed_configurations(
        initial_item,
        additional_item,
        ['binary_media_types', 'apply_changes']
    )

    # return aggregated API description
    return additional_item