syndicate/core/resources/appsync_resource.py (654 lines of code) (raw):
import io
import os.path
import posixpath
import shutil
import time
from pathlib import PurePath
from zipfile import ZipFile
from botocore.exceptions import ClientError
from syndicate.commons.log_helper import get_logger, get_user_logger
from syndicate.core.constants import ARTIFACTS_FOLDER
from syndicate.core.helper import build_path, unpack_kwargs, \
dict_keys_to_camel_case
from syndicate.core.resources.base_resource import BaseResource
from syndicate.core.resources.helper import validate_params, \
build_description_obj
_LOG = get_logger(__name__)
USER_LOG = get_user_logger()
API_REQUIRED_PARAMS = ['schema_path']
DATA_SOURCE_REQUIRED_PARAMS = ['name', 'type']
RESOLVER_REQUIRED_PARAMS = ['type_name', 'field_name', 'runtime']
RESOLVER_DEFAULT_KIND = 'UNIT'
DEFAULT_FUNC_VTL_VERSION = '2018-05-29'
FUNCTION_REQUIRED_PARAMS = ['runtime', 'data_source_name']
AWS_REGION_PARAMETER = 'aws_region'
AWS_LAMBDA_TYPE = 'AWS_LAMBDA'
AWS_CUP_TYPE = 'AMAZON_COGNITO_USER_POOLS'
DATA_SOURCE_TYPE_CONFIG_MAPPING = {
AWS_LAMBDA_TYPE: 'lambda_config',
'AMAZON_DYNAMODB': 'dynamodb_config',
'AMAZON_ELASTICSEARCH': 'elasticsearch_config',
'HTTP': 'http_config',
'RELATIONAL_DATABASE': 'relational_database_config',
'AMAZON_OPENSEARCH_SERVICE': 'open_search_service_config',
'AMAZON_EVENTBRIDGE': 'event_bridge_config'
}
ONE_DAY_IN_SECONDS = 86400
class AppSyncResource(BaseResource):
def __init__(self, appsync_conn, s3_conn, cup_conn, cw_logs_conn,
deploy_target_bucket, deploy_target_bucket_key_compound,
account_id) -> None:
from syndicate.core import CONF_PATH
self.appsync_conn = appsync_conn
self.s3_conn = s3_conn
self.cup_conn = cup_conn
self.cw_logs_conn = cw_logs_conn
self.conf_path = CONF_PATH
self.deploy_target_bucket_key_compound = \
deploy_target_bucket_key_compound
self.deploy_target_bucket = deploy_target_bucket
self.account_id = account_id
def create_graphql_api(self, args):
""" Create GraphQL API in pool in sub processes.
:type args: list
"""
return self.create_pool(self._create_graphql_api_from_meta, args, 1)
@unpack_kwargs
def _create_graphql_api_from_meta(self, name, meta):
""" Create GraphQL API from meta description.
:type name: str
:type meta: dict
"""
extract_to = None
validate_params(name, meta, API_REQUIRED_PARAMS)
api = self.appsync_conn.get_graphql_api_by_name(name)
if api:
_LOG.warning(f'AppSync API {name} already exists.')
return self.describe_graphql_api(
name=name, meta=meta, api_id=api['apiId'])
archive_path = meta.get('deployment_package')
if archive_path:
extract_to = self._extract_zip(archive_path, name)
auth_type = meta.get('primary_auth_type')
extra_auth_types = meta.get('extra_auth_types', [])
lambda_auth_config = meta.get('lambda_authorizer_config', {})
user_pool_config = meta.get('user_pool_config', {})
if auth_type == AWS_LAMBDA_TYPE:
lambda_region = lambda_auth_config.pop(AWS_REGION_PARAMETER, None)
lambda_name = lambda_auth_config.pop('resource_name', None)
if not lambda_region or not lambda_name:
lambda_auth_config = {}
USER_LOG.error(
f"Authorization type '{auth_type}' can't be configured "
f"for AppSync '{name}' because lambda resource name or "
f"aws region is not specified")
else:
lambda_arn = self.build_lambda_arn(
region=lambda_region,
name=lambda_name)
lambda_auth_config['authorizer_uri'] = lambda_arn
if auth_type == AWS_CUP_TYPE:
cup_name = user_pool_config.pop('resource_name', None)
cup_id = self.cup_conn.if_pool_exists_by_name(cup_name)
if cup_id:
user_pool_config['user_pool_id'] = cup_id
else:
user_pool_config = {}
USER_LOG.error(
f"Authorization type '{auth_type}' can't be configured "
f"for AppSync '{name}' because Cognito User Pool "
f"{cup_name} not found")
updated_extra_auth_types, is_extra_auth_api_key = \
self._process_extra_auth(extra_auth_types, name)
api_id = self.appsync_conn.create_graphql_api(
name, auth_type=auth_type, tags=meta.get('tags'),
user_pool_config=user_pool_config,
open_id_config=meta.get('open_id_config'),
lambda_auth_config=lambda_auth_config,
log_config=self._resolve_log_config(
meta.get('log_config', {}), name
),
xray_enabled=meta.get('xray_enabled'),
extra_auth_types=updated_extra_auth_types)
if auth_type == 'API_KEY' or is_extra_auth_api_key:
api_key_expiration = meta.get('api_key_expiration_days', 7)
now = time.time()
api_key_expires = \
int(now + api_key_expiration * ONE_DAY_IN_SECONDS)
self.appsync_conn.create_api_key(api_id, expires=api_key_expires)
if schema_path := meta.get('schema_path'):
schema_full_path = build_path(extract_to, schema_path)
if not extract_to or not os.path.exists(schema_full_path):
raise AssertionError(
f'\'{schema_full_path}\' file not found for '
f'AppSync \'{name}\'')
with open(schema_full_path, 'r', encoding='utf-8') as file:
schema_definition = file.read()
status, details = \
self.appsync_conn.create_schema(api_id, schema_definition)
if status != 'SUCCESS':
error_message = (
f"An error occurred when creating schema. "
f"Operation status: '{status}'. ")
if details:
error_message += f"Details: '{details}'"
raise AssertionError(error_message)
else:
_LOG.info(
f"Schema of the AppSync '{name}' created successfully")
data_sources_meta = meta.get('data_sources', [])
for data_source_meta in data_sources_meta:
params = self._build_data_source_params_from_meta(data_source_meta)
if not params:
continue
self.appsync_conn.create_data_source(api_id, **params)
functions_config = []
functions_meta = meta.get('functions', [])
for func_meta in functions_meta:
params = self._build_function_params_from_meta(
func_meta, extract_to)
if not params:
continue
func_config = self.appsync_conn.create_function(api_id, params)
functions_config.append(func_config)
resolvers_meta = meta.get('resolvers', [])
for resolver_meta in resolvers_meta:
params = self._build_resolver_params_from_meta(
resolver_meta, extract_to, functions_config)
if not params:
continue
self.appsync_conn.create_resolver(api_id, **params)
if extract_to:
shutil.rmtree(extract_to, ignore_errors=True)
_LOG.info(f'Created AppSync GraphQL API {api_id}')
return self.describe_graphql_api(name=name, meta=meta, api_id=api_id)
def _build_data_source_params_from_meta(self, source_meta: dict):
source_name = source_meta.get('name')
try:
validate_params(
source_name, source_meta, DATA_SOURCE_REQUIRED_PARAMS)
except AssertionError as e:
_LOG.warning(str(e))
_LOG.warning(f'Skipping data source \'{source_name}\'...')
return
_LOG.info(f'Altering data source \'{source_name}\'...')
source_config = None
source_type = source_meta.get('type')
data_source_params = {
'name': source_name,
'source_type': source_type
}
if source_description := source_meta.get('description'):
data_source_params['description'] = source_description
if role_name := source_meta.get('service_role_name'):
role_arn = self._build_iam_role_arn(role_name)
data_source_params['service_role_arn'] = role_arn
if config_key := DATA_SOURCE_TYPE_CONFIG_MAPPING.get(source_type):
source_config = source_meta.get(config_key)
if source_type == AWS_LAMBDA_TYPE and source_config:
region = source_config.pop(AWS_REGION_PARAMETER, None)
lambda_name = source_config.pop('lambda_name', None)
source_config['lambda_function_arn'] = self.build_lambda_arn(
lambda_name, region)
elif source_type == 'AMAZON_EVENTBRIDGE' and source_config:
region = source_config.pop(AWS_REGION_PARAMETER)
event_bus = source_config.pop('event_bus_name')
source_config['event_bus_arn'] = self.build_event_bus_arn(
event_bus, region)
if source_config:
data_source_params['source_config'] = source_config
return data_source_params
def build_lambda_arn(self, name, region):
arn = f'arn:aws:lambda:{region}:{self.account_id}:function:{name}'
return arn
def build_event_bus_arn(self, name, region):
arn = f'arn:aws:events:{region}:{self.account_id}:event-bus/{name}'
return arn
def _build_iam_role_arn(self, name):
return f'arn:aws:iam::{self.account_id}:role/{name}'
@staticmethod
def _build_resolver_params_from_meta(resolver_meta, artifacts_path,
existing_functions=None):
existing_functions = existing_functions or []
type_name = resolver_meta.get('type_name')
field_name = resolver_meta.get('field_name')
try:
validate_params(type_name + ':' + field_name, resolver_meta,
RESOLVER_REQUIRED_PARAMS)
except AssertionError as e:
_LOG.warning(str(e))
_LOG.warning(f'Skipping resolver for type \'{type_name}\' '
f'and field \'{field_name}\'...')
return
_LOG.info(f'Altering resolver for type \'{type_name}\' and field '
f'\'{field_name}\'...')
resolver_params = {
'type_name': type_name,
'field_name': field_name,
}
kind = resolver_meta.get('kind', RESOLVER_DEFAULT_KIND)
resolver_params['kind'] = kind
if kind == RESOLVER_DEFAULT_KIND:
resolver_params['data_source_name'] = \
resolver_meta['data_source_name']
elif kind == 'PIPELINE':
_LOG.info("Trying to resolve functions IDs")
function_names = \
resolver_meta.get('pipeline_config', {}).get('functions', [])
function_ids = []
for func_name in function_names:
func_id = None
for func_info in existing_functions:
if func_name == func_info['name']:
func_id = func_info['functionId']
function_ids.append(func_id)
_LOG.debug(
f"Successfully resolved ID '{func_id}' for the "
f"function '{func_name}'")
break
if not func_id:
_LOG.warning(f"ID for the function '{func_name}' was "
f"not resolved")
resolver_params['pipeline_config'] = {
'functions': function_ids
}
if resolver_meta.get('runtime') in ('JS', 'APPSYNC_JS'):
runtime = {
'name': 'APPSYNC_JS',
'runtimeVersion': '1.0.0'
}
resolver_params['runtime'] = runtime
else:
runtime = None
if runtime:
code_path = build_path(artifacts_path,
resolver_meta.get('code_path'))
if not artifacts_path or not os.path.exists(code_path):
raise AssertionError(
f"Resolver code file for type '{type_name}' and field "
f"'{field_name}' not found.")
with open(code_path, 'r', encoding='utf-8') as file:
code = file.read()
resolver_params['code'] = code
else:
_LOG.debug('Runtime is not JS')
request_template_path = build_path(
artifacts_path,
resolver_meta.get('request_mapping_template_path'))
if not artifacts_path or not os.path.exists(request_template_path):
raise AssertionError(
f"Resolver request mapping template file for type "
f"'{type_name}' and field '{field_name}' not found.")
else:
with open(request_template_path, 'r',
encoding='utf-8') as file:
request_mapping_template = file.read()
response_template_path = build_path(
artifacts_path,
resolver_meta.get('response_mapping_template_path'))
if not artifacts_path or not os.path.exists(response_template_path):
raise AssertionError(
f"Resolver response mapping template file for type "
f"'{type_name}' and field '{field_name}' not found.")
else:
with open(response_template_path, 'r',
encoding='utf-8') as file:
response_mapping_template = file.read()
resolver_params['request_mapping_template'] = \
request_mapping_template
resolver_params['response_mapping_template'] = \
response_mapping_template
if max_batch_size := resolver_meta.get('max_batch_size'):
resolver_params['max_batch_size'] = max_batch_size
return resolver_params
@staticmethod
def _build_function_params_from_meta(func_meta, artifacts_path):
func_name = func_meta['name']
_LOG.debug(f"Building parameters for the function '{func_name}'")
try:
validate_params(func_name, func_meta, FUNCTION_REQUIRED_PARAMS)
except AssertionError as e:
_LOG.warning(str(e))
_LOG.warning(f"Skipping function '{func_name}'...")
return
function_params = {
'name': func_name,
'data_source_name': func_meta['data_source_name']
}
if description := func_meta.get('description'):
function_params['description'] = description
if func_meta.get('runtime') in ('JS', 'APPSYNC_JS'):
runtime = {
'name': 'APPSYNC_JS',
'runtimeVersion': '1.0.0'
}
function_params['runtime'] = runtime
else:
runtime = None
if runtime:
code_path = build_path(artifacts_path,
func_meta.get('code_path'))
if not artifacts_path or not os.path.exists(code_path):
raise AssertionError(
f"Function '{func_name}' code file not found.")
with open(code_path, 'r', encoding='utf-8') as file:
code = file.read()
function_params['code'] = code
else:
_LOG.debug('Runtime is not JS')
function_params['function_version'] = \
func_meta.get('function_version', DEFAULT_FUNC_VTL_VERSION)
request_template_path = build_path(
artifacts_path,
func_meta.get('request_mapping_template_path'))
if not artifacts_path or not os.path.exists(request_template_path):
raise AssertionError(
f"Function '{func_name}' request mapping template file "
f"not found.")
else:
with open(request_template_path, 'r',
encoding='utf-8') as file:
request_mapping_template = file.read()
response_template_path = build_path(
artifacts_path,
func_meta.get('response_mapping_template_path'))
if not artifacts_path or not os.path.exists(response_template_path):
raise AssertionError(
f"Function '{func_name}' response mapping template file "
f"not found.")
else:
with open(response_template_path, 'r',
encoding='utf-8') as file:
response_mapping_template = file.read()
function_params['request_mapping_template'] = \
request_mapping_template
function_params['response_mapping_template'] = \
response_mapping_template
if max_batch_size := func_meta.get('max_batch_size'):
function_params['max_batch_size'] = max_batch_size
return function_params
def _resolve_log_config(self, log_config: dict, api_name: str) -> dict:
if log_config.pop('logging_enabled', False):
_LOG.debug(
f"Building log_config parameters for the AppSync '{api_name}'"
)
log_role_name = log_config.pop('cloud_watch_logs_role_name', None)
if log_role_name:
log_config['cloud_watch_logs_role_arn'] = \
self._build_iam_role_arn(log_role_name)
else:
_LOG.warning(
f"Cloud watch logs can't be configured for the AppSync "
f"'{api_name}' because 'cloud_watch_logs_role_name' not "
f"specified"
)
log_config = {}
else:
log_config = {}
return log_config
def build_graphql_api_arn(self, api_id: str) -> str:
return f'arn:aws:appsync:{self.appsync_conn.client.meta.region_name}' \
f':{self.account_id}:apis/{api_id}'
def describe_graphql_api(self, name, meta, api_id=None):
if not api_id:
api = self.appsync_conn.get_graphql_api_by_name(name)
if not api:
return {}
api_id = api['apiId']
response = {'apiId': api_id}
arn = self.build_graphql_api_arn(api_id)
return {
arn: build_description_obj(response, name, meta)
}
def remove_graphql_api(self, args):
return self.create_pool(self._remove_graphql_api, args)
@unpack_kwargs
def _remove_graphql_api(self, arn, config):
api_id = config['description']['apiId']
try:
self.appsync_conn.delete_graphql_api(api_id)
group_names = self.cw_logs_conn.get_log_group_names()
for each in group_names:
if api_id == each.split('/')[-1]:
self.cw_logs_conn.delete_log_group_name(each)
_LOG.info(f'GraphQL API {api_id} was removed.')
return {arn: config}
except ClientError as e:
if e.response['Error']['Code'] == 'NotFoundException':
_LOG.warning(f'GraphQL API {api_id} is not found')
return {arn: config}
else:
raise e
def update_graphql_api(self, args):
return self.create_pool(self._update_graphql_api, args, 1)
@unpack_kwargs
def _update_graphql_api(self, name, meta, context):
extract_to = None
api = self.appsync_conn.get_graphql_api_by_name(name)
if not api:
raise AssertionError(f'{name} GraphQL API does not exist.')
archive_path = meta.get('deployment_package')
if archive_path:
extract_to = self._extract_zip(archive_path, name)
else:
_LOG.warning('Cannot find appsync deployment package!')
api_id = api['apiId']
auth_type = meta.get('primary_auth_type')
extra_auth_types = meta.get('extra_auth_types', [])
lambda_auth_config = meta.get('lambda_authorizer_config', {})
user_pool_config = meta.get('user_pool_config', {})
if auth_type == AWS_LAMBDA_TYPE:
lambda_region = lambda_auth_config.pop(AWS_REGION_PARAMETER, None)
lambda_name = lambda_auth_config.pop('resource_name', None)
if not lambda_region or not lambda_name:
lambda_auth_config = {}
USER_LOG.error(
f"Authorization type '{auth_type}' can't be configured "
f"for AppSync '{name}' because lambda resource name or "
f"aws region is not specified")
else:
lambda_arn = self.build_lambda_arn(
region=lambda_region,
name=lambda_name)
lambda_auth_config['authorizer_uri'] = lambda_arn
if auth_type == AWS_CUP_TYPE:
cup_name = user_pool_config.pop('resource_name', None)
cup_id = self.cup_conn.if_pool_exists_by_name(cup_name)
if cup_id:
user_pool_config['user_pool_id'] = cup_id
else:
user_pool_config = {}
USER_LOG.error(
f"Authorization type '{auth_type}' can't be configured "
f"for AppSync '{name}' because Cognito User Pool "
f"{cup_name} not found")
updated_extra_auth_types, is_extra_auth_api_key = \
self._process_extra_auth(extra_auth_types, name)
self.appsync_conn.update_graphql_api(
api_id, name, auth_type=auth_type,
user_pool_config=user_pool_config,
open_id_config=meta.get('open_id_config'),
lambda_auth_config=lambda_auth_config,
log_config=self._resolve_log_config(
meta.get('log_config', {}), name
),
xray_enabled=meta.get('xray_enabled'),
extra_auth_types=updated_extra_auth_types)
if auth_type == 'API_KEY' or is_extra_auth_api_key:
if not self.get_active_api_keys(api_id):
api_key_expiration = meta.get('api_key_expiration_days', 7)
now = time.time()
api_key_expires = \
int(now + api_key_expiration * ONE_DAY_IN_SECONDS)
self.appsync_conn.create_api_key(
api_id, expires=api_key_expires)
if schema_path := meta.get('schema_path'):
schema_full_path = build_path(extract_to, schema_path)
if not extract_to or not os.path.exists(schema_full_path):
raise AssertionError(
f'\'{schema_full_path}\' file not found for '
f'AppSync \'{name}\'')
with open(schema_full_path, 'r', encoding='utf-8') as file:
schema_definition = file.read()
status, details = \
self.appsync_conn.create_schema(api_id, schema_definition)
if status != 'SUCCESS':
error_message = (
f"An error occurred when updating schema. "
f"Operation status: '{status}'. ")
if details:
error_message += f"Details: '{details}'"
raise AssertionError(error_message)
else:
_LOG.info(
f"Schema of the AppSync '{name}' updated successfully")
existent_sources = self.appsync_conn.list_data_sources(api_id)
data_sources_meta = meta.get('data_sources', [])
for source_meta in data_sources_meta:
to_create = True
for source in existent_sources:
if source['name'] == source_meta['name']:
# update an existent one
to_create = False
params = self._build_data_source_params_from_meta(
source_meta)
if not params:
break
_LOG.debug(f"Updating data source '{source['name']}'")
self.appsync_conn.update_data_source(api_id, **params)
existent_sources.remove(source)
break
# create a new one
if to_create:
params = self._build_data_source_params_from_meta(source_meta)
if not params:
continue
_LOG.debug(f"Creating data source '{source_meta['name']}'")
self.appsync_conn.create_data_source(api_id, **params)
for source in existent_sources:
self.appsync_conn.delete_data_source(api_id, source['name'])
actual_funcs = []
existent_funcs = self.appsync_conn.list_functions(api_id)
funcs_meta = meta.get('functions', [])
for func_meta in funcs_meta:
to_create = True
for func_conf in existent_funcs:
if func_meta['name'] == func_conf['name']:
# update an existent one
to_create = False
params = self._build_function_params_from_meta(
func_meta, extract_to
)
if not params:
break
_LOG.debug(f"Updating function '{func_conf['name']}'")
actual_funcs.append(
self.appsync_conn.update_function(
api_id, func_conf['functionId'], params))
existent_funcs.remove(func_conf)
break
# create a new one
if to_create:
params = self._build_function_params_from_meta(
func_meta, extract_to
)
if not params:
break
_LOG.debug(f"Creating function '{func_meta['name']}'")
actual_funcs.append(
self.appsync_conn.create_function(api_id, params))
types = self.appsync_conn.list_types(api_id)
existent_resolvers = []
for t in types:
existent_resolvers.extend(
self.appsync_conn.list_resolvers(api_id, t['name']))
resolvers_meta = meta.get('resolvers', [])
for resolver_meta in resolvers_meta:
to_create = True
for resolver in existent_resolvers:
if resolver['typeName'] == resolver_meta['type_name'] and \
resolver['fieldName'] == resolver_meta['field_name']:
# update an existent one
to_create = False
params = self._build_resolver_params_from_meta(
resolver_meta, extract_to, actual_funcs)
if not params:
break
_LOG.debug(
f"Updating resolver for type '{resolver['typeName']}' "
f"and field '{resolver['fieldName']}'")
self.appsync_conn.update_resolver(api_id, **params)
existent_resolvers.remove(resolver)
break
# create a new one
if to_create:
params = self._build_resolver_params_from_meta(
resolver_meta, extract_to, actual_funcs)
if not params:
continue
_LOG.debug(
f"Creating resolver for type '{resolver_meta['typeName']}' "
f"and field '{resolver_meta['fieldName']}'")
self.appsync_conn.create_resolver(api_id, **params)
for resolver in existent_resolvers:
self.appsync_conn.delete_resolver(api_id,
type_name=resolver['typeName'],
field_name=resolver['fieldName'])
for func_conf in existent_funcs:
self.appsync_conn.delete_function(api_id, func_conf['functionId'])
_LOG.info(f'Updated AppSync GraphQL API {api_id}')
return self.describe_graphql_api(name=name, meta=meta, api_id=api_id)
def _extract_zip(self, path: str, name: str):
from syndicate.core import PROJECT_STATE
extract_to = PurePath(self.conf_path, ARTIFACTS_FOLDER,
name).as_posix()
artifact_src_path = posixpath.join(
self.deploy_target_bucket_key_compound,
PROJECT_STATE.current_bundle, path)
if not self.s3_conn.is_file_exists(self.deploy_target_bucket,
artifact_src_path):
raise AssertionError(
f"Deployment package for Appsync '{name}' not found by the "
f"path '{artifact_src_path}'")
_LOG.info(f'Downloading an artifact for Appsync \'{name}\'')
with io.BytesIO() as artifact:
self.s3_conn.download_to_file(
bucket_name=self.deploy_target_bucket,
key=artifact_src_path,
file=artifact)
with ZipFile(artifact, 'r') as zf:
zf.extractall(extract_to)
return extract_to
def _process_extra_auth(self, extra_auth_types, api_name):
is_extra_auth_api_key = False
result = []
for auth in extra_auth_types:
if auth['authentication_type'] == AWS_LAMBDA_TYPE:
lambda_region = \
auth.get('lambda_authorizer_config', {}).pop(
AWS_REGION_PARAMETER, None)
lambda_name = \
auth.get('lambda_authorizer_config', {}).pop(
'resource_name', None)
if not lambda_region or not lambda_name:
USER_LOG.error(
f"Authorization type '{AWS_LAMBDA_TYPE}' can't be "
f"configured for AppSync '{api_name}' because lambda "
f"resource name or aws region is not specified")
else:
lambda_arn = self.build_lambda_arn(
region=lambda_region,
name=lambda_name)
auth['lambda_authorizer_config']['authorizer_uri'] = \
lambda_arn
result.append(dict_keys_to_camel_case(auth))
elif auth['authentication_type'] == AWS_CUP_TYPE:
cup_name = \
auth.get('user_pool_config', {}).pop(
'resource_name', None)
cup_id = self.cup_conn.if_pool_exists_by_name(cup_name)
if not cup_id:
USER_LOG.error(
f"Authorization type '{AWS_CUP_TYPE}' can't be "
f"configured for AppSync '{api_name}' because Cognito "
f"User Pool '{cup_name}' not found")
else:
auth['user_pool_config']['user_pool_id'] = cup_id
result.append(dict_keys_to_camel_case(auth))
elif auth['authentication_type'] == 'API_KEY':
result.append(dict_keys_to_camel_case(auth))
is_extra_auth_api_key = True
else:
result.append(dict_keys_to_camel_case(auth))
return result, is_extra_auth_api_key
def get_active_api_keys(self, api_id):
api_keys = self.appsync_conn.list_api_keys(api_id)
now = time.time()
return [api_key for api_key in api_keys if api_key['expires'] > now]