in syndicate/core/resources/appsync_resource.py [0:0]
def _build_function_params_from_meta(func_meta, artifacts_path):
    """Build the parameters dict for a function from its meta description.

    When ``func_meta['runtime']`` is ``'JS'`` or ``'APPSYNC_JS'`` the file
    referenced by ``code_path`` is read into the ``code`` parameter and the
    ``runtime`` parameter is set. For any other runtime value the
    ``function_version`` parameter is set (falling back to
    ``DEFAULT_FUNC_VTL_VERSION``) and the files referenced by
    ``request_mapping_template_path`` and ``response_mapping_template_path``
    are read into the corresponding template parameters.

    :param func_meta: dict describing the function; ``name`` and
        ``data_source_name`` are always read, ``description``, ``runtime``,
        ``function_version``, ``max_batch_size`` and the file path keys are
        looked up with ``.get``
    :param artifacts_path: base path that the file paths from ``func_meta``
        are joined to via ``build_path``
    :return: dict of function parameters, or ``None`` when
        ``validate_params`` rejects the meta (warnings are logged and the
        function is skipped)
    :raises AssertionError: when a file needed for the selected runtime
        cannot be located
    """
    func_name = func_meta['name']
    _LOG.debug(f"Building parameters for the function '{func_name}'")
    try:
        validate_params(func_name, func_meta, FUNCTION_REQUIRED_PARAMS)
    except AssertionError as e:
        _LOG.warning(str(e))
        _LOG.warning(f"Skipping function '{func_name}'...")
        return None

    def _read_artifact(path_key, not_found_message):
        # Return the text of the file that func_meta[path_key] points to.
        # Both path components are validated BEFORE they are joined, so a
        # missing artifacts path or a missing meta key is reported with the
        # intended "not found" error rather than being handed to
        # build_path()/os.path.exists() as None.
        relative_path = func_meta.get(path_key)
        if not artifacts_path or not relative_path:
            raise AssertionError(not_found_message)
        full_path = build_path(artifacts_path, relative_path)
        if not os.path.exists(full_path):
            raise AssertionError(not_found_message)
        with open(full_path, 'r', encoding='utf-8') as file:
            return file.read()

    function_params = {
        'name': func_name,
        'data_source_name': func_meta['data_source_name']
    }

    if description := func_meta.get('description'):
        function_params['description'] = description

    if func_meta.get('runtime') in ('JS', 'APPSYNC_JS'):
        # Both accepted spellings map to the same runtime descriptor.
        function_params['runtime'] = {
            'name': 'APPSYNC_JS',
            'runtimeVersion': '1.0.0'
        }
        function_params['code'] = _read_artifact(
            'code_path',
            f"Function '{func_name}' code file not found.")
    else:
        _LOG.debug('Runtime is not JS')
        function_params['function_version'] = \
            func_meta.get('function_version', DEFAULT_FUNC_VTL_VERSION)
        # Read both templates before storing either, so the request
        # template is still checked first, as before.
        request_mapping_template = _read_artifact(
            'request_mapping_template_path',
            f"Function '{func_name}' request mapping template file "
            f"not found.")
        response_mapping_template = _read_artifact(
            'response_mapping_template_path',
            f"Function '{func_name}' response mapping template file "
            f"not found.")
        function_params['request_mapping_template'] = \
            request_mapping_template
        function_params['response_mapping_template'] = \
            response_mapping_template

    if max_batch_size := func_meta.get('max_batch_size'):
        function_params['max_batch_size'] = max_batch_size

    return function_params