in syndicate/core/resources/appsync_resource.py [0:0]
def _update_graphql_api(self, name, meta, context):
    """Update an existing AppSync GraphQL API so that it matches ``meta``.

    Steps, in order:
      1. look the API up by name and extract the deployment package
         (if ``meta['deployment_package']`` is set);
      2. resolve the primary/extra authorization settings and update
         the API itself;
      3. create an API key when API_KEY auth is used and no active key
         exists;
      4. upload the schema when ``meta['schema_path']`` is set;
      5. synchronize data sources, functions and resolvers: entries
         present in ``meta`` are updated or created, existing entries
         that were not updated are deleted.

    :param name: name of the GraphQL API
    :param meta: resource meta (dict). NOTE: the nested
        ``lambda_authorizer_config`` / ``user_pool_config`` dicts are
        modified in place (keys are popped / added) when present.
    :param context: not used by this method
    :return: the result of ``self.describe_graphql_api`` for the API
    :raises AssertionError: if the API does not exist, the schema file
        cannot be found, or the schema operation status is not
        ``'SUCCESS'``
    """
    api = self.appsync_conn.get_graphql_api_by_name(name)
    if not api:
        raise AssertionError(f'{name} GraphQL API does not exist.')

    extract_to = None
    archive_path = meta.get('deployment_package')
    if archive_path:
        extract_to = self._extract_zip(archive_path, name)
    else:
        _LOG.warning('Cannot find appsync deployment package!')

    api_id = api['apiId']
    auth_type = meta.get('primary_auth_type')
    extra_auth_types = meta.get('extra_auth_types', [])
    lambda_auth_config = meta.get('lambda_authorizer_config', {})
    user_pool_config = meta.get('user_pool_config', {})

    if auth_type == AWS_LAMBDA_TYPE:
        lambda_region = lambda_auth_config.pop(AWS_REGION_PARAMETER, None)
        lambda_name = lambda_auth_config.pop('resource_name', None)
        if not lambda_region or not lambda_name:
            # Without both values the authorizer ARN cannot be built,
            # so the API is updated with an empty lambda auth config.
            lambda_auth_config = {}
            USER_LOG.error(
                f"Authorization type '{auth_type}' can't be configured "
                f"for AppSync '{name}' because lambda resource name or "
                f"aws region is not specified")
        else:
            lambda_arn = self.build_lambda_arn(
                region=lambda_region,
                name=lambda_name)
            lambda_auth_config['authorizer_uri'] = lambda_arn

    if auth_type == AWS_CUP_TYPE:
        cup_name = user_pool_config.pop('resource_name', None)
        cup_id = self.cup_conn.if_pool_exists_by_name(cup_name)
        if cup_id:
            user_pool_config['user_pool_id'] = cup_id
        else:
            user_pool_config = {}
            USER_LOG.error(
                f"Authorization type '{auth_type}' can't be configured "
                f"for AppSync '{name}' because Cognito User Pool "
                f"{cup_name} not found")

    updated_extra_auth_types, is_extra_auth_api_key = \
        self._process_extra_auth(extra_auth_types, name)

    self.appsync_conn.update_graphql_api(
        api_id, name, auth_type=auth_type,
        user_pool_config=user_pool_config,
        open_id_config=meta.get('open_id_config'),
        lambda_auth_config=lambda_auth_config,
        log_config=self._resolve_log_config(
            meta.get('log_config', {}), name
        ),
        xray_enabled=meta.get('xray_enabled'),
        extra_auth_types=updated_extra_auth_types)

    if auth_type == 'API_KEY' or is_extra_auth_api_key:
        # Only create a key when the API has no active one.
        if not self.get_active_api_keys(api_id):
            api_key_expiration = meta.get('api_key_expiration_days', 7)
            api_key_expires = int(
                time.time() + api_key_expiration * ONE_DAY_IN_SECONDS)
            self.appsync_conn.create_api_key(
                api_id, expires=api_key_expires)

    if schema_path := meta.get('schema_path'):
        # Build the full path only when there is an extraction directory,
        # so that a missing deployment package ends in the AssertionError
        # below instead of passing None to the path builder.
        schema_full_path = (
            build_path(extract_to, schema_path) if extract_to
            else schema_path)
        if not extract_to or not os.path.exists(schema_full_path):
            raise AssertionError(
                f'\'{schema_full_path}\' file not found for '
                f'AppSync \'{name}\'')

        with open(schema_full_path, 'r', encoding='utf-8') as file:
            schema_definition = file.read()

        status, details = \
            self.appsync_conn.create_schema(api_id, schema_definition)
        if status != 'SUCCESS':
            error_message = (
                f"An error occurred when updating schema. "
                f"Operation status: '{status}'. ")
            if details:
                error_message += f"Details: '{details}'"
            raise AssertionError(error_message)
        _LOG.info(
            f"Schema of the AppSync '{name}' updated successfully")

    # ---- data sources ----
    # Every existing data source that is not updated below (no meta
    # entry with the same name, or its params could not be built) is
    # left in `existent_sources` and deleted afterwards.
    existent_sources = self.appsync_conn.list_data_sources(api_id)
    for source_meta in meta.get('data_sources', []):
        existing = next(
            (source for source in existent_sources
             if source['name'] == source_meta['name']),
            None)
        params = self._build_data_source_params_from_meta(source_meta)
        if not params:
            continue
        if existing is not None:
            _LOG.debug(f"Updating data source '{existing['name']}'")
            self.appsync_conn.update_data_source(api_id, **params)
            existent_sources.remove(existing)
        else:
            _LOG.debug(f"Creating data source '{source_meta['name']}'")
            self.appsync_conn.create_data_source(api_id, **params)

    for source in existent_sources:
        self.appsync_conn.delete_data_source(api_id, source['name'])

    # ---- functions ----
    # `actual_funcs` collects the results of update/create calls; it is
    # passed to the resolver params builder below.
    actual_funcs = []
    existent_funcs = self.appsync_conn.list_functions(api_id)
    for func_meta in meta.get('functions', []):
        existing = next(
            (func_conf for func_conf in existent_funcs
             if func_meta['name'] == func_conf['name']),
            None)
        params = self._build_function_params_from_meta(
            func_meta, extract_to
        )
        if not params:
            # Skip only this function; the remaining ones must still be
            # processed (same handling as data sources and resolvers).
            continue
        if existing is not None:
            _LOG.debug(f"Updating function '{existing['name']}'")
            actual_funcs.append(
                self.appsync_conn.update_function(
                    api_id, existing['functionId'], params))
            existent_funcs.remove(existing)
        else:
            _LOG.debug(f"Creating function '{func_meta['name']}'")
            actual_funcs.append(
                self.appsync_conn.create_function(api_id, params))

    # ---- resolvers ----
    existent_resolvers = []
    for t in self.appsync_conn.list_types(api_id):
        existent_resolvers.extend(
            self.appsync_conn.list_resolvers(api_id, t['name']))

    for resolver_meta in meta.get('resolvers', []):
        existing = next(
            (resolver for resolver in existent_resolvers
             if resolver['typeName'] == resolver_meta['type_name']
             and resolver['fieldName'] == resolver_meta['field_name']),
            None)
        params = self._build_resolver_params_from_meta(
            resolver_meta, extract_to, actual_funcs)
        if not params:
            continue
        if existing is not None:
            _LOG.debug(
                f"Updating resolver for type '{existing['typeName']}' "
                f"and field '{existing['fieldName']}'")
            self.appsync_conn.update_resolver(api_id, **params)
            existent_resolvers.remove(existing)
        else:
            # Meta uses snake_case keys ('type_name' / 'field_name');
            # the camelCase keys exist only on resolvers listed from
            # the API.
            _LOG.debug(
                f"Creating resolver for type "
                f"'{resolver_meta['type_name']}' "
                f"and field '{resolver_meta['field_name']}'")
            self.appsync_conn.create_resolver(api_id, **params)

    for resolver in existent_resolvers:
        self.appsync_conn.delete_resolver(api_id,
                                          type_name=resolver['typeName'],
                                          field_name=resolver['fieldName'])

    # Leftover functions are deleted only after the resolvers have been
    # synchronized.
    for func_conf in existent_funcs:
        self.appsync_conn.delete_function(api_id, func_conf['functionId'])

    _LOG.info(f'Updated AppSync GraphQL API {api_id}')
    return self.describe_graphql_api(name=name, meta=meta, api_id=api_id)