def write()

in syndicate/core/generators/deployment_resources/appsync_generator.py [0:0]


    def write(self):
        if self._dict['kind'] == 'UNIT':
            data_sources = self.appsync_config.get('data_sources', [])
            current_ds_name = self._dict.get('data_source_name')
            error_message = (
                f"Data source '{current_ds_name}' not found in the SyncApp "
                f"'{self.appsync_name}' definition.")
            for data_source in data_sources:
                if current_ds_name == data_source['name']:
                    self._dict['data_source_type'] = data_source['type']
                    error_message = None
                    break
            if error_message:
                raise ValueError(error_message)
        elif self._dict['kind'] == 'PIPELINE':
            if (functions := self.appsync_config.get('functions', [])) is None:
                functions = []
            if (curr_functions := self._dict.get('function_name', [])) is None:
                curr_functions = []
            absent_funcs = []

            for func_name in set(curr_functions):
                func_exist = False
                for func_conf in functions:
                    if func_conf['name'].lower() == func_name.lower():
                        func_exist = True
                        break
                if not func_exist:
                    absent_funcs.append(func_name)
            if absent_funcs:
                raise ValueError(
                    f"The next function/s '{absent_funcs}' not found in the "
                    f"SyncApp '{self.appsync_name}' definition.")
            self._dict['pipeline_config'] = {
                'functions': curr_functions
            }
        resolvers = self.appsync_config.get('resolvers', [])
        current_type_name = self._dict.get('type_name')
        current_field_name = self._dict.get('field_name')
        for resolver in resolvers:
            if (resolver['type_name'].lower() == current_type_name.lower()
                    and
               resolver['field_name'].lower() == current_field_name.lower()):
                message = (f"The resolver for the type '{current_type_name}' "
                           f"and field '{current_field_name}' already exists.")
                if click_confirm(f"{message} Overwrite?"):
                    _LOG.warning(
                        f"Overwriting resolver for the type "
                        f"'{current_type_name}' and field "
                        f"'{current_field_name}'")
                    resolvers.remove(resolver)
                else:
                    _LOG.warning(
                        f"Skipping resolver for the type "
                        f"'{current_type_name}' and field "
                        f"'{current_field_name}'")
                    raise RuntimeError

        resolvers.append(self._resolve_configuration())
        self.appsync_config['resolvers'] = resolvers
        self._save_config()