def assemble_appsync()

in syndicate/core/build/runtime/appsync.py [0:0]


def assemble_appsync(project_path, bundles_dir, **kwargs):
    from syndicate.core import CONFIG
    path_to_project = CONFIG.project_path
    src_path = build_path(path_to_project, project_path)
    if not os.path.exists(src_path):
        raise AssertionError(
            f'Appsync sources are not located by path {src_path}')
    if not os.listdir(src_path):
        raise AssertionError(f'Appsync sources path {src_path} is empty')

    _LOG.info(f'Appsync sources are located by path: {src_path}')

    for item in os.listdir(src_path):
        appsync_src_path = build_path(src_path, item)
        if os.path.isdir(appsync_src_path):
            _LOG.info(f'Going to process \'{item}\'')
            if not os.listdir(appsync_src_path):
                raise AssertionError(
                    f'Appsync path {appsync_src_path} is empty')

            conf_file_path = build_path(src_path, item,
                                        APPSYNC_CONFIG_FILE_NAME)
            if not os.path.isfile(conf_file_path):
                raise AssertionError(
                    f'\'{APPSYNC_CONFIG_FILE_NAME}\' file not found for '
                    f'Appsync \'{item}\'.')

            with open(conf_file_path) as file:
                appsync_conf = json.load(file)
            schema_filepath = appsync_conf.get('schema_path')
            if not os.path.isabs(schema_filepath):
                schema_path = build_path(appsync_src_path, schema_filepath)
                USER_LOG.info(f'Path to schema file resolved as '
                              f'\'{schema_path}\'')
            else:
                schema_path = schema_filepath
            if not os.path.isfile(schema_path):
                raise AssertionError(
                    f'Schema file not found for Appsync \'{item}\' in the '
                    f'path {schema_path}.')

            artifact_name = APPSYNC_ARTIFACT_NAME_TEMPLATE.format(name=item)
            zip_file_path = build_path(src_path, artifact_name)
            appsync_conf['deployment_package'] = artifact_name

            with open(conf_file_path, 'w') as file:
                json.dump(appsync_conf, file)

            resolvers = appsync_conf.get('resolvers', [])
            functions = appsync_conf.get('functions', [])
            artifact_items = resolvers + functions
            artifacts_path = []
            for artifact_item in artifact_items:
                if artifact_file_path := artifact_item.get(
                        'request_mapping_template_path'):
                    artifacts_path.append(artifact_file_path)
                if artifact_file_path := artifact_item.get(
                        'response_mapping_template_path'):
                    artifacts_path.append(artifact_file_path)
                if artifact_file_path := artifact_item.get('code_path'):
                    artifacts_path.append(artifact_file_path)

            with zipfile.ZipFile(zip_file_path, 'w') as zipf:
                zipf.write(schema_path, schema_filepath)

                # to archive only resolvers and functions in config
                for path in artifacts_path:
                    artifacts_path = build_path(appsync_src_path, path)
                    if os.path.exists(artifacts_path):
                        zipf.write(artifacts_path, path)
                        _LOG.debug(
                            f"File from the path '{artifacts_path}' "
                            f"successfully added to the AppSync deployment "
                            f"package")
                    else:
                        raise AssertionError(
                            f"File not found by the path '{artifacts_path}'")

            if os.path.getsize(zip_file_path) == 0:
                raise AssertionError(
                    f'Appsync archive {zip_file_path} is empty')

            shutil.move(zip_file_path, build_path(bundles_dir, artifact_name))
            _LOG.info(f'Appsync {item} was processed successfully')