syndicate/core/resources/api_gateway_resource.py (1,068 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import json import time from hashlib import md5 from botocore.exceptions import ClientError from syndicate.commons import deep_get from syndicate.commons.log_helper import get_logger, get_user_logger from syndicate.connection import LogsConnection from syndicate.core.constants import ( SOURCE_ARN_DEEP_KEY, SECURITY_SCHEMAS_DEEP_KEY, API_GW_DEFAULT_THROTTLING_RATE_LIMIT, API_GW_DEFAULT_THROTTLING_BURST_LIMIT ) from syndicate.core.helper import unpack_kwargs from syndicate.core.resources.base_resource import BaseResource from syndicate.core.resources.helper import (build_description_obj, validate_params) from syndicate.connection.api_gateway_connection import ApiGatewayV2Connection, \ ApiGatewayConnection from syndicate.core.resources.lambda_resource import LambdaResource API_REQUIRED_PARAMS = ['resources', 'deploy_stage'] _LOG = get_logger(__name__) USER_LOG = get_user_logger() SUPPORTED_METHODS = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS', 'HEAD', 'ANY'] SUPPORTED_STAGE_METHODS = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS', 'HEAD'] _CORS_HEADER_NAME = 'Access-Control-Allow-Origin' _CORS_HEADER_VALUE = "'*'" _COGNITO_AUTHORIZER_TYPE = 'COGNITO_USER_POOLS' _CUSTOM_AUTHORIZER_TYPE = 'CUSTOM' X_SDCT_EXTENSION_KEY = 'x-syndicate-cognito-userpool-names' PROVIDER_ARNS_KEY = 'providerARNs' POLICY_STATEMENT_SINGLETON = 'policy_statement_singleton' _REQUEST_VALIDATORS = { 'NONE': { 'validate_request_body': False, 'validate_request_parameters': False, 'id': None }, 'Validate body': { 'validate_request_body': True, 'validate_request_parameters': False, 'id': None }, 'Validate query string parameters and headers': { 'validate_request_parameters': True, 'validate_request_body': False, 'id': None }, 'Validate body, query string parameters, and headers': { 'validate_request_body': True, 'validate_request_parameters': True, 'id': None } } _DISABLE_THROTTLING_VALUE = -1 OPERATION_REPLACE = 'replace' class ApiGatewayResource(BaseResource): def __init__(self, apigw_conn: ApiGatewayConnection, apigw_v2_conn: ApiGatewayV2Connection, cw_logs_conn: LogsConnection, lambda_res: LambdaResource, cognito_res, account_id, region) -> None: self.connection = apigw_conn self.lambda_res = lambda_res self.cognito_res = cognito_res self.account_id = account_id self.region = region self.apigw_v2 = apigw_v2_conn self.cw_logs_conn = cw_logs_conn def _create_default_validators(self, api_id): for name, options in _REQUEST_VALIDATORS.items(): _id = self.connection.create_request_validator( api_id, name, options['validate_request_body'], options['validate_request_parameters'] ) options['id'] = _id def _retrieve_request_validator_id(self, api_id, request_validator=None): request_validator = request_validator or {} if not request_validator: return validate_request_body = request_validator.get( 'validate_request_body') or False validate_request_parameters = request_validator.get( 'validate_request_parameters') or False name = request_validator.get('name') if name: return self.connection.create_request_validator( api_id, name, validate_request_body, validate_request_parameters) for validators in _REQUEST_VALIDATORS.values(): if (validators['validate_request_body'] == validate_request_body) \ and (validators[ 'validate_request_parameters'] == validate_request_parameters): return validators['id'] def api_resource_identifier(self, name, output=None): if output: # api currently is not located in different regions # process only first object api_output = list(output.items())[0][1] # find id from the output return api_output['description']['id'] # if output is not provided - try to get API by name # cause there is no another option return self.connection.get_api_id(name) def create_api_gateway(self, args): """ Create api gateway in pool in sub processes. :type args: list """ return self.create_pool(self._create_api_gateway_from_meta, args, 1) def create_api_gateway_openapi(self, args): """ Create OpenAPI api gateway in pool in sub processes. :type args: list """ return self.create_pool(self._create_api_gateway_openapi_from_meta, args, 1) def api_gateway_update_processor(self, args): return self.create_pool(self._create_or_update_api_gateway, args, 1) def update_api_gateway_openapi(self, args): return self.create_pool(self._update_api_gateway_openapi_from_meta, args, 1) @unpack_kwargs def _create_or_update_api_gateway(self, name, meta, current_configurations): if current_configurations: # api currently is not located in different regions # process only first object api_output = list(current_configurations.items())[0][1] # find id from the output api_id = api_output['description']['id'] # check that api does not exist api_response = self.connection.get_api(api_id) if api_response: # find all existing resources existing_resources = api_output['description']['resources'] existing_paths = [i['path'] for i in existing_resources] meta_api_resources = meta['resources'] resources_statement_singleton = meta.get( POLICY_STATEMENT_SINGLETON) api_resp = meta.get('api_method_responses') api_integration_resp = meta.get( 'api_method_integration_responses') api_resources = {} for resource_path, resource_meta in meta_api_resources.items(): if resource_path not in existing_paths: api_resources[resource_path] = resource_meta if api_resources: _LOG.debug( 'Going to continue deploy API Gateway {0} ...'.format( api_id)) args = self.__prepare_api_resources_args( api_id=api_id, api_resources=api_resources, api_resp=api_resp, api_integration_resp=api_integration_resp, resources_statement_singleton=resources_statement_singleton) self.create_pool(self._create_resource_from_metadata, args, 1) # add headers # waiter b4 customization time.sleep(10) _LOG.debug( 'Customizing API Gateway {0} responses...'.format( api_id)) else: # all resources created, but need to override api_resources = meta_api_resources # _customize_gateway_responses call is commented due to # botocore InternalFailure while performing the call. # will be fixed later # _customize_gateway_responses(api_id) # deploy api _LOG.debug('Deploying API Gateway {0} ...'.format(api_id)) self.__deploy_api_gateway(api_id, meta, api_resources) return self.describe_api_resources(api_id=api_id, meta=meta, name=name) else: # api does not exist, so create a new return self._create_api_gateway_from_meta( {'name': name, 'meta': meta}) else: # object is not present, so just create a new api return self._create_api_gateway_from_meta( {'name': name, 'meta': meta}) def _escape_path(self, parameter): index = parameter.find('/', 0) if index == -1: return parameter parameter = parameter[:index] + '~1' + parameter[index + 1:] return self._escape_path(parameter) def configure_resources(self, api_id, stage_name, api_resources): for resource_path, resource_meta in api_resources.items(): for method_name, method_meta in resource_meta.items(): if method_name in SUPPORTED_METHODS: cache_configuration = method_meta.get( 'cache_configuration') throttling_configuration = method_meta.get( 'throttling_configuration') cache_ttl_setting = cache_configuration.get( 'cache_ttl_sec') if cache_configuration else None encrypt_cache_data = cache_configuration.get( 'encrypt_cache_data') if cache_configuration else None throttling_enabled = throttling_configuration.get( 'throttling_enabled') if throttling_configuration \ else None throttling_rate_limit = throttling_configuration.get( 'throttling_rate_limit', API_GW_DEFAULT_THROTTLING_RATE_LIMIT) if ( throttling_configuration and throttling_enabled) else \ _DISABLE_THROTTLING_VALUE throttling_burst_limit = throttling_configuration.get( 'throttling_burst_limit', API_GW_DEFAULT_THROTTLING_BURST_LIMIT) if ( throttling_configuration and throttling_enabled) else \ _DISABLE_THROTTLING_VALUE patch_operations = [] escaped_resource = self._escape_path(resource_path) methods_to_configure = SUPPORTED_STAGE_METHODS \ if method_name == 'ANY' else [method_name] for method in methods_to_configure: if cache_ttl_setting is not None: _LOG.info(f'Configuring cache for {resource_path};' f' TTL: {cache_ttl_setting}') patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}' f'/caching/ttlInSeconds', 'value': str(cache_ttl_setting), }) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}' f'/caching/enabled', 'value': 'True', }) if encrypt_cache_data is not None: patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}' f'/caching/enabled', 'value': 'True', }) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}' f'/caching/dataEncrypted', 'value': 'true' if bool( encrypt_cache_data) else 'false' }) if throttling_enabled: _LOG.info( f'Configuring throttling for {resource_path}; ' f'rateLimit: {throttling_rate_limit}; ' f'burstLimit: {throttling_burst_limit}') else: _LOG.info( f'Throttling for {resource_path} disabled.') patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}' f'/throttling/rateLimit', 'value': str(throttling_rate_limit), }) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}/' f'throttling/burstLimit', 'value': str(throttling_burst_limit), }) log_config = method_meta.get('logging_configuration') if isinstance(log_config, dict): logging_enabled = log_config.get('logging_enabled') else: logging_enabled = False if logging_enabled: _LOG.info( f'Configuring logging for {resource_path};' f'log_level: ' f"{log_config.get('log_level', 'ERROR')};" f'data_tracing: ' f"{log_config.get('data_tracing', False)};" f'detailed_metrics: ' f"{log_config.get('detailed_metrics', False)}" ) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}/' f'logging/loglevel', 'value': log_config.get('log_level', 'ERROR'), }) if log_config.get('data_tracing'): patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}/' f'logging/dataTrace', 'value': 'true', }) if log_config.get('detailed_metrics'): patch_operations.append({ 'op': OPERATION_REPLACE, 'path': f'/{escaped_resource}/{method}/' f'metrics/enabled', 'value': 'true', }) if patch_operations: self.connection.update_configuration( rest_api_id=api_id, stage_name=stage_name, patch_operations=patch_operations ) _LOG.info(f'Resource {resource_path} was configured') @unpack_kwargs def _create_api_gateway_from_meta(self, name, meta): """ Create API Gateway with all specified meta. :type name: str :type meta: dict """ validate_params(name, meta, API_REQUIRED_PARAMS) api_resources = meta['resources'] # whether to put a wildcard in lambda resource-based policy permissions resources_permission_singleton = meta.get(POLICY_STATEMENT_SINGLETON) api_gw_describe = self.describe_api_resources(name, meta) if api_gw_describe: _LOG.info(f'Api gateway with name \'{name}\' exists. Returning') return api_gw_describe _LOG.info(f'Api gateway with name \'{name}\' does not exist. Creating') api_item = self.connection.create_rest_api( api_name=name, binary_media_types=meta.get('binary_media_types'), tags=meta.get('tags')) api_id = api_item['id'] # create default request validators self._create_default_validators(api_id) # set minimumCompressionSize if the param exists minimum_compression_size = meta.get('minimum_compression_size', None) if not minimum_compression_size: _LOG.debug("No minimal_compression_size param - " "compression isn't enabled") self.connection.update_compression_size( rest_api_id=api_id, compression_size=minimum_compression_size) # deploy authorizers authorizers = meta.get('authorizers', {}) for key, val in authorizers.items(): uri = None provider_arns = [] if val.get('type') == _COGNITO_AUTHORIZER_TYPE: for pool in val.get('user_pools'): user_pool_id = self.cognito_res.get_user_pool_id(pool) provider_arns.append( f'arn:aws:cognito-idp:{self.region}:{self.account_id}:' f'userpool/{user_pool_id}') else: lambda_version = val.get('lambda_version') lambda_name = val.get('lambda_name') lambda_alias = val.get('lambda_alias') lambda_arn = self.lambda_res. \ resolve_lambda_arn_by_version_and_alias(lambda_name, lambda_version, lambda_alias) uri = f'arn:aws:apigateway:{self.region}:lambda:path/' \ f'2015-03-31/functions/{lambda_arn}/invocations' api_source_arn = (f'arn:aws:execute-api:{self.region}:' f'{self.account_id}:{api_id}/*/*') self.lambda_res.add_invocation_permission( statement_id=api_id, name=lambda_arn, source_arn=api_source_arn, principal='apigateway.amazonaws.com') self.connection.create_authorizer(api_id=api_id, name=key, type=val['type'], authorizer_uri=uri, identity_source=val.get( 'identity_source'), ttl=val.get('ttl'), provider_arns=provider_arns) models = meta.get('models') if models: args = [{'api_id': api_id, 'models': {k: v}} for k, v in models.items()] self.create_pool(self._create_model_from_metadata, args, 1) if api_resources: api_resp = meta.get('api_method_responses') api_integration_resp = meta.get('api_method_integration_responses') args = self.__prepare_api_resources_args( api_id, api_resources, api_resp, api_integration_resp, resources_permission_singleton) self.create_pool(self._create_resource_from_metadata, args, 1) else: _LOG.info('There is no resources in %s API Gateway description.', name) # add headers # waiter b4 customization time.sleep(10) _LOG.debug('Customizing API Gateway responses...') # _customize_gateway_responses call is commented due to botocore # InternalFailure while performing the call. will be fixed later # _customize_gateway_responses(api_id) # deploy api self.__deploy_api_gateway(api_id, meta, api_resources) return self.describe_api_resources(api_id=api_id, meta=meta, name=name) @unpack_kwargs def _create_api_gateway_openapi_from_meta(self, name, meta): openapi_context = meta.get('definition') deploy_stage = meta.get('deploy_stage') self._resolve_cup_ids(openapi_context) api_gw_describe = self.describe_api_resources(name, meta) if api_gw_describe: _LOG.info(f'Api gateway with name \'{name}\' exists. Returning') return api_gw_describe _LOG.info(f'Api gateway with name \'{name}\' does not exist. Creating') api_id = self.connection.create_openapi(openapi_context) _LOG.debug('Applying tags') self.connection.tag_openapi(openapi_id=api_id, tags=meta.get('tags')) self.connection.deploy_api(api_id, deploy_stage) api_lambdas_arns = self.extract_api_gateway_lambdas_arns( openapi_context) self.create_lambdas_permissions(api_id, api_lambdas_arns, '/*/*/*') api_lambda_auth_arns = self.extract_api_gateway_lambda_auth_arns( openapi_context) self.create_lambdas_permissions(api_id, api_lambda_auth_arns, '/*/*') return self.describe_api_resources(api_id=api_id, meta=meta, name=name) @unpack_kwargs def _update_api_gateway_openapi_from_meta(self, name, meta, context): api_id = self.connection.get_api_id(name) openapi_context = meta.get('definition') deploy_stage = meta.get('deploy_stage') self._resolve_cup_ids(openapi_context) self.connection.update_openapi(api_id, openapi_context) self.connection.deploy_api(api_id, deploy_stage) api_lambdas_arns = self.extract_api_gateway_lambdas_arns( openapi_context ) self.create_lambdas_permissions(api_id, api_lambdas_arns, '/*/*/*') api_lambda_auth_arns = self.extract_api_gateway_lambda_auth_arns( openapi_context ) self.create_lambdas_permissions(api_id, api_lambda_auth_arns, '/*/*') return self.describe_api_resources(api_id=api_id, meta=meta, name=name) def _resolve_cup_ids(self, openapi_context): _LOG.debug('Going to resolve Cognito User Pools ARNs') security_schemes = \ openapi_context.get('components', {}).get('securitySchemes', {}) authorizers = [ value['x-amazon-apigateway-authorizer'] for _, value in security_schemes.items() if (value.get('x-amazon-apigateway-authtype') == _COGNITO_AUTHORIZER_TYPE.lower() and 'x-amazon-apigateway-authorizer' in value)] for authorizer in authorizers: pools_names = provider_arns = None if authorizer.get('type') == _COGNITO_AUTHORIZER_TYPE.lower(): pools_names = authorizer.get(X_SDCT_EXTENSION_KEY) provider_arns = authorizer.get(PROVIDER_ARNS_KEY) new_provider_arns = [] if pools_names: for pool_name in pools_names: _LOG.debug(f'Resolving ARN for Cognito User Pool by name ' f'{pool_name}') pool_id = self.cognito_res.get_user_pool_id(pool_name) if pool_id: new_provider_arns.append( f'arn:aws:cognito-idp:{self.region}:' f'{self.account_id}:userpool/{pool_id}') else: USER_LOG.warn(f'Can\'t resolve Cognito User Pool ID ' f'by name "{pool_name}"! For more ' f'details see syndicate logs.') elif provider_arns: for arn in provider_arns: pool_id = arn.split('/')[-1] _LOG.debug(f'Resolving ARN for Cognito User Pool by ID ' f'{pool_id}') if self.cognito_res.is_user_pool_exists(pool_id): new_provider_arns.append(arn) else: USER_LOG.warn(f'Cognito User Pool with ID {pool_id} ' f'not found.') if new_provider_arns: _LOG.debug(f'Going to apply the next provider ARNs ' f'{new_provider_arns} to API Gateway ' f'{deep_get(openapi_context, ["info", "title"])}') authorizer[PROVIDER_ARNS_KEY] = new_provider_arns else: raise AssertionError( f'Cognito User Pools can\'t be resolved by ' + 'names: ' f'{pools_names}' if pools_names else f'ARNs: {provider_arns}') def get_lambda_permissions_for_api(self, lambda_arn, api_gateway_id): permissions = self.lambda_res.get_existing_permissions(lambda_arn) # Filter the permissions related to the specific API Gateway filtered_permissions = [ statement for statement in permissions if deep_get( statement, SOURCE_ARN_DEEP_KEY, "" ).startswith('arn:aws:execute-api:') and api_gateway_id in deep_get( statement, SOURCE_ARN_DEEP_KEY, "") ] return filtered_permissions def create_lambdas_permissions( self, api_gateway_id: str, api_lambdas_arns: set[str], route: str ): api_source_arn = (f'arn:aws:execute-api:{self.region}:' f'{self.account_id}:{api_gateway_id}{route}') for lambda_arn in api_lambdas_arns: _id = f'{lambda_arn}-{api_source_arn}' statement_id = md5(_id.encode('utf-8')).hexdigest() self.lambda_res.add_invocation_permission( name=lambda_arn, principal='apigateway.amazonaws.com', source_arn=api_source_arn, statement_id=statement_id, exists_ok=True ) def remove_lambdas_permissions(self, api_gateway_id, api_lambdas_arns): for lambda_arn in api_lambdas_arns: existing_permissions = self.get_lambda_permissions_for_api( lambda_arn, api_gateway_id) existing_permissions = { deep_get(perm, SOURCE_ARN_DEEP_KEY): perm.get('Sid') for perm in existing_permissions } self.lambda_res.remove_permissions(lambda_arn, existing_permissions.values()) @staticmethod def get_deploy_stage_name(stage_name=None): return stage_name if stage_name else 'prod' def __deploy_api_gateway(self, api_id, meta, api_resources): deploy_stage = self.get_deploy_stage_name(meta.get('deploy_stage')) cache_cluster_configuration = meta.get('cluster_cache_configuration') root_cache_enabled = cache_cluster_configuration.get( 'cache_enabled') if cache_cluster_configuration else None cache_size = cache_cluster_configuration.get( 'cache_size') if cache_cluster_configuration else None self.connection.deploy_api(api_id, stage_name=deploy_stage, cache_cluster_enabled=root_cache_enabled, cache_cluster_size=str( cache_size) if cache_size else None) patch_operations = [] throttling_cluster_configuration = meta.get( 'cluster_throttling_configuration') throttling_enabled = throttling_cluster_configuration.get( 'throttling_enabled') if throttling_cluster_configuration else None if not throttling_enabled: patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/throttling/rateLimit', 'value': str(_DISABLE_THROTTLING_VALUE), }) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/throttling/burstLimit', 'value': str(_DISABLE_THROTTLING_VALUE), }) # configure caching if root_cache_enabled: _LOG.debug( f'Cluster cache configuration found: ' f'{cache_cluster_configuration}' ) # set default ttl for root endpoint cluster_cache_ttl_sec = cache_cluster_configuration.get( 'cache_ttl_sec') encrypt_cache_data = cache_cluster_configuration.get( 'encrypt_cache_data') if cluster_cache_ttl_sec is not None: patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/caching/ttlInSeconds', 'value': str(cluster_cache_ttl_sec), }) if encrypt_cache_data is not None: patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/caching/dataEncrypted', 'value': 'true' if bool(encrypt_cache_data) else 'false' }) # configure throttling if throttling_enabled: throttling_rate_limit = throttling_cluster_configuration.get( 'throttling_rate_limit', API_GW_DEFAULT_THROTTLING_RATE_LIMIT) throttling_burst_limit = throttling_cluster_configuration.get( 'throttling_burst_limit', API_GW_DEFAULT_THROTTLING_BURST_LIMIT) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/throttling/rateLimit', 'value': str(throttling_rate_limit), }) patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/throttling/burstLimit', 'value': str(throttling_burst_limit), }) # configure logging log_config = meta.get('logging_configuration') logging_enabled = log_config.get('logging_enabled') if ( isinstance(log_config, dict)) else False if logging_enabled: patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/logging/loglevel', 'value': log_config.get('log_level', 'ERROR'), }) if log_config.get('data_tracing'): patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/logging/dataTrace', 'value': 'true', }) if log_config.get('detailed_metrics'): patch_operations.append({ 'op': OPERATION_REPLACE, 'path': '/*/*/metrics/enabled', 'value': 'true', }) if any([root_cache_enabled, throttling_enabled, logging_enabled]): self.connection.update_configuration( rest_api_id=api_id, stage_name=deploy_stage, patch_operations=patch_operations ) # customize settings for endpoints self.configure_resources(api_id, deploy_stage, api_resources) def __prepare_api_resources_args( self, api_id, api_resources, api_resp=None, api_integration_resp=None, resources_statement_singleton: bool = False): # describe authorizers and create a mapping authorizers = self.connection.get_authorizers(api_id) authorizers_mapping = {x['name']: x['id'] for x in authorizers} args = [] for each in api_resources: resource_meta = api_resources[each] _LOG.info('Creating resource %s ...', each) if each.startswith('/'): resource_id = self.connection.get_resource_id(api_id, each) if resource_id: _LOG.info('Resource %s exists.', each) enable_cors = resource_meta.get('enable_cors') self._check_existing_methods( api_id=api_id, resource_id=resource_id, resource_path=each, resource_meta=resource_meta, enable_cors=enable_cors, authorizers_mapping=authorizers_mapping, api_resp=api_resp, api_integration_resp=api_integration_resp, resources_statement_singleton=resources_statement_singleton) else: args.append({ 'api_id': api_id, 'resource_path': each, 'resource_meta': resource_meta, 'authorizers_mapping': authorizers_mapping, 'resources_statement_singleton': resources_statement_singleton }) else: raise AssertionError( "API resource must starts with '/', but found %s", each) return args def describe_api_resources(self, name, meta, api_id=None): if not api_id: api = self.connection.get_api_by_name(name) if not api: return api_id = api['id'] response = self.connection.get_api(api_id) if not response: return {} response['resources'] = self.connection.get_resources(api_id) _LOG.info('Described %s API Gateway.', name) arn = 'arn:aws:apigateway:{0}::/restapis/{1}'.format(self.region, api_id) return { arn: build_description_obj(response, name, meta) } def describe_openapi(self, api_id, stage_name): response = self.connection.describe_openapi(api_id, stage_name) return json.loads(response['body'].read().decode("utf-8")) \ if isinstance(response, dict) else None def describe_tags(self, api_arn: str) -> dict | None: tag_response = self.connection.describe_tags(api_arn=api_arn) tags = tag_response.get('tags', {}) return tags def _check_existing_methods(self, api_id, resource_id, resource_path, resource_meta, enable_cors, authorizers_mapping, api_resp=None, api_integration_resp=None, resources_statement_singleton: bool = False ): """ Check if all specified methods exist and create some if not. :type api_id: str :type resource_id: str :type resource_meta: dict :type enable_cors: bool or None :type: """ methods_statement_singleton = resource_meta.get( POLICY_STATEMENT_SINGLETON) for method in resource_meta: if method == 'enable_cors': continue if self.connection.get_method(api_id, resource_id, method): _LOG.info('Method %s exists.', method) continue else: _LOG.info('Creating method %s for resource %s...', method, resource_id) self._create_method_from_metadata( api_id=api_id, resource_id=resource_id, resource_path=resource_path, method=method, method_meta=resource_meta[method], authorizers_mapping=authorizers_mapping, api_resp=api_resp, api_integration_resp=api_integration_resp, enable_cors=enable_cors, resources_statement_singleton=resources_statement_singleton, methods_statement_singleton=methods_statement_singleton ) if enable_cors and not self.connection.get_method(api_id, resource_id, 'OPTIONS'): _LOG.info('Enabling CORS for resource %s...', resource_id) self.connection.enable_cors_for_resource(api_id, resource_id) @unpack_kwargs def _create_resource_from_metadata(self, api_id, resource_path, resource_meta, authorizers_mapping, resources_statement_singleton: bool = False): self.connection.create_resource(api_id, resource_path) _LOG.info('Resource %s created.', resource_path) resource_id = self.connection.get_resource_id(api_id, resource_path) enable_cors = resource_meta.get('enable_cors') methods_statement_singleton = resource_meta.get( POLICY_STATEMENT_SINGLETON) for method in resource_meta: try: if method == 'enable_cors' or method not in SUPPORTED_METHODS: continue method_meta = resource_meta[method] _LOG.info('Creating method %s for resource %s...', method, resource_path) self._create_method_from_metadata( api_id=api_id, resource_id=resource_id, resource_path=resource_path, method=method, method_meta=method_meta, enable_cors=enable_cors, authorizers_mapping=authorizers_mapping, resources_statement_singleton=resources_statement_singleton, methods_statement_singleton=methods_statement_singleton ) except Exception as e: _LOG.error('Resource: {0}, method {1}.' .format(resource_path, method), exc_info=True) raise e _LOG.info('Method %s for resource %s created.', method, resource_path) # create enable cors only after all methods in resource created if enable_cors: self.connection.enable_cors_for_resource(api_id, resource_id) _LOG.info('CORS enabled for resource %s', resource_path) def _create_method_from_metadata( self, api_id, resource_id, resource_path, method, method_meta, authorizers_mapping, enable_cors=False, api_resp=None, api_integration_resp=None, resources_statement_singleton: bool = False, methods_statement_singleton: bool = False): resources_statement_singleton = resources_statement_singleton or False methods_statement_singleton = methods_statement_singleton or False # init responses for method resp = self.init_method_responses(api_resp, method_meta) # init integration responses for method integr_resp = self.init_integration_method_responses( api_integration_resp, method_meta) # resolve authorizer if needed authorization_type = method_meta.get('authorization_type') if authorization_type not in ['NONE', 'AWS_IAM']: # type is authorizer, so add id to meta authorizer_id = authorizers_mapping.get(authorization_type) if not authorizer_id: raise AssertionError( 'Authorizer {0} does not exist'.format(authorization_type)) method_meta['authorizer_id'] = authorizer_id authorizer = self.connection.get_authorizer( api_id, authorizer_id).get('type') if authorizer == _COGNITO_AUTHORIZER_TYPE: authorization_type = _COGNITO_AUTHORIZER_TYPE else: authorization_type = _CUSTOM_AUTHORIZER_TYPE method_request_models = method_meta.get('method_request_models') if method_request_models: (content_type, name), = method_request_models.items() model = self.connection.get_model(api_id, name) method_request_models = model if not model else method_request_models request_validator_id = self._retrieve_request_validator_id( api_id, method_meta.get('request_validator')) self.connection.create_method( api_id, resource_id, method, authorization_type=authorization_type, authorizer_id=method_meta.get('authorizer_id'), api_key_required=method_meta.get('api_key_required'), request_parameters=method_meta.get('method_request_parameters'), request_models=method_request_models, request_validator=request_validator_id) # second step: create integration integration_type = method_meta.get('integration_type') # set up integration - lambda or aws service body_template = method_meta.get('integration_request_body_template') passthrough_behavior = method_meta.get( 'integration_passthrough_behavior') request_parameters = method_meta.get('integration_request_parameters') # TODO split to map - func implementation if integration_type: if integration_type == 'lambda': lambda_name = method_meta['lambda_name'] # alias has a higher priority than version in arn resolving lambda_version = method_meta.get('lambda_version') lambda_alias = method_meta.get('lambda_alias') lambda_arn = self.lambda_res. \ resolve_lambda_arn_by_version_and_alias(lambda_name, lambda_version, lambda_alias) enable_proxy = method_meta.get('enable_proxy') cache_configuration = method_meta.get('cache_configuration') cache_key_parameters = cache_configuration.get( 'cache_key_parameters') if cache_configuration else None self.connection.create_lambda_integration( lambda_arn, api_id, resource_id, method, body_template, passthrough_behavior, method_meta.get('lambda_region'), enable_proxy=enable_proxy, cache_key_parameters=cache_key_parameters, request_parameters=request_parameters) # add permissions to invoke # Allows to apply method or resource singleton of a policy # statement, setting wildcard on the respective scope. api_source_arn = f"arn:aws:execute-api:{self.region}:" \ f"{self.account_id}:{api_id}/*" \ "/{method}/{path}" _method, _path = method, resource_path.lstrip('/') if resources_statement_singleton: _path = '*' if methods_statement_singleton: _method = '*' api_source_arn = api_source_arn.format( method=_method, path=_path ) _id = f'{lambda_arn}-{api_source_arn}' statement_id = md5(_id.encode('utf-8')).hexdigest() response: dict = self.lambda_res.add_invocation_permission( name=lambda_arn, principal='apigateway.amazonaws.com', source_arn=api_source_arn, statement_id=statement_id, exists_ok=True ) if response is None: message = f'Permission: \'{statement_id}\' attached to ' \ f'\'{lambda_arn}\' lambda to allow ' \ f'lambda:InvokeFunction for ' \ f'apigateway.amazonaws.com principal from ' \ f'\'{api_source_arn}\' SourceArn already exists.' _LOG.warning(message + ' Skipping.') elif integration_type == 'service': uri = method_meta.get('uri') role = method_meta.get('role') integration_method = method_meta.get('integration_method') self.connection.create_service_integration(self.account_id, api_id, resource_id, method, integration_method, role, uri, body_template, passthrough_behavior, request_parameters) elif integration_type == 'mock': self.connection.create_mock_integration(api_id, resource_id, method, body_template, passthrough_behavior) elif integration_type == 'http': integration_method = method_meta.get('integration_method') uri = method_meta.get('uri') enable_proxy = method_meta.get('enable_proxy') self.connection.create_http_integration(api_id, resource_id, method, integration_method, uri, body_template, passthrough_behavior, enable_proxy) else: raise AssertionError('%s integration type does not exist.', integration_type) # third step: setup method responses if resp: for response in resp: self.connection.create_method_response( api_id, resource_id, method, response.get('status_code'), response.get('response_parameters'), response.get('response_models'), enable_cors) else: self.connection.create_method_response( api_id, resource_id, method, enable_cors=enable_cors) # fourth step: setup integration responses if integr_resp: for each in integr_resp: self.connection.create_integration_response( api_id, resource_id, method, each.get('status_code'), each.get('error_regex'), each.get('response_parameters'), each.get('response_templates'), enable_cors) else: self.connection.create_integration_response( api_id, resource_id, method, enable_cors=enable_cors) @staticmethod def init_method_responses(api_resp, method_meta): method_responses = method_meta.get("responses") if method_responses: resp = method_responses elif api_resp: resp = api_resp else: resp = [] return resp @staticmethod def init_integration_method_responses(api_integration_resp, method_meta): integration_method_responses = method_meta.get("integration_responses") if integration_method_responses: integration_resp = integration_method_responses elif api_integration_resp: integration_resp = api_integration_resp else: integration_resp = [] return integration_resp def _customize_gateway_responses(self, api_id): responses = self.connection.get_gateway_responses(api_id) response_types = [r['responseType'] for r in responses] for response_type in response_types: time.sleep(10) self.connection.add_header_to_gateway_response(api_id, response_type, _CORS_HEADER_NAME, _CORS_HEADER_VALUE) @staticmethod def _get_lambdas_invoked_by_api_gw(resources_meta, retrieve_aliases=False): lambdas = set() for resource, meta in resources_meta.items(): for method, description in meta.items(): if method in SUPPORTED_METHODS: lambda_ = description.get('lambda_name') if lambda_: if retrieve_aliases: lambda_ = (lambda_, description.get('lambda_alias')) lambdas.add(lambda_) return list(lambdas) def remove_api_gateways(self, args): return self.create_pool(self._remove_api_gateway, args) def _remove_invocation_permissions_from_lambdas(self, config): api_id = config['description']['id'] _LOG.info(fr'Removing invocation permissions for api {api_id}') lambdas_aliases = self._get_lambdas_invoked_by_api_gw( config['resource_meta'].get('resources', {}), retrieve_aliases=True) for lambda_, alias in lambdas_aliases: _LOG.info(f'Removing invocation permissions for api {api_id} ' f'from lambda {lambda_} and alias {alias}') statements = self.lambda_res.get_invocation_permission( lambda_name=self.lambda_res.build_lambda_arn(lambda_), qualifier=alias ).get('Statement', []) ids_to_remove = [st.get('Sid') for st in statements if api_id in st.get('Condition', {}).get( 'ArnLike', {}).get('AWS:SourceArn', '')] self.lambda_res.remove_invocation_permissions( lambda_name=lambda_, qualifier=alias, ids_to_remove=ids_to_remove ) @unpack_kwargs def _remove_api_gateway(self, arn, config): api_id = config['description']['id'] stage_name = config["resource_meta"]["deploy_stage"] openapi_context = self.describe_openapi(api_id, stage_name) if openapi_context: api_lambdas_arns = self.extract_api_gateway_lambdas_arns( openapi_context) api_lambda_auth_arns = self.extract_api_gateway_lambda_auth_arns( openapi_context) self.remove_lambdas_permissions( api_id, {*api_lambdas_arns, *api_lambda_auth_arns} ) try: self.connection.remove_api(api_id, log_not_found_error=False) group_names = self.cw_logs_conn.get_log_group_names() for each in group_names: if each.split('/')[0].endswith(api_id): self.cw_logs_conn.delete_log_group_name(each) _LOG.info(f'API Gateway {api_id} was removed.') return {arn: config} except ClientError as e: if e.response['Error']['Code'] == 'NotFoundException': _LOG.warning(f'API Gateway {api_id} is not found') return {arn: config} else: raise e @unpack_kwargs def _create_model_from_metadata(self, api_id, models): _LOG.info('Going to process API Gateway models') for name, model_data in models.items(): description = model_data.get('description') schema = model_data.get('schema') if isinstance(schema, dict): schema = json.dumps(schema) content_type = model_data.get('content_type') self.connection.create_model( api_id, name, content_type, description, schema) def build_web_socket_api_gateway_arn(self, api_id: str) -> str: return f'arn:aws:execute-api:{self.apigw_v2.client.meta.region_name}' \ f':{self.account_id}:{api_id}/*' def create_web_socket_api_gateway(self, args): return self.create_pool(self._create_web_socket_api_from_meta, args, 1) @unpack_kwargs def _create_web_socket_api_from_meta(self, name: str, meta: dict): stage_name = meta.get('deploy_stage') resources = meta.get('resources') or {} route_selection_expression = meta.get('route_selection_expression') api_gw_describe = self.describe_v2_api_gateway(name, meta) if api_gw_describe: _LOG.info(f'Api gateway with name \'{name}\' exists. Returning') return api_gw_describe api_id = self.apigw_v2.create_web_socket_api( name=name, route_selection_expression=route_selection_expression, tags=meta.get('tags')) for route_name, route_meta in resources.items(): int_type = route_meta.get('integration_type') or 'lambda' if int_type != 'lambda': _LOG.error(f'Integration type: {int_type} currently ' f'not supported. Skipping..') continue lambda_name = route_meta['lambda_name'] lambda_version = route_meta.get('lambda_version') lambda_alias = route_meta.get('lambda_alias') lambda_arn = self.lambda_res.resolve_lambda_arn_by_version_and_alias( lambda_name, lambda_version, lambda_alias) integration_id = self.apigw_v2.create_lambda_integration( api_id=api_id, lambda_arn=lambda_arn, enable_proxy=route_meta.get('enable_proxy') or False ) self.apigw_v2.put_route_integration( api_id=api_id, route_name=route_name, integration_id=integration_id ) source_arn = f'{self.build_web_socket_api_gateway_arn(api_id)}/{route_name}' self.lambda_res.add_invocation_permission( name=lambda_arn, principal='apigateway.amazonaws.com', source_arn=source_arn, statement_id=f'{name}-{route_name.strip("$")}-invoke', exists_ok=True ) self.apigw_v2.create_stage(api_id=api_id, stage_name=stage_name) return self.describe_v2_api_gateway( name=name, meta=meta, api_id=api_id ) def describe_v2_api_gateway(self, name, meta, api_id=None): if not api_id: api = self.apigw_v2.get_api_by_name(name) if not api: return {} api_id = api['ApiId'] # response = self.connection.get_api(api_id) response = {'ApiId': api_id} # maybe the arn is not valid - I didn't manage to find a valid # example browsing for 5 minutes, so the hell with it. Currently, # nothing depends on it arn = self.build_web_socket_api_gateway_arn(api_id) return { arn: build_description_obj(response, name, meta) } def remove_v2_api_gateway(self, args): return self.create_pool(self._remove_v2_api_gateway, args) @unpack_kwargs def _remove_v2_api_gateway(self, arn, config): api_id = config.get('description', {}).get('ApiId') if not api_id: _LOG.warning('V2 api id not found in output. Skipping') return {arn: config} lambda_arns = [] routes = self.apigw_v2.get_routes(api_id) for route in routes['Items']: lambda_arns.append(self.apigw_v2.get_integration( api_id, route['Target'].replace('integrations/', ''))) self.remove_lambdas_permissions( api_id, {*[arn for arn in lambda_arns if arn is not None]}) self.apigw_v2.delete_api(api_id) return {arn: config} @staticmethod def extract_api_gateway_lambdas_arns(openapi_spec): api_gateway_lambdas_arns = {*()} for path, path_item in openapi_spec.get('paths', {}).items(): for method, method_data in path_item.items(): integration = method_data.get('x-amazon-apigateway-integration') if not integration or not integration.get('uri'): continue uri = integration.get('uri') try: lambda_arn = uri.split('/functions/')[1].split('/')[0] except IndexError: _LOG.warning(f"Invalid lambda arn in integration uri {uri}") continue api_gateway_lambdas_arns.add(lambda_arn) return api_gateway_lambdas_arns @staticmethod def extract_api_gateway_lambda_auth_arns(openapi_spec): api_gateway_lambdas_arns = {*()} security_schemas = deep_get(openapi_spec, SECURITY_SCHEMAS_DEEP_KEY, {}) for schema_data in security_schemas.values(): authorizer = schema_data.get("x-amazon-apigateway-authorizer") if not authorizer or not authorizer.get("authorizerUri"): continue uri = authorizer.get("authorizerUri") try: lambda_arn = uri.split('/functions/')[1].split('/')[0] except IndexError: _LOG.warning(f"Invalid lambda arn in authorizer uri {uri}") continue api_gateway_lambdas_arns.add(lambda_arn) return api_gateway_lambdas_arns