in syndicate/core/resources/api_gateway_resource.py [0:0]
def _create_method_from_metadata(
        self, api_id, resource_id, resource_path, method, method_meta,
        authorizers_mapping, enable_cors=False, api_resp=None,
        api_integration_resp=None,
        resources_statement_singleton: bool = False,
        methods_statement_singleton: bool = False):
    """Create a method on an API resource together with its integration
    and responses, driven by the method's metadata.

    The work is done in four steps: create the method (resolving the
    authorizer, request model and request validator first), create the
    integration, create the method responses and create the integration
    responses.

    :param api_id: id of the API, passed to every connection call
    :param resource_id: id of the resource the method is created on
    :param resource_path: path of the resource; its leading slashes are
        stripped when the source ARN of the lambda permission is built
    :param method: method name passed to the connection calls and used
        in the source ARN of the lambda permission
    :param method_meta: dict describing the method. NOTE: it is mutated,
        'authorizer_id' is written into it when an authorizer is resolved
    :param authorizers_mapping: mapping of an authorization type value
        found in the meta to an authorizer id
    :param enable_cors: forwarded to the response creation calls
    :param api_resp: forwarded to ``self.init_method_responses``
    :param api_integration_resp: forwarded to
        ``self.init_integration_method_responses``
    :param resources_statement_singleton: if truthy, the path part of
        the lambda permission source ARN is replaced with a wildcard
    :param methods_statement_singleton: if truthy, the method part of
        the lambda permission source ARN is replaced with a wildcard
    :raises AssertionError: if the authorization type is neither 'NONE'
        nor 'AWS_IAM' and has no entry in ``authorizers_mapping``, or if
        'integration_type' is set to an unsupported value
    :raises ValueError: if 'method_request_models' does not contain
        exactly one entry (the single-item unpacking fails)
    """
    # None is treated the same way as False
    resources_statement_singleton = resources_statement_singleton or False
    methods_statement_singleton = methods_statement_singleton or False

    # init responses for method
    resp = self.init_method_responses(api_resp, method_meta)
    # init integration responses for method
    integr_resp = self.init_integration_method_responses(
        api_integration_resp, method_meta)

    # resolve authorizer if needed
    authorization_type = method_meta.get('authorization_type')
    if authorization_type not in ('NONE', 'AWS_IAM'):
        # type is authorizer, so add id to meta
        authorizer_id = authorizers_mapping.get(authorization_type)
        if not authorizer_id:
            raise AssertionError(
                'Authorizer {0} does not exist'.format(authorization_type))
        method_meta['authorizer_id'] = authorizer_id
        authorizer_type = self.connection.get_authorizer(
            api_id, authorizer_id).get('type')
        if authorizer_type == _COGNITO_AUTHORIZER_TYPE:
            authorization_type = _COGNITO_AUTHORIZER_TYPE
        else:
            authorization_type = _CUSTOM_AUTHORIZER_TYPE

    method_request_models = method_meta.get('method_request_models')
    if method_request_models:
        # exactly one {content_type: model_name} entry is expected
        (_, model_name), = method_request_models.items()
        model = self.connection.get_model(api_id, model_name)
        if not model:
            # the falsy lookup result is passed on instead of the
            # request models from the meta
            method_request_models = model

    request_validator_id = self._retrieve_request_validator_id(
        api_id, method_meta.get('request_validator'))

    # first step: create method
    self.connection.create_method(
        api_id, resource_id, method,
        authorization_type=authorization_type,
        authorizer_id=method_meta.get('authorizer_id'),
        api_key_required=method_meta.get('api_key_required'),
        request_parameters=method_meta.get('method_request_parameters'),
        request_models=method_request_models,
        request_validator=request_validator_id)

    # second step: create integration
    integration_type = method_meta.get('integration_type')
    # set up integration - lambda or aws service
    body_template = method_meta.get('integration_request_body_template')
    passthrough_behavior = method_meta.get(
        'integration_passthrough_behavior')
    request_parameters = method_meta.get('integration_request_parameters')
    # TODO split to map - func implementation
    if integration_type:
        if integration_type == 'lambda':
            lambda_name = method_meta['lambda_name']
            # alias has a higher priority than version in arn resolving
            lambda_version = method_meta.get('lambda_version')
            lambda_alias = method_meta.get('lambda_alias')
            lambda_arn = \
                self.lambda_res.resolve_lambda_arn_by_version_and_alias(
                    lambda_name, lambda_version, lambda_alias)
            enable_proxy = method_meta.get('enable_proxy')
            cache_configuration = method_meta.get('cache_configuration')
            cache_key_parameters = cache_configuration.get(
                'cache_key_parameters') if cache_configuration else None
            self.connection.create_lambda_integration(
                lambda_arn, api_id, resource_id, method, body_template,
                passthrough_behavior, method_meta.get('lambda_region'),
                enable_proxy=enable_proxy,
                cache_key_parameters=cache_key_parameters,
                request_parameters=request_parameters)

            # add permissions to invoke
            # Allows to apply method or resource singleton of a policy
            # statement, setting wildcard on the respective scope.
            _method = '*' if methods_statement_singleton else method
            _path = '*' if resources_statement_singleton \
                else resource_path.lstrip('/')
            api_source_arn = (
                f'arn:aws:execute-api:{self.region}:{self.account_id}:'
                f'{api_id}/*/{_method}/{_path}'
            )
            # statement id is derived from the lambda arn and the source
            # arn, so the same pair always yields the same statement
            _id = f'{lambda_arn}-{api_source_arn}'
            statement_id = md5(_id.encode('utf-8')).hexdigest()
            permission = self.lambda_res.add_invocation_permission(
                name=lambda_arn,
                principal='apigateway.amazonaws.com',
                source_arn=api_source_arn,
                statement_id=statement_id,
                exists_ok=True
            )
            if permission is None:
                message = f'Permission: \'{statement_id}\' attached to ' \
                          f'\'{lambda_arn}\' lambda to allow ' \
                          f'lambda:InvokeFunction for ' \
                          f'apigateway.amazonaws.com principal from ' \
                          f'\'{api_source_arn}\' SourceArn already exists.'
                _LOG.warning(message + ' Skipping.')
        elif integration_type == 'service':
            uri = method_meta.get('uri')
            role = method_meta.get('role')
            integration_method = method_meta.get('integration_method')
            self.connection.create_service_integration(
                self.account_id, api_id, resource_id, method,
                integration_method, role, uri, body_template,
                passthrough_behavior, request_parameters)
        elif integration_type == 'mock':
            self.connection.create_mock_integration(
                api_id, resource_id, method, body_template,
                passthrough_behavior)
        elif integration_type == 'http':
            integration_method = method_meta.get('integration_method')
            uri = method_meta.get('uri')
            enable_proxy = method_meta.get('enable_proxy')
            self.connection.create_http_integration(
                api_id, resource_id, method, integration_method, uri,
                body_template, passthrough_behavior, enable_proxy)
        else:
            # the message must be formatted explicitly: exceptions do not
            # interpolate %-style arguments the way logging calls do
            raise AssertionError(
                '{0} integration type does not exist.'.format(
                    integration_type))

    # third step: setup method responses
    if resp:
        for response in resp:
            self.connection.create_method_response(
                api_id, resource_id, method, response.get('status_code'),
                response.get('response_parameters'),
                response.get('response_models'), enable_cors)
    else:
        self.connection.create_method_response(
            api_id, resource_id, method, enable_cors=enable_cors)

    # fourth step: setup integration responses
    if integr_resp:
        for each in integr_resp:
            self.connection.create_integration_response(
                api_id, resource_id, method, each.get('status_code'),
                each.get('error_regex'),
                each.get('response_parameters'),
                each.get('response_templates'), enable_cors)
    else:
        self.connection.create_integration_response(
            api_id, resource_id, method, enable_cors=enable_cors)