syndicate/core/build/deployment_processor.py (758 lines of code) (raw):

"""
    Copyright 2018 EPAM Systems, Inc.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
import concurrent
import copy
import functools
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor
from functools import cmp_to_key
from typing import Any

from syndicate.commons.log_helper import get_logger, get_user_logger
from syndicate.core.build.bundle_processor import create_deploy_output, \
    load_deploy_output, load_failed_deploy_output, load_meta_resources, \
    remove_failed_deploy_output, load_latest_deploy_output, \
    remove_deploy_output
from syndicate.core.build.meta_processor import resolve_meta, \
    populate_s3_paths, resolve_resource_name, get_meta_from_output, \
    resolve_tags, preprocess_tags
from syndicate.core.constants import (BUILD_META_FILE_NAME,
                                      CLEAN_RESOURCE_TYPE_PRIORITY,
                                      DEPLOY_RESOURCE_TYPE_PRIORITY,
                                      UPDATE_RESOURCE_TYPE_PRIORITY,
                                      PARTIAL_CLEAN_ACTION, ABORTED_STATUS,
                                      LAMBDA_TYPE, LAMBDA_LAYER_TYPE)
from syndicate.core.helper import prettify_json
from syndicate.core.build.helper import assert_bundle_bucket_exists, \
    construct_deploy_s3_key_path

BUILD_META = 'build_meta'
DEPLOYMENT_OUTPUT = 'deployment_output'

_LOG = get_logger(__name__)
USER_LOG = get_user_logger()


def _describe_unfinished_resources(args, describe_handlers, output) -> list:
    """Describe the resources of an unfinished batch after a failure.

    For every item of ``args`` the describe handler registered for the
    item's resource type is called with the resource name and meta; the
    response is merged into ``output`` through ``process_response``.

    :param args: handler parameter dicts (``name``/``meta`` keys) of the
        batch that was being processed when the failure happened
    :param describe_handlers: mapping ``resource_type -> describe function``;
        may be ``None`` or empty, in which case nothing is described
    :param output: dict updated in place with the described resources
    :return: list of errors collected while describing
    """
    errors = []
    if not describe_handlers:
        return errors
    for item in args:
        resource_type = item['meta']['resource_type']
        func = describe_handlers.get(resource_type)
        if func is None:
            # This is a best-effort step on an already failed run, so a
            # missing handler must not hide the original failure.
            _LOG.debug(f"Describe handler is not defined for "
                       f"'{resource_type}' type")
            continue
        try:
            response = func(item['name'], item['meta'])
        except Exception as e:
            response = ({}, [str(e)])
        if response:
            errors.extend(
                process_response(response=response, output=output))
    return errors


def _process_resources(
        resources: list,
        handlers_mapping: dict,
        describe_handlers: dict | None = None,
        pass_context: bool = False,
        output=None,
) -> tuple[bool, Any]:
    """Run handlers over resources grouped into consecutive same-type batches.

    Consecutive resources of the same type are collected into one batch and
    passed to ``handlers_mapping[resource_type]`` in a single call.

    :param resources: list of ``(name, meta)`` pairs; ``meta`` must contain
        the ``resource_type`` key
    :param handlers_mapping: mapping ``resource_type -> handler``; a handler
        receives the list of parameter dicts built by ``_build_args``
    :param describe_handlers: mapping ``resource_type -> describe function``
        used to describe the last batch when processing fails
    :param pass_context: if True the current output is passed to handlers
        under the ``context`` key
    :param output: initial output dict; a new dict is used when falsy
    :return: ``(is_succeeded, output)``
    """
    output = output or {}
    errors = []
    args = []
    resource_type = None
    is_succeeded = True
    try:
        for res_name, res_meta in resources:
            current_res_type = res_meta['resource_type']

            if resource_type is None:
                resource_type = current_res_type

            if current_res_type != resource_type:
                # The type has changed: flush the accumulated batch first.
                USER_LOG.info(f'Processing {resource_type} resources')
                func = handlers_mapping[resource_type]
                response = func(args)
                errors.extend(
                    process_response(response=response, output=output))
                del args[:]
                resource_type = current_res_type

            args.append(_build_args(name=res_name,
                                    meta=res_meta,
                                    context=output,
                                    pass_context=pass_context))
        if args:
            # The last batch is intentionally not cleared: if it failed it
            # is described below.
            USER_LOG.info(f'Processing {resource_type} resources')
            func = handlers_mapping[resource_type]
            response = func(args)
            errors.extend(process_response(response=response, output=output))
        if errors:
            is_succeeded = False
    except Exception as e:
        USER_LOG.exception(
            f'Error occurred while {resource_type} resource creating: {e}'
        )
        is_succeeded = False

    if not is_succeeded:
        errors.extend(
            _describe_unfinished_resources(args, describe_handlers, output))
    return is_succeeded, output


def _process_resources_with_dependencies(resources, handlers_mapping,
                                         describe_handlers, pass_context=False,
                                         overall_resources=None, output=None,
                                         current_resource_type=None,
                                         run_count=0):
    """Process resources one by one, processing their dependencies first.

    For every resource that is not yet marked with the ``processed`` flag,
    the resources listed in its ``dependencies`` are processed recursively,
    then the resource itself is passed to its handler and marked as
    processed.

    :param resources: list of ``(name, meta)`` pairs to process
    :param handlers_mapping: mapping ``resource_type -> handler``
    :param describe_handlers: mapping ``resource_type -> describe function``
        used to describe the current resource when processing fails
    :param pass_context: if True the current output is passed to handlers
    :param overall_resources: the full list of resources; used to look up
        dependencies and as the bound for the recursion guard
    :param output: output dict accumulated so far
    :param current_resource_type: type most recently announced to the user
    :param run_count: number of resources processed so far along the
        current call chain
    :return: ``(is_succeeded, output)``
    """
    overall_resources = overall_resources or resources
    output = output or {}
    resource_type = None
    is_succeeded = True
    errors = []
    # Bound before the loop so the failure path below never hits an
    # unbound name.
    args = []
    try:
        for res_name, res_meta in resources:
            args = []
            resource_type = res_meta['resource_type']
            if res_meta.get('processed'):
                _LOG.debug(f"Processing of '{resource_type}' '{res_name}' "
                           f"skipped. Resource already processed")
                continue

            if run_count >= len(overall_resources):
                raise RecursionError(
                    "An infinite loop detected in resource dependencies")
            run_count += 1

            dependencies = [item['resource_name'] for item in
                            res_meta.get('dependencies', [])]
            _LOG.debug(f"'{resource_type}' '{res_name}' depends on resources: "
                       f"{dependencies}")
            # Order of items in depends_on_resources is important!
            depends_on_resources = []
            for dep_res_name in dependencies:
                for overall_res_name, overall_res_meta in overall_resources:
                    if overall_res_name == dep_res_name:
                        depends_on_resources.append((overall_res_name,
                                                     overall_res_meta))

            if depends_on_resources:
                _LOG.info(
                    f"Processing '{resource_type}' '{res_name}' dependencies "
                    f"{prettify_json(res_meta['dependencies'])}")
                success, output = _process_resources_with_dependencies(
                    resources=depends_on_resources,
                    handlers_mapping=handlers_mapping,
                    describe_handlers=describe_handlers,
                    pass_context=pass_context,
                    overall_resources=overall_resources,
                    output=output,
                    current_resource_type=current_resource_type,
                    run_count=run_count)
                if not success:
                    return False, output

            args.append(_build_args(name=res_name,
                                    meta=res_meta,
                                    context=output,
                                    pass_context=pass_context))
            if current_resource_type != resource_type:
                USER_LOG.info(f'Processing {resource_type} resources')
                current_resource_type = resource_type
            func = handlers_mapping[resource_type]
            response = func(args)
            errors.extend(process_response(response=response, output=output))

            res_meta['processed'] = True
            overall_res_index = overall_resources.index(
                (res_name, res_meta))
            overall_resources[overall_res_index][-1]['processed'] = True
        if errors:
            is_succeeded = False
    except Exception as e:
        if 'An infinite loop' in str(e):
            USER_LOG.error(e.args[0])
        else:
            USER_LOG.exception(f"Error occurred while '{resource_type}' "
                               f"resource creating: {str(e)}")
        is_succeeded = False

    if not is_succeeded:
        errors.extend(
            _describe_unfinished_resources(args, describe_handlers, output))
    return is_succeeded, output


def _build_args(name, meta, context, pass_context=False):
    """
    Builds parameters to pass to resource_type handler.
    Default parameters dict consists of name and meta keys.
    If pass_context set to True, parameters dict is extended with 'context' key
    :param name: name of the resource
    :param meta: definition of the resource
    :param context: result of previously deployed resources in scope of
        current syndicate execution
    :param pass_context: flag. Manages if output will be included to parameters
    :return: prepared parameters to be passed to resource_type handler.
    """
    params = {'name': name, 'meta': meta}
    if pass_context:
        params['context'] = context
    return params


def update_failed_output(res_name, res_meta, resource_type, output):
    """Merge the description of a failed resource into ``output``.

    The describe handler for ``resource_type`` is called with the resource
    name and meta. A dict result, or every item of a list result, is merged
    into ``output``. Any exception is logged as a warning and suppressed.

    :return: the same ``output`` dict
    """
    from syndicate.core import PROCESSOR_FACADE
    try:
        describe_func = PROCESSOR_FACADE.describe_handlers()[resource_type]
        failed_resource_output = describe_func(res_name, res_meta)
        if failed_resource_output:
            if isinstance(failed_resource_output, list):
                for item in failed_resource_output:
                    output.update(item)
            else:
                output.update(failed_resource_output)
    except Exception as e:
        _LOG.warning(f'Unable to describe {resource_type} '
                     f'resource with name {res_name}. Exception: {e}')
    return output


def deploy_resources(
        resources: list,
        output=None,
) -> tuple[bool, Any]:
    """Deploy resources using the create handlers.

    If any resource depends on a resource whose deploy priority value is
    equal to or greater than its own, the dependency-aware processing is
    used; otherwise resources are processed in batches in the given order.

    :param resources: list of ``(name, meta)`` pairs sorted by priority
    :param output: initial output dict
    :return: ``(is_succeeded, output)``
    """
    from syndicate.core import PROCESSOR_FACADE

    process_with_dependency = False
    for _, res_meta in resources:
        res_priority = DEPLOY_RESOURCE_TYPE_PRIORITY[res_meta['resource_type']]
        dep_priorities = [
            DEPLOY_RESOURCE_TYPE_PRIORITY[item['resource_type']]
            for item in res_meta.get('dependencies', [])]
        if dep_priorities and max(dep_priorities) >= res_priority:
            process_with_dependency = True
            break

    if process_with_dependency:
        USER_LOG.warning(
            'Resource dependency with higher deployment priority from a '
            'resource with equal or lower deployment priority detected. '
            'Deployment may take a little bit more time than usual.')
        return _process_resources_with_dependencies(
            resources=resources,
            handlers_mapping=PROCESSOR_FACADE.create_handlers(),
            describe_handlers=PROCESSOR_FACADE.describe_handlers(),
            output=output)

    return _process_resources(
        resources=resources,
        handlers_mapping=PROCESSOR_FACADE.create_handlers(),
        describe_handlers=PROCESSOR_FACADE.describe_handlers(),
        output=output)


def update_resources(
        resources: list[tuple[str, dict]],
        old_resources: set,
) -> tuple[bool, Any]:
    """Update resources that are present in the initial deployment output.

    Resources whose names are not in ``old_resources`` are removed from
    ``resources`` (the list is modified in place) and a warning is logged
    for each of them.

    :param resources: list of ``(name, meta)`` pairs sorted by priority
    :param old_resources: names of resources from the deployment output
    :return: ``(is_succeeded, output)``
    """
    from syndicate.core import PROCESSOR_FACADE

    # exclude new resources that were added after deployment
    skipped_names = [name for name, _ in resources
                     if name not in old_resources]
    for name in reversed(skipped_names):
        _LOG.warning(
            f'Skipping resource {name} due to absence in initial '
            f'deployment output.'
        )
    if skipped_names:
        resources[:] = [res for res in resources if res[0] in old_resources]

    return _process_resources(
        resources=resources,
        handlers_mapping=PROCESSOR_FACADE.update_handlers(),
        describe_handlers=PROCESSOR_FACADE.describe_handlers(),
        pass_context=True)


def clean_resources(output):
    """Remove resources using the remove handlers.

    Consecutive items of the same resource type are passed to the remove
    handler of that type in one call.

    :param output: iterable of ``(arn, config)`` pairs where
        ``config['resource_meta']['resource_type']`` is the resource type
    :return: ``(success, removed_resources_arn)`` where the second item is
        the list of keys reported by the remove handlers
    """
    from syndicate.core import PROCESSOR_FACADE
    clean_output = {}
    errors = []
    args = []
    resource_type = None
    # clean all resources
    for arn, config in output:
        res_type = config['resource_meta']['resource_type']

        if resource_type is None:
            resource_type = res_type

        if res_type != resource_type:
            # The type has changed: remove the accumulated batch first.
            USER_LOG.info('Removing {0} resources ...'.format(resource_type))
            func = PROCESSOR_FACADE.remove_handlers()[resource_type]
            result = func(args)
            errors.extend(process_response(result, clean_output))
            del args[:]
            resource_type = res_type

        args.append({'arn': arn, 'config': config})

    if args:
        USER_LOG.info('Removing {0} resources ...'.format(resource_type))
        func = PROCESSOR_FACADE.remove_handlers()[resource_type]
        result = func(args)
        errors.extend(process_response(result, clean_output))

    removed_resources_arn = list(clean_output.keys())
    return not errors, removed_resources_arn


def continue_deploy_resources(resources, latest_deploy_output):
    """Deploy only the resources absent from the latest deploy output.

    Resources whose names are found in ``latest_deploy_output`` are removed
    from ``resources`` (the list is modified in place).

    :param resources: list of ``(name, meta)`` pairs
    :param latest_deploy_output: dict whose values contain the
        ``resource_name`` key
    :return: ``(is_succeeded, output)``; when nothing is left to deploy
        the output is ``latest_deploy_output`` itself
    """
    deployed_names = {meta['resource_name']
                      for meta in latest_deploy_output.values()}
    # Filter in place instead of removing items from the list while
    # iterating over it.
    resources[:] = [res for res in resources if res[0] not in deployed_names]

    if not resources:
        USER_LOG.info('Skipping deployment because all specified resources '
                      'already deployed')
        # Same two-item shape as deploy_resources returns.
        return True, latest_deploy_output

    return deploy_resources(resources)


def process_response(
        response: tuple | dict,
        output: dict,
) -> list:
    """Merge a handler response into ``output`` and collect its errors.

    A dict response is merged into ``output``. A tuple response is unpacked
    as ``(result, exceptions)``: a dict ``result`` is merged into ``output``
    and ``exceptions`` are logged and returned. Any other response type is
    ignored.

    :param response: handler response
    :param output: dict updated in place
    :return: list of errors found in the response
    """
    errors = []
    if isinstance(response, dict):
        output.update(response)
    elif isinstance(response, tuple):
        result, exceptions = response
        if isinstance(result, dict):
            output.update(result)
        else:
            _LOG.warning(
                f"Got unexpected response. Expect dict. Got '{type(response)}',"
                f" '{str(response)}'"
            )
        if isinstance(exceptions, list):
            if exceptions:
                errors.extend(exceptions)
                # Items are not guaranteed to be strings.
                USER_LOG.error('\n'.join(map(str, exceptions)))
        else:
            USER_LOG.error(str(exceptions))
            errors.append(str(exceptions))

    return errors


def __move_output_content(args, failed_output, updated_output):
    """Copy the output entries of the resources named in ``args`` from
    ``failed_output`` into ``updated_output``."""
    for arg in args:
        resource_output = __find_output_by_resource_name(
            failed_output, arg['name'])
        if resource_output:
            updated_output.update(resource_output)


def __find_output_by_resource_name(output, resource_name):
    """Return the entries of ``output`` whose ``resource_name`` equals
    ``resource_name``."""
    return {key: value for key, value in output.items()
            if value['resource_name'] == resource_name}


def _compare_external_resources(expected_resources):
    """Compare the meta of external resources using the compare handlers.

    :param expected_resources: mapping ``resource_name -> resource_meta``
    :raises AssertionError: if any compare handler returned errors; the
        message consists of all the errors joined with the line separator
    """
    from syndicate.core import PROCESSOR_FACADE
    compare_funcs = PROCESSOR_FACADE.compare_meta_handlers()

    errors = {}

    for resource_name, resource_meta in expected_resources.items():
        func = compare_funcs[resource_meta.get('resource_type')]
        resource_errors = func(resource_name, resource_meta)
        if resource_errors:
            errors[resource_name] = resource_errors

    if errors:
        import os
        # Error values are not guaranteed to be strings.
        error = os.linesep.join(map(str, errors.values()))
        raise AssertionError(error)


def _rollback_deployed_resources(output, latest_deploy_output):
    """Clean the resources of ``output`` that are absent from the latest
    deploy output, in the clean priority order."""
    output_resources_list = list(output.items())
    output_resources_list.sort(key=cmp_to_key(_compare_clean_resources))
    if latest_deploy_output:
        output_resources_list = [
            resource for resource in output_resources_list
            if resource[0] not in latest_deploy_output]
    clean_resources(output_resources_list)


def create_deployment_resources(
        *,
        deploy_name: str,
        bundle_name: str,
        deploy_only_types: tuple | None = None,
        deploy_only_resources: tuple | None = None,
        excluded_resources: tuple | None = None,
        excluded_types: tuple | None = None,
        continue_deploy: bool = False,
        replace_output: bool = False,
        rollback_on_error: bool = False,
) -> bool:
    """Deploy the resources of a bundle and store the deploy output.

    The bundle meta is loaded, resolved and filtered, resources are sorted
    by deploy priority and deployed. On failure with ``rollback_on_error``
    enabled, the resources deployed during this run are cleaned and no
    deploy output is created. Otherwise post deployment tags are applied
    and the output, merged with the latest deploy output if there is one,
    is stored.

    :param deploy_name: name of the deployment
    :param bundle_name: name of the bundle to deploy
    :param deploy_only_types: deploy only resources of these types
    :param deploy_only_resources: deploy only resources with these names
    :param excluded_resources: names of resources to skip
    :param excluded_types: types of resources to skip
    :param continue_deploy: skip resources found in the latest deploy output
    :param replace_output: passed to ``create_deploy_output``
    :param rollback_on_error: clean deployed resources if deployment fails
    :return: True if both deployment and tagging succeeded
    """
    is_ld_output_regular, latest_deploy_output = \
        load_latest_deploy_output(failsafe=True)
    if latest_deploy_output is False:
        USER_LOG.warning('The latest deploy output is absent. The current '
                         'deployment will be performed without taking into '
                         'account the latest deployment.')
    elif is_ld_output_regular is True:
        _LOG.info(f'The latest deployment has status succeeded. '
                  f'Loaded output:\n {prettify_json(latest_deploy_output)}')
    elif is_ld_output_regular is False:
        _LOG.info(f'The latest deployment has status failed. '
                  f'Loaded output:\n {prettify_json(latest_deploy_output)}')

    resources = load_meta_resources(bundle_name)
    # validate_deployment_packages(resources)
    _LOG.debug(f'{BUILD_META_FILE_NAME} file was loaded successfully')

    resources = resolve_meta(resources)
    _LOG.debug('Names were resolved')
    resources = populate_s3_paths(resources, bundle_name)
    _LOG.debug('Artifacts s3 paths were resolved')
    resolve_tags(resources)

    deploy_only_resources = _resolve_names(deploy_only_resources)
    excluded_resources = _resolve_names(excluded_resources)
    _LOG.info('Prefixes and suffixes of any resource names have been resolved')

    expected_external_resources = {
        key: value for key, value in resources.items()
        if value.get('external')
    }
    if expected_external_resources:
        _compare_external_resources(expected_external_resources)
        _LOG.info('External resources were matched successfully')

    resources = _filter_resources(
        resources_meta=resources,
        resource_names=deploy_only_resources,
        resource_types=deploy_only_types,
        exclude_names=excluded_resources,
        exclude_types=excluded_types
    )

    _LOG.debug(f'Going to create: {resources}')

    # sort resources with priority
    resources_list = list(resources.items())
    resources_list.sort(key=cmp_to_key(compare_deploy_resources))

    _LOG.info('Going to deploy AWS resources')
    if continue_deploy:
        if latest_deploy_output is False:
            USER_LOG.warning(
                'The latest deploy output is absent. The command will be '
                'executed without taking into account the '
                '`--continue_deploy` parameter.')
        success, output = continue_deploy_resources(
            resources_list,
            latest_deploy_output if latest_deploy_output else {})
    else:
        success, output = deploy_resources(resources_list)

    # remove failed output from bucket
    if is_ld_output_regular is False:
        remove_failed_deploy_output(bundle_name, deploy_name)

    if not success and rollback_on_error is True:
        USER_LOG.info(
            "Deployment failed, `rollback_on_error` is enabled,"
            " going to clean resources that have been deployed during"
            " deployment process.")
        _rollback_deployed_resources(output, latest_deploy_output)
        # No deploy output is created after a rollback.
        return success

    if success:
        USER_LOG.info('AWS resources were deployed successfully')

        # apply dynamic changes that uses ARNs
        _LOG.info('Going to apply dynamic changes')
        _apply_dynamic_changes(resources, output)
        USER_LOG.info('Dynamic changes were applied successfully')

    _LOG.info('Going to apply post deployment tags')
    tag_success = _apply_post_deployment_tags(output)

    USER_LOG.info('Going to create deploy output')
    if latest_deploy_output:
        output = {**latest_deploy_output, **output}
    create_deploy_output(bundle_name=bundle_name,
                         deploy_name=deploy_name,
                         output=output,
                         success=success,
                         replace_output=replace_output)
    USER_LOG.info(f'Deploy output for {deploy_name} was created.')

    return success and tag_success


def update_deployment_resources(
        *,
        bundle_name: str,
        deploy_name: str,
        update_only_types: tuple | None = None,
        update_only_resources: tuple | None = None,
        excluded_resources: tuple | None = None,
        excluded_types: tuple | None = None,
        replace_output: bool = False,
        force: bool = False,
) -> bool:
    """Update the deployed resources of a bundle and store the new output.

    Only resources whose type has an update handler and whose name is
    present in the previous deploy output are updated. The new output is
    merged over the previous one and stored.

    :param bundle_name: name of the bundle with the new resources meta
    :param deploy_name: name of the deployment to update
    :param update_only_types: update only resources of these types
    :param update_only_resources: update only resources with these names
    :param excluded_resources: names of resources to skip
    :param excluded_types: types of resources to skip
    :param replace_output: passed to ``create_deploy_output``
    :param force: do not ask for confirmation when the latest deployment
        has status failed
    :return: True if both update and tagging succeeded; ``ABORTED_STATUS``
        if the deployment is not found or the user declined to proceed
    """
    from syndicate.core import PROCESSOR_FACADE, PROJECT_STATE
    from click import confirm as click_confirm

    latest_bundle = PROJECT_STATE.get_latest_deployed_or_updated_bundle(
        bundle_name, latest_if_not_found=True)
    if not latest_bundle:
        latest_bundle = PROJECT_STATE.latest_deployed_bundle_name
    _LOG.debug(f'Latest bundle name: {latest_bundle}')

    try:
        old_output = load_deploy_output(latest_bundle, deploy_name)
        _LOG.info('Output file was loaded successfully')
    except AssertionError:
        try:
            old_output = load_failed_deploy_output(latest_bundle, deploy_name)
            if not force:
                if not click_confirm(
                        "The latest deployment has status failed. "
                        "Do you want to proceed with updating?"):
                    return ABORTED_STATUS
            _LOG.warning(
                'Updating resources despite previous deployment failures')
        except AssertionError:
            USER_LOG.error('Deployment to update not found.')
            return ABORTED_STATUS

    old_resources = get_meta_from_output(old_output)
    old_resources = _resolve_names(tuple(old_resources.keys()))

    resources = load_meta_resources(bundle_name)
    _LOG.debug(prettify_json(resources))

    resources = resolve_meta(resources)
    _LOG.debug('Names were resolved')
    resources = populate_s3_paths(resources, bundle_name)
    _LOG.debug('Artifacts s3 paths were resolved')
    resolve_tags(resources)

    updatable_types = PROCESSOR_FACADE.update_handlers()
    USER_LOG.warning(
        'Please pay attention that only the '
        'following resources types are supported for update: {}'.format(
            list(updatable_types.keys())))

    update_only_resources = _resolve_names(update_only_resources)
    # Resolved the same way as in the deploy and clean flows so that
    # exclusion by name also matches prefixed/suffixed names.
    excluded_resources = _resolve_names(excluded_resources)
    _LOG.info(
        'Prefixes and suffixes of any resource names have been resolved.')

    resources = {
        name: meta for name, meta in resources.items()
        if meta['resource_type'] in updatable_types
    }

    resources = _filter_resources(
        resources_meta=resources,
        resource_names=update_only_resources,
        resource_types=update_only_types,
        exclude_names=excluded_resources,
        exclude_types=excluded_types
    )

    _LOG.debug(
        f'Going to update the following resources: {prettify_json(resources)}')
    resources_list = list(resources.items())
    resources_list.sort(key=cmp_to_key(_compare_update_resources))
    success, output = update_resources(resources_list, old_resources)

    _LOG.info('Going to updates tags')
    preprocess_tags(output)
    tag_success = _update_tags(old_output, output)

    _LOG.info('Going to resolve updated resources duplication')
    for duplicate in _detect_duplicates(old_output, output):
        old_output.pop(duplicate, None)

    create_deploy_output(bundle_name=bundle_name,
                         deploy_name=deploy_name,
                         output={**old_output, **output},
                         success=success,
                         replace_output=replace_output)
    if success and tag_success:
        remove_failed_deploy_output(bundle_name, deploy_name)

    return success and tag_success


def remove_deployment_resources(
        deploy_name: str,
        bundle_name: str,
        clean_only_resources: tuple | None = None,
        clean_only_types: tuple | None = None,
        excluded_resources: tuple | None = None,
        excluded_types: tuple | None = None,
        clean_externals: bool = False,
        preserve_state: bool = False,
):
    """Remove deployed resources and update or remove the deploy output.

    The regular deploy output is loaded, or the failed one if the regular
    one is not found. The output is filtered, sorted by clean priority and
    passed to ``clean_resources``.

    :param deploy_name: name of the deployment
    :param bundle_name: name of the bundle
    :param clean_only_resources: clean only resources with these names
    :param clean_only_types: clean only resources of these types
    :param excluded_resources: names of resources to keep
    :param excluded_types: types of resources to keep
    :param clean_externals: if True only the resources whose meta has a
        truthy ``external`` value are considered
    :param preserve_state: passed to ``_post_remove_output_handling``
    :return: the result of ``_post_remove_output_handling``, or
        ``ABORTED_STATUS`` if no deploy output is found
    """
    is_regular_output = True
    try:
        output = load_deploy_output(bundle_name, deploy_name)
        _LOG.info('Output file was loaded successfully')
    except AssertionError:
        try:
            output = load_failed_deploy_output(bundle_name, deploy_name)
            is_regular_output = False
        except AssertionError:
            USER_LOG.error("Deployment to clean not found.")
            return ABORTED_STATUS

    new_output = copy.deepcopy(output)

    clean_only_resources = _resolve_names(clean_only_resources)
    excluded_resources = _resolve_names(excluded_resources)
    _LOG.info('Prefixes and suffixes of any resource names have been resolved')

    if clean_externals:
        new_output = {
            k: v for k, v in new_output.items()
            if v['resource_meta'].get('external')
        }

    new_output = _filter_resources(
        resources_meta=new_output,
        resources_meta_type=DEPLOYMENT_OUTPUT,
        resource_names=clean_only_resources,
        resource_types=clean_only_types,
        exclude_names=excluded_resources,
        exclude_types=excluded_types
    )
    # sort resources with priority
    resources_list = list(new_output.items())
    resources_list.sort(key=cmp_to_key(_compare_clean_resources))
    _LOG.debug(f'Resources to delete: {prettify_json(resources_list)}')

    if resources_list:
        USER_LOG.info('Going to clean AWS resources')
    else:
        _LOG.info('Clean skipped because resources to clean absent')

    success, removed_resources_arn = clean_resources(resources_list)
    _LOG.debug(f'Removed successfully: \'{removed_resources_arn}\'')

    removed_arns = set(removed_resources_arn)
    new_output = {k: v for k, v in new_output.items() if k in removed_arns}

    # remove new_output from bucket
    return _post_remove_output_handling(
        deploy_name=deploy_name,
        bundle_name=bundle_name,
        output=output,
        new_output=new_output,
        is_regular_output=is_regular_output,
        success=success,
        preserve_state=preserve_state,
    )


def _post_remove_output_handling(
        deploy_name: str,
        bundle_name: str,
        output: dict,
        new_output: dict,
        is_regular_output: bool,
        success: bool,
        preserve_state: bool = False,
) -> bool | dict:
    """Update the stored deploy output after resources were removed.

    If everything from ``output`` was removed, the stored outputs are
    deleted (unless ``preserve_state`` is set). Otherwise the removed
    entries are dropped from ``output`` (modified in place) and the rest
    is stored again.

    :param output: the loaded deploy output
    :param new_output: the entries of ``output`` that were removed
    :param is_regular_output: False if ``output`` was loaded from the
        failed deploy output
    :param success: result of the clean operation
    :param preserve_state: keep the stored outputs even if all the
        resources were removed
    :return: ``success``, or ``{'operation': PARTIAL_CLEAN_ACTION}`` when
        only a part of the output was removed successfully
    """
    if output == new_output:
        if not preserve_state:
            # remove output from bucket
            remove_failed_deploy_output(bundle_name, deploy_name)
            remove_deploy_output(bundle_name, deploy_name)
        return success

    for key in new_output:
        output.pop(key, None)
    create_deploy_output(bundle_name=bundle_name,
                         deploy_name=deploy_name,
                         output=output,
                         success=is_regular_output,
                         replace_output=True)
    if not success:
        return success
    return {'operation': PARTIAL_CLEAN_ACTION}


def _apply_dynamic_changes(resources, output):
    """Apply the ``apply_changes`` items declared in resources meta.

    For every item whose dependency is present in ``resources``, the
    identifier of the declaring resource is resolved and the applier
    registered for the item's ``apply_type`` is submitted to a thread
    pool. The function returns after all the submitted calls finished.

    :param resources: mapping ``resource_name -> resource_meta``
    :param output: deploy output used to resolve resource identifiers
    """
    from syndicate.core import PROCESSOR_FACADE
    futures = []
    # The context manager guarantees that worker threads are released even
    # if the loop below raises.
    with ThreadPoolExecutor(max_workers=5) as pool:
        for name, meta in resources.items():
            resource_type = meta['resource_type']
            apply_changes = meta.get('apply_changes')
            if not apply_changes:
                continue
            for apply_item in apply_changes:
                change_type = apply_item['apply_type']
                dependency_name = apply_item['dependency_name']
                res_config = resources.get(dependency_name)
                if not res_config:
                    _LOG.debug('Dependency resource {0} is not found, '
                               'skipping the apply'.format(dependency_name))
                    continue
                func = PROCESSOR_FACADE.resource_identifier() \
                    .get(resource_type)
                if not func:
                    # The lookup above is done by the type of the declaring
                    # resource, so that type is the one reported.
                    _LOG.warning('Resource identifier is not defined '
                                 'for {0} type'.format(resource_type))
                    continue
                resource_output = __find_output_by_resource_name(
                    output, name)
                identifier = func(name, resource_output)
                apply_func = PROCESSOR_FACADE.mapping_applier() \
                    .get(change_type)
                if not apply_func:
                    _LOG.warning('Dynamic apply is not defined '
                                 'for {0} type'.format(change_type))
                    continue
                alias = '#{' + name + '}'
                futures.append(
                    pool.submit(apply_func, alias, identifier, apply_item))
            _LOG.info('Dynamic changes were applied to {0}'.format(name))
        concurrent.futures.wait(futures, timeout=None,
                                return_when=ALL_COMPLETED)

    for future in futures:
        error = future.exception()
        if error is not None:
            # Failures inside workers would otherwise stay invisible.
            _LOG.error(f'Dynamic change applying failed: {error}')


def _apply_post_deployment_tags(
        output: dict,
) -> bool:
    """Apply tags to the resources of ``output`` via the tags API and
    return the result of ``safe_apply_tags``."""
    from syndicate.core import RESOURCES_PROVIDER
    tags_resource = RESOURCES_PROVIDER.tags_api()
    success: bool = tags_resource.safe_apply_tags(output)
    return success


def _update_tags(
        old_output: dict,
        new_output: dict,
) -> bool:
    """Update tags via the tags API given the old and the new output and
    return the result of ``safe_update_tags``."""
    from syndicate.core import RESOURCES_PROVIDER
    tags_resource = RESOURCES_PROVIDER.tags_api()
    success: bool = tags_resource.safe_update_tags(old_output, new_output)
    return success


def compare_deploy_resources(first, second):
    """Compare two ``(name, meta)`` pairs by deploy priority of their type."""
    first_res_priority = \
        DEPLOY_RESOURCE_TYPE_PRIORITY[first[-1]['resource_type']]
    second_res_priority = \
        DEPLOY_RESOURCE_TYPE_PRIORITY[second[-1]['resource_type']]
    return _compare_res(first_res_priority, second_res_priority)


def _compare_clean_resources(first, second):
    """Compare two ``(arn, config)`` pairs by clean priority of their type."""
    first_resource_type = first[-1]['resource_meta']['resource_type']
    second_resource_type = second[-1]['resource_meta']['resource_type']
    first_res_priority = CLEAN_RESOURCE_TYPE_PRIORITY[first_resource_type]
    second_res_priority = CLEAN_RESOURCE_TYPE_PRIORITY[second_resource_type]
    return _compare_res(first_res_priority, second_res_priority)


def _compare_update_resources(first, second):
    """Compare two ``(name, meta)`` pairs by update priority of their type."""
    first_res_priority = \
        UPDATE_RESOURCE_TYPE_PRIORITY[first[-1]['resource_type']]
    second_res_priority = \
        UPDATE_RESOURCE_TYPE_PRIORITY[second[-1]['resource_type']]
    return _compare_res(first_res_priority, second_res_priority)


def _compare_res(first_res_priority, second_res_priority):
    """Three-way comparison: -1 if first is less, 1 if greater, 0 if equal."""
    if first_res_priority < second_res_priority:
        return -1
    if first_res_priority > second_res_priority:
        return 1
    return 0


def _resolve_names(names):
    """Return the given names together with their resolved variants.

    Every name is resolved with ``resolve_resource_name`` using the
    configured resources prefix and suffix.

    :param names: iterable of resource names or None
    :return: set containing both the original and the resolved names
    """
    from syndicate.core import CONFIG
    preset_name_resolution = functools.partial(resolve_resource_name,
                                               prefix=CONFIG.resources_prefix,
                                               suffix=CONFIG.resources_suffix)
    # Materialised once so that any iterable (not only a tuple) is accepted.
    names = tuple(names or ())
    return set(names) | {preset_name_resolution(name) for name in names}


def _filter_resources(
        resources_meta: dict,
        resources_meta_type: str = BUILD_META,
        resource_names: set | None = None,
        resource_types: tuple | None = None,
        exclude_names: set | None = None,
        exclude_types: tuple | None = None,
) -> dict:
    """Filter resources by name and type.

    A resource is kept if no include filter is given or it matches the
    include filters by name or by type, and it matches none of the exclude
    filters. The input mapping is not modified; the values of the returned
    dict are the same objects as in ``resources_meta``.

    :param resources_meta: mapping to filter
    :param resources_meta_type: ``BUILD_META`` (the key is the resource
        name and the type is ``value['resource_type']``) or
        ``DEPLOYMENT_OUTPUT`` (the name is ``value['resource_name']`` and
        the type is ``value['resource_meta']['resource_type']``)
    :param resource_names: names of resources to include
    :param resource_types: types of resources to include
    :param exclude_names: names of resources to exclude
    :param exclude_types: types of resources to exclude
    :raises ValueError: if ``resources_meta_type`` is not supported
    :return: new dict with the resources that passed the filters
    """
    resource_names = set(resource_names or ())
    resource_types = set(resource_types or ())
    exclude_names = set(exclude_names or ())
    exclude_types = set(exclude_types or ())

    _LOG.debug(f"Include resources by name: {list(resource_names) or 'All'}")
    _LOG.debug(f"Include resources by type: {list(resource_types) or 'All'}")
    _LOG.debug(f"Exclude resources by name: {list(exclude_names) or 'None'}")
    _LOG.debug(f"Exclude resources by type: {list(exclude_types) or 'None'}")

    if resources_meta_type == BUILD_META:
        def name_of(key, meta):
            return key

        def type_of(meta):
            return meta['resource_type']
    elif resources_meta_type == DEPLOYMENT_OUTPUT:
        def name_of(key, meta):
            return meta['resource_name']

        def type_of(meta):
            return meta['resource_meta']['resource_type']
    else:
        raise ValueError(
            f"Unsupported resources meta type: '{resources_meta_type}'")

    include_all = not (resource_names or resource_types)
    filtered = {}
    for key, meta in resources_meta.items():
        if not include_all and not (
                name_of(key, meta) in resource_names
                or type_of(meta) in resource_types):
            continue
        if (name_of(key, meta) in exclude_names
                or type_of(meta) in exclude_types):
            continue
        filtered[key] = meta

    return filtered


def is_deploy_exist(bundle_name, deploy_name):
    """Check whether a regular or a failed deploy output is stored in the
    deploy target bucket for the given bundle and deploy names.

    :return: the keys found by the regular output prefix if any,
        otherwise the keys found by the failed output prefix
    """
    from syndicate.core import CONN, CONFIG
    assert_bundle_bucket_exists()
    key_compound = construct_deploy_s3_key_path(bundle_name, deploy_name)
    failed_key_compound = construct_deploy_s3_key_path(
        bundle_name, deploy_name, is_failed=True)
    bucket = CONFIG.deploy_target_bucket
    return CONN.s3().get_keys_by_prefix(bucket, key_compound) or \
        CONN.s3().get_keys_by_prefix(bucket, failed_key_compound)


def _detect_duplicates(output: dict, new_output: dict) -> list:
    """Find keys of ``output`` that duplicate lambdas or lambda layers of
    ``new_output``.

    For every lambda or lambda layer of ``new_output`` its key is cut right
    after the resource name; every other key of ``output`` that starts with
    this prefix is treated as a duplicate.

    :param output: previous deploy output
    :param new_output: output of the update
    :return: list of unique duplicated keys in order of detection
    """
    duplicates = []
    seen = set()
    for arn, meta in new_output.items():
        res_type = meta['resource_meta']['resource_type']
        res_name = meta['resource_name']
        if res_type not in (LAMBDA_TYPE, LAMBDA_LAYER_TYPE):
            continue
        base_arn = arn[:arn.index(res_name) + len(res_name)]
        for key in output:
            if key != arn and key.startswith(base_arn) and key not in seen:
                # Each key is reported once so that callers can remove it
                # from the output without hitting a missing key.
                seen.add(key)
                duplicates.append(key)
    if duplicates:
        _LOG.info(
            f'The next duplicated ARNs will be removed from the output file '
            f'{duplicates}')
    return duplicates