# syndicate/core/resources/lambda_resource.py
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import hashlib
import json
import time
from pathlib import PurePath
from typing import Optional
from botocore.exceptions import ClientError
from syndicate.commons.log_helper import get_logger, get_user_logger
from syndicate.connection.helper import retry
from syndicate.core.build.bundle_processor import _build_output_key
from syndicate.core.build.meta_processor import S3_PATH_NAME
from syndicate.core.constants import DEFAULT_LOGS_EXPIRATION
from syndicate.core.decorators import threading_lock
from syndicate.core.helper import unpack_kwargs, is_zip_empty
from syndicate.core.resources.base_resource import BaseResource
from syndicate.core.resources.helper import (
build_description_obj, validate_params, assert_required_params, if_updated)
LAMBDA_LAYER_REQUIRED_PARAMS = ['runtimes', 'deployment_package']
DYNAMODB_TRIGGER_REQUIRED_PARAMS = ['target_table', 'batch_size']
CLOUD_WATCH_TRIGGER_REQUIRED_PARAMS = ['target_rule']
S3_TRIGGER_REQUIRED_PARAMS = ['target_bucket', 's3_events']
SQS_TRIGGER_REQUIRED_PARAMS = ['target_queue', 'batch_size']
SNS_TRIGGER_REQUIRED_PARAMS = ['target_topic']
KINESIS_TRIGGER_REQUIRED_PARAMS = ['target_stream', 'batch_size',
'starting_position']
PROVISIONED_CONCURRENCY = 'provisioned_concurrency'
_LOG = get_logger(__name__)
USER_LOG = get_user_logger()
LAMBDA_MAX_CONCURRENCY = 'max_concurrency'
LAMBDA_CONCUR_QUALIFIER_ALIAS = 'ALIAS'
LAMBDA_CONCUR_QUALIFIER_VERSION = 'VERSION'
_LAMBDA_PROV_CONCURRENCY_QUALIFIERS = [LAMBDA_CONCUR_QUALIFIER_ALIAS,
LAMBDA_CONCUR_QUALIFIER_VERSION]
SNAP_START = 'snap_start'
_APPLY_SNAP_START_VERSIONS = 'PublishedVersions'
_APPLY_SNAP_START_NONE = 'None'
_SNAP_START_CONFIGURATIONS = [_APPLY_SNAP_START_VERSIONS,
_APPLY_SNAP_START_NONE]
DYNAMO_DB_TRIGGER = 'dynamodb_trigger'
CLOUD_WATCH_RULE_TRIGGER = 'cloudwatch_rule_trigger'
EVENT_BRIDGE_RULE_TRIGGER = 'eventbridge_rule_trigger'
S3_TRIGGER = 's3_trigger'
SNS_TOPIC_TRIGGER = 'sns_topic_trigger'
KINESIS_TRIGGER = 'kinesis_trigger'
SQS_TRIGGER = 'sqs_trigger'
NOT_AVAILABLE = 'N/A'
_DOTNET_LAMBDA_SHARED_STORE_ENV = {
'DOTNET_SHARED_STORE': '/opt/dotnetcore/store/'
}
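# An illustrative (non-exhaustive) lambda meta consumed by this module,
# assembled from the keys read below; all names and values are hypothetical:
#   {
#       'iam_role_name': 'my-lambda-role',
#       'runtime': 'python3.10',
#       'memory': 128,
#       'timeout': 100,
#       'func_name': 'handler.lambda_handler',
#       'env_variables': {'LOG_LEVEL': 'INFO'},
#       'event_sources': [{'resource_type': 'sqs_trigger',
#                          'target_queue': 'my-queue', 'batch_size': 10}],
#       'max_concurrency': 10,
#       'provisioned_concurrency': {'qualifier': 'ALIAS', 'value': 2}
#   }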
class LambdaResource(BaseResource):
def __init__(self, lambda_conn, s3_conn, cw_logs_conn, sns_res, sns_conn,
iam_conn, dynamodb_conn, sqs_conn, kinesis_conn,
cw_events_conn, cognito_idp_conn, region, account_id,
deploy_target_bucket) -> None:
self.lambda_conn = lambda_conn
self.s3_conn = s3_conn
self.cw_logs_conn = cw_logs_conn
self.sns_res = sns_res
self.sns_conn = sns_conn
self.iam_conn = iam_conn
self.dynamodb_conn = dynamodb_conn
self.sqs_conn = sqs_conn
self.kinesis_conn = kinesis_conn
self.cw_events_conn = cw_events_conn
self.cognito_idp_conn = cognito_idp_conn
self.region = region
self.account_id = account_id
self.deploy_target_bucket = deploy_target_bucket
self.dynamic_params_resolvers = {
('cognito_idp', 'id'):
self.cognito_idp_conn.if_pool_exists_by_name,
('cognito_idp', 'client_id'):
self.cognito_idp_conn.if_cup_client_exist
}
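    # dynamic_params_resolvers (above) maps a (resource_type, parameter)
    # pair taken from an env_variables entry to a callable that resolves
    # the actual value; see _resolve_env_variables below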
def qualifier_alias_resolver(self, lambda_def):
return lambda_def['Alias']
def get_existing_permissions(self, lambda_arn):
return self.lambda_conn.get_existing_permissions(lambda_arn)
def remove_permissions(self, lambda_arn, permissions_sids):
return self.lambda_conn.remove_permissions(lambda_arn,
permissions_sids)
def remove_permissions_by_resource_name(self, lambda_name, resource_name,
all_permissions=True):
""" Remove permissions to invoke lambda by resource name
:param lambda_name: lambda name, arn or full arn
:param resource_name: resource name, arn or full arn
:param all_permissions: whether to delete all permissions for the
specified resource or the first one only
"""
lambda_permissions = self.get_existing_permissions(lambda_name)
for statement in lambda_permissions:
try:
source_arn = statement['Condition']['ArnLike']['AWS:SourceArn']
except KeyError:
continue
if resource_name in source_arn:
self.lambda_conn.remove_one_permission(
function_name=lambda_name,
statement_id=statement['Sid'])
if not all_permissions:
break
    def qualifier_version_resolver(self, lambda_def):
        latest_version_number = lambda_def['Configuration']['Version']
        if 'LATEST' in latest_version_number:
            all_versions = self.lambda_conn.list_function_versions(
                name=lambda_def['Configuration']['FunctionName'])
            # compare numerically: a lexicographic sort would put
            # version '9' after '10'
            bare_versions = sorted(
                int(version['Version']) for version in all_versions
                if version.get('Version') != '$LATEST')
            if bare_versions:
                latest_version_number = str(bare_versions[-1])
        return latest_version_number
_LAMBDA_QUALIFIER_RESOLVER = {
LAMBDA_CONCUR_QUALIFIER_ALIAS: qualifier_alias_resolver,
LAMBDA_CONCUR_QUALIFIER_VERSION: qualifier_version_resolver
}
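    # _LAMBDA_QUALIFIER_RESOLVER stores plain functions, so the dispatch in
    # _resolve_requested_qualifier passes `self` explicitly, e.g.:
    #   resolver = self._LAMBDA_QUALIFIER_RESOLVER['ALIAS']
    #   resolver(self, lambda_def)  # -> lambda_def['Alias']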
def remove_concurrency_for_function(self, kwargs):
assert_required_params(
all_params=kwargs,
required_params_names=['name'])
self.lambda_conn.delete_function_concurrency_config(
name=kwargs['name'])
def setup_lambda_concur_alias_version(self, **kwargs):
assert_required_params(
all_params=kwargs,
required_params_names=['name',
'qualifier',
'provisioned_level',
'type'])
name = kwargs.get('name')
concur_type = kwargs.get('type')
qualifier = kwargs.get('qualifier')
provisioned_level = kwargs.get('provisioned_level')
resp = self.lambda_conn.configure_provisioned_concurrency(
name=name,
qualifier=qualifier,
concurrent_executions=provisioned_level)
_LOG.info(
f'Lambda {name} concurrency configuration status '
f'of type {concur_type}:{qualifier}: {resp.get("Status")}')
def setup_lambda_concur_function(self, **kwargs):
assert_required_params(
all_params=kwargs,
required_params_names=['name',
'provisioned_level',
'type'])
name = kwargs.get('name')
provisioned_level = kwargs.get('provisioned_level')
concur_type = kwargs.get('type')
resp = self.lambda_conn.put_function_concurrency(
function_name=name,
concurrent_executions=provisioned_level)
_LOG.info(
f'Lambda {name} concurrency configuration of type {concur_type}:'
f'{resp.get("ReservedConcurrentExecutions")}')
def create_lambda(self, args):
""" Create lambdas in pool in sub processes.
:type args: list
"""
return self.create_pool(self._create_lambda_from_meta, args)
def update_lambda(self, args):
return self.create_pool(self._update_lambda, args)
def update_lambda_layer(self, args):
return self.create_pool(self.create_lambda_layer_from_meta, args)
def describe_lambda(self, name, meta, response=None):
if not response:
response = self.lambda_conn.get_function(lambda_name=name)
if not response:
return {}
arn = self.build_lambda_arn_with_alias(response, meta.get('alias'))
del response['Configuration']['FunctionArn']
return {
arn: build_description_obj(response, name, meta)
}
def build_lambda_arn(self, name):
arn = 'arn:aws:lambda:{0}:{1}:function:{2}'.format(self.region,
self.account_id,
name)
return arn
def resolve_lambda_arn_by_version_and_alias(self, name, version, alias):
if version or alias:
lambda_response = self.lambda_conn.get_function(name, version)
return self.build_lambda_arn_with_alias(lambda_response, alias)
else:
return self.lambda_conn.get_function(name)['Configuration'][
'FunctionArn']
def add_invocation_permission(self, name, principal, source_arn=None,
statement_id=None, exists_ok=False):
return self.lambda_conn.add_invocation_permission(
name=name,
principal=principal,
source_arn=source_arn,
statement_id=statement_id,
exists_ok=exists_ok
)
def get_invocation_permission(self, lambda_name, qualifier):
policies = self.lambda_conn.get_policy(lambda_name=lambda_name,
qualifier=qualifier)
if not policies:
_LOG.warning(f'No invocation permissions were found in '
f'lambda: {lambda_name} with qualifier: {qualifier}')
return {}
return json.loads(policies['Policy'])
def remove_invocation_permissions(self, lambda_name, qualifier,
ids_to_remove=None):
self.lambda_conn.remove_invocation_permission(
func_name=lambda_name, qualifier=qualifier,
ids_to_remove=ids_to_remove)
def build_lambda_arn_with_alias(self, response, alias=None):
name = response['Configuration']['FunctionName']
l_arn = self.build_lambda_arn(name=name)
version = response['Configuration']['Version']
arn = '{0}:{1}'.format(l_arn, alias or version)
return arn
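    # Illustrative result (hypothetical region/account/names):
    #   build_lambda_arn_with_alias(response, alias='prod') ->
    #   'arn:aws:lambda:eu-west-1:123456789012:function:my-func:prod'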
def _setup_function_concurrency(self, name, meta):
con_exec = meta.get(LAMBDA_MAX_CONCURRENCY)
if con_exec:
_LOG.debug('Going to set up concurrency executions')
if self.check_concurrency_availability(con_exec):
self.lambda_conn.put_function_concurrency(
function_name=name,
concurrent_executions=con_exec)
_LOG.info(
f'Concurrency limit for lambda {name} '
f'is set to {con_exec}')
    def check_concurrency_availability(self, requested_concurrency):
        if not (isinstance(requested_concurrency, int)
                and requested_concurrency >= 0):
            _LOG.warning('The number of reserved concurrent executions '
                         'must be a non-negative integer.')
            return False
        unresolved_exec = \
            self.lambda_conn.get_unresolved_concurrent_executions()
        if requested_concurrency <= unresolved_exec:
            return True
        else:
            _LOG.warning(
                f'The account does not have enough unreserved concurrent '
                f'executions. Currently available: {unresolved_exec}')
            return False
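    # e.g. with 900 unreserved concurrent executions left on the account,
    # check_concurrency_availability(100) -> True, while a negative or
    # non-integer value -> False with a warning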
@unpack_kwargs
@retry()
def _create_lambda_from_meta(self, name, meta):
from syndicate.core import CONFIG
_LOG.debug('Creating lambda %s', name)
req_params = ['iam_role_name', 'runtime', 'memory', 'timeout',
'func_name']
# Lambda configuration
validate_params(name, meta, req_params)
key = meta[S3_PATH_NAME]
key_compound = PurePath(CONFIG.deploy_target_bucket_key_compound,
key).as_posix()
if not self.s3_conn.is_file_exists(self.deploy_target_bucket,
key_compound):
            raise AssertionError(f'Error while creating lambda: {name}; '
                                 f'deployment package {key_compound} does not '
                                 f'exist in {self.deploy_target_bucket} bucket')
lambda_def = self.lambda_conn.get_function(name)
        if lambda_def:
            _LOG.warning('%s lambda already exists.', name)
            return self.describe_lambda(name, meta, lambda_def)
role_name = meta['iam_role_name']
role_arn = self.iam_conn.check_if_role_exists(role_name)
if not role_arn:
raise AssertionError(f'Role {role_name} does not exist; '
f'Lambda {name} failed to be configured.')
dl_target_arn = self.get_dl_target_arn(meta=meta,
region=self.region,
account_id=self.account_id)
publish_version = meta.get('publish_version', False)
lambda_layers_arns = []
layer_meta = meta.get('layers')
if layer_meta:
if 'dotnet' in meta['runtime'].lower():
env_vars = meta.get('env_variables', {})
env_vars.update(_DOTNET_LAMBDA_SHARED_STORE_ENV)
meta['env_variables'] = env_vars
for layer_name in layer_meta:
layer_arn = self.lambda_conn.get_lambda_layer_arn(layer_name)
if not layer_arn:
raise AssertionError(
'Could not link lambda layer {} to lambda {} '
'due to layer absence!'.format(layer_name, name))
lambda_layers_arns.append(layer_arn)
ephemeral_storage = meta.get('ephemeral_storage', 512)
if meta.get('env_variables'):
self._resolve_env_variables(meta.get('env_variables'))
self.lambda_conn.create_lambda(
lambda_name=name,
func_name=meta['func_name'],
role=role_arn,
runtime=meta['runtime'].lower(),
memory=meta['memory'],
timeout=meta['timeout'],
s3_bucket=self.deploy_target_bucket,
s3_key=key_compound,
env_vars=meta.get('env_variables'),
vpc_sub_nets=meta.get('subnet_ids'),
vpc_security_group=meta.get('security_group_ids'),
dl_target_arn=dl_target_arn,
tracing_mode=meta.get('tracing_mode'),
publish_version=publish_version,
layers=lambda_layers_arns,
ephemeral_storage=ephemeral_storage,
snap_start=self._resolve_snap_start(meta=meta),
architectures=meta.get('architectures'),
tags=meta.get('tags')
)
_LOG.debug('Lambda created %s', name)
        # AWS sometimes returns None right after function creation;
        # wait until the function actually exists for stability
waiter = self.lambda_conn.get_waiter('function_exists')
waiter.wait(FunctionName=name)
self._resolve_log_group(lambda_name=name, meta=meta)
lambda_def = self.__describe_lambda_by_version(
name) if publish_version else self.lambda_conn.get_function(name)
version = lambda_def['Configuration']['Version']
self._setup_function_concurrency(name=name, meta=meta)
        # create an alias pointing to the resolved version
        # ($LATEST if no version was published)
alias = meta.get('alias')
if alias:
_LOG.debug('Creating alias')
_LOG.debug(self.lambda_conn.create_alias(function_name=name,
name=alias,
version=version))
url_config = meta.get('url_config')
if url_config:
_LOG.info('Url config is found. Setting the function url')
url = self.lambda_conn.set_url_config(
function_name=name, auth_type=url_config.get('auth_type'),
qualifier=alias, cors=url_config.get('cors'),
principal=url_config.get('principal'),
source_arn=url_config.get('source_arn')
)
            USER_LOG.info(f'{name}{":" + alias if alias else ""} URL: {url}')
arn = self.build_lambda_arn_with_alias(lambda_def, alias) \
if publish_version or alias else \
lambda_def['Configuration']['FunctionArn']
_LOG.debug(f'Resolved lambda arn: {arn}')
event_sources_meta = meta.get('event_sources', [])
self.create_lambda_triggers(name, arn, role_name, event_sources_meta)
if meta.get('max_retries') is not None:
_LOG.debug('Setting lambda event invoke config')
function_name = (name + ":" + alias) if alias else name
invoke_config = self.lambda_conn.put_function_event_invoke_config(
function_name=function_name,
max_retries=meta.get('max_retries')
)
_LOG.debug(f'Created lambda invoke config: {invoke_config}')
# concurrency configuration
self._manage_provisioned_concurrency_configuration(
function_name=name,
meta=meta,
lambda_def=lambda_def)
return self.describe_lambda(name, meta, lambda_def)
@staticmethod
def get_dl_target_arn(meta, region, account_id):
dl_type = meta.get('dl_resource_type')
if dl_type:
dl_type = dl_type.lower()
dl_name = meta.get('dl_resource_name')
dl_target_arn = 'arn:aws:{0}:{1}:{2}:{3}'.format(
dl_type,
region,
account_id,
dl_name) if dl_type and dl_name else None
return dl_target_arn
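    # Illustrative result (hypothetical values):
    #   get_dl_target_arn({'dl_resource_type': 'sns',
    #                      'dl_resource_name': 'my-dlq'},
    #                     'eu-west-1', '123456789012')
    #   -> 'arn:aws:sns:eu-west-1:123456789012:my-dlq'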
@unpack_kwargs
def _update_lambda(self, name, meta, context):
from syndicate.core import CONFIG
_LOG.info('Updating lambda: {0}'.format(name))
req_params = ['runtime', 'memory', 'timeout', 'func_name']
validate_params(name, meta, req_params)
key = meta[S3_PATH_NAME]
key_compound = PurePath(CONFIG.deploy_target_bucket_key_compound,
key).as_posix()
if not self.s3_conn.is_file_exists(self.deploy_target_bucket,
key_compound):
raise AssertionError(
'Deployment package {0} does not exist '
'in {1} bucket'.format(key_compound,
self.deploy_target_bucket))
response = self.lambda_conn.get_function(name)
if not response:
raise AssertionError('{0} lambda does not exist.'.format(name))
old_conf = response['Configuration']
publish_version = meta.get('publish_version', False)
self.lambda_conn.update_code_source(
lambda_name=name,
s3_bucket=self.deploy_target_bucket,
s3_key=key_compound,
publish_version=publish_version)
role_name = meta['iam_role_name']
role_arn = self.iam_conn.check_if_role_exists(role_name)
if not role_arn:
_LOG.warning('Execution role does not exist. Keeping the old one')
role_arn = if_updated(role_arn, old_conf.get('Role'))
handler = if_updated(meta.get('func_name'), old_conf.get('Handler'))
env_vars = meta.get('env_variables', {})
timeout = if_updated(meta.get('timeout'), old_conf.get('Timeout'))
memory_size = if_updated(meta.get('memory_size'),
old_conf.get('MemorySize'))
old_subnets, old_security_groups, _ = self.lambda_conn.retrieve_vpc_config(
old_conf)
vpc_subnets = if_updated(set(meta.get('subnet_ids') or []),
old_subnets)
vpc_security_group = if_updated(
set(meta.get('security_group_ids') or []), old_security_groups)
runtime = if_updated(meta.get('runtime'), old_conf.get('Runtime'))
ephemeral_storage = if_updated(
meta.get('ephemeral_storage'),
self.lambda_conn.retrieve_ephemeral_storage(old_conf))
        dl_target_arn = self.get_dl_target_arn(meta=meta,
                                               region=self.region,
                                               account_id=self.account_id)
lambda_layers_arns = []
layer_meta = meta.get('layers')
if layer_meta:
if 'dotnet' in meta['runtime'].lower():
env_vars.update(_DOTNET_LAMBDA_SHARED_STORE_ENV)
meta['env_variables'] = env_vars
for layer_name in layer_meta:
layer_arn = self.lambda_conn.get_lambda_layer_arn(layer_name)
if not layer_arn:
raise AssertionError(
'Could not link lambda layer {} to lambda {} '
'due to layer absence!'.format(layer_name, name))
lambda_layers_arns.append(layer_arn)
if env_vars:
self._resolve_env_variables(env_vars)
_LOG.info(f'Updating lambda {name} configuration')
self.lambda_conn.update_lambda_configuration(
lambda_name=name, role=role_arn, handler=handler,
env_vars=env_vars,
timeout=timeout, memory_size=memory_size, runtime=runtime,
vpc_sub_nets=vpc_subnets, vpc_security_group=vpc_security_group,
dead_letter_arn=dl_target_arn, layers=lambda_layers_arns,
ephemeral_storage=ephemeral_storage,
snap_start=self._resolve_snap_start(meta=meta)
)
        _LOG.info('Lambda configuration has been updated')
        # lambda_conn.update_lambda_configuration already waits for the
        # update to complete, but the explicit waiter below does no harm
        _LOG.info(f'Initializing function updated waiter for {name}')
        waiter = self.lambda_conn.get_waiter('function_updated_v2')
        waiter.wait(FunctionName=name)
        _LOG.info('Waiting has finished')
self._resolve_log_group(lambda_name=name, meta=meta)
response = self.lambda_conn.get_function(name)
_LOG.info(f'Lambda describe result: {response}')
code_sha_256 = response['Configuration']['CodeSha256']
publish_ver_response = self.lambda_conn.publish_version(
function_name=name,
code_sha_256=code_sha_256)
updated_version = publish_ver_response['Version']
_LOG.info(
f'Version {updated_version} for lambda {name} published')
alias_name = meta.get('alias')
aliases = list(
self.lambda_conn.get_aliases(function_name=name).keys()
)
if alias_name:
if alias_name in aliases:
self.lambda_conn.update_alias(
function_name=name,
alias_name=alias_name,
function_version=updated_version)
_LOG.info(
f'Alias {alias_name} has been updated for lambda {name}')
else:
self.lambda_conn.create_alias(
function_name=name,
name=alias_name,
version=updated_version)
_LOG.info(
f'Alias {alias_name} has been created for lambda {name}')
            # iterate over a copy: removing items from the list while
            # iterating over it would skip elements
            for alias in list(aliases):
                if alias != alias_name:
                    self.lambda_conn.delete_alias(
                        function_name=name,
                        name=alias
                    )
                    aliases.remove(alias)
url_config = meta.get('url_config')
if url_config:
_LOG.info('URL config is found. Setting the function URL')
url = self.lambda_conn.set_url_config(
function_name=name, auth_type=url_config.get('auth_type'),
qualifier=alias_name, cors=url_config.get('cors'),
principal=url_config.get('principal'),
source_arn=url_config.get('source_arn')
)
            USER_LOG.info(
                f'{name}{":" + alias_name if alias_name else ""} URL: {url}')
else:
existing_url = self.lambda_conn.get_url_config(
function_name=name, qualifier=alias_name)
if existing_url:
_LOG.info('Going to delete existing URL config that is not '
'described in the lambda_config file')
self.lambda_conn.delete_url_config(
function_name=name, qualifier=alias_name)
arn = self.build_lambda_arn_with_alias(response, alias_name) \
if publish_version or alias_name else \
response['Configuration']['FunctionArn']
_LOG.debug(f'Resolved lambda arn: {arn}')
event_sources_meta = meta.get('event_sources', [])
self.update_lambda_triggers(name, arn, role_name, event_sources_meta)
if meta.get('max_retries') is not None:
_LOG.debug('Updating lambda event invoke config')
function_name = (name + ":" + alias_name) if alias_name else name
            try:
invoke_config = \
self.lambda_conn.update_function_event_invoke_config(
function_name=function_name,
max_retries=meta.get('max_retries')
)
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
_LOG.debug('Lambda event invoke config is absent. '
'Creating a new one')
invoke_config = \
self.lambda_conn.put_function_event_invoke_config(
function_name=function_name,
max_retries=meta.get('max_retries')
)
else:
raise e
_LOG.debug(invoke_config)
req_max_concurrency = meta.get(LAMBDA_MAX_CONCURRENCY)
existing_max_concurrency = self.lambda_conn. \
describe_function_concurrency(name=name)
if req_max_concurrency and existing_max_concurrency:
if existing_max_concurrency != req_max_concurrency:
self._set_function_concurrency(name=name, meta=meta)
elif not req_max_concurrency and existing_max_concurrency:
self.lambda_conn.delete_function_concurrency_config(name=name)
elif req_max_concurrency and not existing_max_concurrency:
self._set_function_concurrency(name=name, meta=meta)
self._manage_provisioned_concurrency_configuration(function_name=name,
meta=meta,
lambda_def=context)
_LOG.info(f'Updating has finished for lambda {name}')
return self.describe_lambda(name, meta, response)
def _set_function_concurrency(self, name, meta):
provisioned = self.lambda_conn. \
describe_provisioned_concurrency_configs(name=name)
if provisioned:
self._delete_lambda_prov_concur_config(
function_name=name,
existing_config=provisioned)
self._setup_function_concurrency(name=name, meta=meta)
def _manage_provisioned_concurrency_configuration(self, function_name,
meta,
lambda_def=None):
existing_configs = self.lambda_conn. \
describe_provisioned_concurrency_configs(name=function_name)
concurrency = meta.get(PROVISIONED_CONCURRENCY)
if not existing_configs and not concurrency:
# no existing config, no config in meta -> nothing to do
return
if existing_configs and concurrency:
# todo check if required config already present
if not lambda_def:
lambda_def = self.lambda_conn.get_function(
lambda_name=function_name)
self._delete_lambda_prov_concur_config(
function_name=function_name,
existing_config=existing_configs)
self._create_lambda_prov_concur_config(
function_name=function_name,
meta=meta,
concurrency=concurrency,
lambda_def=lambda_def)
return
if not existing_configs and concurrency:
# no existing but expected one - create
self._create_lambda_prov_concur_config(
function_name=function_name,
meta=meta,
concurrency=concurrency,
lambda_def=lambda_def)
return
if existing_configs and not concurrency:
# to delete existing one
self._delete_lambda_prov_concur_config(
function_name=function_name,
existing_config=existing_configs)
return
def _delete_lambda_prov_concur_config(self, function_name,
existing_config):
if not existing_config:
return
for config in existing_config:
qualifier = self._resolve_configured_existing_qualifier(config)
self.lambda_conn.delete_provisioned_concurrency_config(
name=function_name,
qualifier=qualifier)
_LOG.info(
f'Existing provisioned concurrency configuration '
f'set up on qualifier {qualifier} '
f'was removed from lambda {function_name}')
def _create_lambda_prov_concur_config(self, function_name, meta,
concurrency,
lambda_def=None):
if not lambda_def:
lambda_def = self.lambda_conn.get_function(
lambda_name=function_name)
qualifier = concurrency.get('qualifier')
if not qualifier:
raise AssertionError('Parameter `qualifier` is required for '
'concurrency configuration but it is absent')
        if qualifier not in _LAMBDA_PROV_CONCURRENCY_QUALIFIERS:
            raise AssertionError(
                f'Parameter `qualifier` must be one of '
                f'{_LAMBDA_PROV_CONCURRENCY_QUALIFIERS}, but it is equal '
                f'to `{qualifier}`')
resolved_qualifier = self._resolve_requested_qualifier(lambda_def,
meta,
qualifier)
        requested_provisioned_level = concurrency.get('value')
        if not requested_provisioned_level:
            raise AssertionError('Parameter `value` is required for '
                                 'concurrency configuration but it is absent')
max_prov_limit = self.lambda_conn.describe_function_concurrency(
name=function_name)
if not max_prov_limit:
max_prov_limit = self.lambda_conn. \
get_unresolved_concurrent_executions()
if requested_provisioned_level > max_prov_limit:
raise AssertionError(f'Requested provisioned concurrency for '
f'lambda {function_name} must not be greater '
f'than function concurrency limit if any or '
f'account unreserved concurrency. '
f'Max is set to {max_prov_limit}; '
f'Requested: {requested_provisioned_level}')
self.lambda_conn.configure_provisioned_concurrency(
name=function_name,
qualifier=resolved_qualifier,
concurrent_executions=requested_provisioned_level)
_LOG.info(f'Provisioned concurrency has been configured for lambda '
f'{function_name} of type {qualifier}, '
f'value {requested_provisioned_level}')
def _resolve_requested_qualifier(self, lambda_def, meta, qualifier):
if not qualifier:
raise AssertionError('Parameter `qualifier` is required for '
'concurrency configuration but it is absent')
        if qualifier not in _LAMBDA_PROV_CONCURRENCY_QUALIFIERS:
            raise AssertionError(
                f'Parameter `qualifier` must be one of '
                f'{_LAMBDA_PROV_CONCURRENCY_QUALIFIERS}, but it is equal '
                f'to `{qualifier}`')
lambda_def['Alias'] = meta.get('alias')
resolve_qualifier_req = lambda_def
resolved_qualifier = self._LAMBDA_QUALIFIER_RESOLVER[qualifier](
self, resolve_qualifier_req)
return resolved_qualifier
@staticmethod
def _resolve_configured_existing_qualifier(existing_config):
function_arn = existing_config.get('FunctionArn')
parts = function_arn.split(':')
return parts[-1]
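    # e.g. a config with FunctionArn
    # 'arn:aws:lambda:eu-west-1:123456789012:function:my-func:prod'
    # yields the qualifier 'prod' (an alias name or a version number)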
    def _is_equal_lambda_layer(self, new_layer_sha, old_layer_name):
        versions = self.lambda_conn.list_lambda_layer_versions(
            name=old_layer_name)
        for version in versions:
            old_layer = self.lambda_conn.get_lambda_layer_by_arn(
                version['LayerVersionArn'])
            # CodeSha256 is base64-encoded, decode it to compare raw digests
            if new_layer_sha == base64.b64decode(
                    old_layer['Content']['CodeSha256']):
                return old_layer
def __describe_lambda_by_version(self, name):
versions = self.lambda_conn.versions_list(name)
# find the last created version
version = max(
[int(i['Version']) if i['Version'] != '$LATEST' else 0 for i in
versions])
if version != 0:
return self.lambda_conn.get_function(name, str(version))
else:
return self.lambda_conn.get_function(name)
@retry()
def _create_dynamodb_trigger_from_meta(self, lambda_name, lambda_arn,
role_name, trigger_meta):
validate_params(lambda_name, trigger_meta,
DYNAMODB_TRIGGER_REQUIRED_PARAMS)
table_name = trigger_meta['target_table']
if not self.dynamodb_conn.describe_table(table_name):
_LOG.warning(f'DynamoDB table \'{table_name}\' does not exist '
f'and could not be configured as a trigger '
f'for lambda {lambda_name} ')
return
batch_size, batch_window = self._resolve_batch_size_batch_window(
trigger_meta)
filters = trigger_meta.get('filters')
if not self.dynamodb_conn.is_stream_enabled(table_name):
self.dynamodb_conn.enable_table_stream(table_name)
stream = self.dynamodb_conn.get_table_stream_arn(table_name)
# TODO support another sub type
event_source = next(iter(self.lambda_conn.list_event_sources(
event_source_arn=stream, function_name=lambda_arn)), None)
if event_source:
_LOG.info(f'Lambda event source mapping for source arn '
f'{stream} and lambda arn {lambda_arn} was found. '
f'Updating it')
self.lambda_conn.update_event_source(
event_source['UUID'], function_name=lambda_arn,
batch_size=batch_size,
batch_window=batch_window, filters=filters)
else:
self.lambda_conn.add_event_source(
lambda_arn, stream, batch_size=batch_size,
batch_window=batch_window, start_position='LATEST',
filters=filters
)
# start_position='LATEST' - in case we did not remove tables before
_LOG.info('Lambda %s subscribed to dynamodb table %s', lambda_name,
table_name)
@retry()
def _create_sqs_trigger_from_meta(self, lambda_name, lambda_arn, role_name,
trigger_meta):
validate_params(lambda_name, trigger_meta, SQS_TRIGGER_REQUIRED_PARAMS)
target_queue = trigger_meta['target_queue']
function_response_types = trigger_meta.get(
"function_response_types", [])
batch_size, batch_window = self._resolve_batch_size_batch_window(
trigger_meta)
if not self.sqs_conn.get_queue_url(target_queue, self.account_id):
_LOG.warning(f'SQS queue \'{target_queue}\' does not exist '
f'and could not be configured as a trigger '
f'for lambda {lambda_name} ')
return
queue_arn = 'arn:aws:sqs:{0}:{1}:{2}'.format(self.region,
self.account_id,
target_queue)
event_source = next(iter(self.lambda_conn.list_event_sources(
event_source_arn=queue_arn, function_name=lambda_arn)), None)
if event_source:
_LOG.info(f'Lambda event source mapping for source arn '
f'{queue_arn} and lambda arn {lambda_arn} was found. '
f'Updating it')
self.lambda_conn.update_event_source(
event_source['UUID'], function_name=lambda_arn,
batch_size=batch_size,
batch_window=batch_window,
function_response_types=function_response_types)
else:
self.lambda_conn.add_event_source(
lambda_arn, queue_arn, batch_size, batch_window,
function_response_types=function_response_types
)
_LOG.info('Lambda %s subscribed to SQS queue %s', lambda_name,
target_queue)
@retry()
def _create_cloud_watch_trigger_from_meta(self, lambda_name, lambda_arn,
role_name, trigger_meta):
validate_params(lambda_name, trigger_meta,
CLOUD_WATCH_TRIGGER_REQUIRED_PARAMS)
rule_name = trigger_meta['target_rule']
# TODO add InputPath & InputTransformer if needed
input_dict = trigger_meta.get('input')
rule_arn = self.cw_events_conn.get_rule_arn(rule_name)
if not rule_arn:
_LOG.warning(f'Event Bridge rule \'{rule_name}\' does not exist '
f'and could not be configured as a trigger '
f'for lambda {lambda_name} ')
return
targets = self.cw_events_conn.list_targets_by_rule(rule_name)
if lambda_arn not in map(lambda each: each.get('Arn'), targets):
self.cw_events_conn.add_rule_target(rule_name, lambda_arn,
input_dict)
self.lambda_conn.add_invocation_permission(lambda_arn,
'events.amazonaws.com',
rule_arn)
_LOG.info(f'Lambda {lambda_name} subscribed to cloudwatch rule '
f'{rule_name}')
else:
_LOG.info(f'Lambda {lambda_name} is already bound '
f'to cloudwatch rule {rule_name} as a target')
@retry()
# allow only sequential s3 trigger creation because it is done via 'put'
# operation which will override any other concurrent request otherwise
@threading_lock
def _create_s3_trigger_from_meta(self, lambda_name, lambda_arn, role_name,
trigger_meta):
validate_params(lambda_name, trigger_meta, S3_TRIGGER_REQUIRED_PARAMS)
target_bucket = trigger_meta['target_bucket']
if not self.s3_conn.is_bucket_exists(target_bucket):
_LOG.warning(f'S3 bucket {target_bucket} does not exist '
f'and could not be configured as a trigger '
f'for lambda {lambda_name} ')
return
bucket_arn = f'arn:aws:s3:::{target_bucket}'
self.lambda_conn.add_invocation_permission(
lambda_arn, 's3.amazonaws.com', bucket_arn)
_LOG.debug(f'Waiting for activation of invoke-permission '
f'of {bucket_arn}')
time.sleep(5)
self.s3_conn.add_lambda_event_source(
target_bucket, lambda_arn, trigger_meta)
_LOG.info(f'Lambda {lambda_name} subscribed to '
f'S3 bucket {target_bucket}')
@retry()
def _create_sns_topic_trigger_from_meta(self, lambda_name, lambda_arn,
role_name, trigger_meta):
validate_params(lambda_name, trigger_meta, SNS_TRIGGER_REQUIRED_PARAMS)
topic_name = trigger_meta['target_topic']
if not self.sns_conn.get_topic_arn(topic_name):
_LOG.warning(f'SNS topic {topic_name} does not exist '
f'and could not be configured as a trigger '
f'for lambda {lambda_name}')
return
region = trigger_meta.get('region')
self.sns_res.create_sns_subscription_for_lambda(lambda_arn,
topic_name,
region)
_LOG.info('Lambda %s subscribed to sns topic %s', lambda_name,
trigger_meta['target_topic'])
@retry()
def _create_kinesis_stream_trigger_from_meta(self, lambda_name, lambda_arn,
role_name, trigger_meta):
validate_params(lambda_name, trigger_meta,
KINESIS_TRIGGER_REQUIRED_PARAMS)
stream_name = trigger_meta['target_stream']
stream_description = self.kinesis_conn.get_stream(stream_name)
if not stream_description:
_LOG.warning(f'Kinesis stream \'{stream_name}\' does not exist '
f'and could not be configured as a trigger '
f'for lambda {lambda_name} ')
return
stream_arn = stream_description['StreamARN']
stream_status = stream_description['StreamStatus']
# additional waiting for stream
if stream_status != 'ACTIVE':
_LOG.debug('Kinesis stream %s is not in active state,'
' waiting for activation...', stream_name)
time.sleep(120)
# TODO policy should be moved to meta
policy_name = '{0}KinesisTo{1}Lambda'.format(stream_name, lambda_name)
policy_document = {
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
lambda_arn
]
},
{
"Action": [
"kinesis:DescribeStreams",
"kinesis:DescribeStream",
"kinesis:ListStreams",
"kinesis:GetShardIterator",
"Kinesis:GetRecords"
],
"Effect": "Allow",
"Resource": stream_arn
}
],
"Version": "2012-10-17"
}
self.iam_conn.attach_inline_policy(role_name=role_name,
policy_name=policy_name,
policy_document=policy_document)
_LOG.debug('Inline policy %s is attached to role %s',
policy_name, role_name)
_LOG.debug('Waiting for activation policy %s...', policy_name)
time.sleep(10)
self._add_kinesis_event_source(lambda_arn, stream_arn, trigger_meta)
_LOG.info('Lambda %s subscribed to kinesis stream %s', lambda_name,
stream_name)
    @retry()
    def _add_kinesis_event_source(self, lambda_arn, stream_arn, trigger_meta):
        self.lambda_conn.add_event_source(
            func_name=lambda_arn, stream_arn=stream_arn,
            batch_size=trigger_meta['batch_size'],
            start_position=trigger_meta['starting_position'])
CREATE_TRIGGER = {
DYNAMO_DB_TRIGGER: _create_dynamodb_trigger_from_meta,
CLOUD_WATCH_RULE_TRIGGER: _create_cloud_watch_trigger_from_meta,
EVENT_BRIDGE_RULE_TRIGGER: _create_cloud_watch_trigger_from_meta,
S3_TRIGGER: _create_s3_trigger_from_meta,
SNS_TOPIC_TRIGGER: _create_sns_topic_trigger_from_meta,
KINESIS_TRIGGER: _create_kinesis_stream_trigger_from_meta,
SQS_TRIGGER: _create_sqs_trigger_from_meta
}
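    # Each entry of a lambda's 'event_sources' meta must carry a
    # 'resource_type' matching one of the keys above plus the
    # trigger-specific required params declared at the top of this module,
    # e.g. (hypothetical names):
    #   {'resource_type': 'dynamodb_trigger',
    #    'target_table': 'my-table', 'batch_size': 1}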
@retry()
def _remove_cloud_watch_trigger(self, lambda_name, lambda_arn,
trigger_meta):
target_rule = trigger_meta['target_rule']
targets = []
if self.cw_events_conn.get_rule(target_rule):
targets = self.cw_events_conn.list_targets_by_rule(target_rule)
for target in targets:
if target['Arn'] == lambda_arn:
# remove target so that this rule won't trigger lambda
self.cw_events_conn.remove_targets(
rule_name=target_rule,
target_ids=[target['Id']]
)
_LOG.info(f'Lambda {lambda_name} unsubscribed from '
f'cloudwatch rule {target_rule}')
break
# remove event bridge permission to invoke lambda
# to remove this trigger from lambda triggers section
self.remove_permissions_by_resource_name(lambda_arn, target_rule)
_LOG.info(f'Removed event bridge rule {target_rule} permissions to '
f'invoke lambda {lambda_name}')
@retry()
def _remove_sns_topic_trigger(self, lambda_name, lambda_arn, trigger_meta):
target_topic = trigger_meta['target_topic']
subscriptions = []
topic_arn = self.sns_conn.get_topic_arn(name=target_topic)
if topic_arn:
subscriptions = self.sns_conn.list_subscriptions_by_topic(
topic_arn=topic_arn)
for subscription in subscriptions:
if subscription['Protocol'] == 'lambda' \
and subscription['Endpoint'] == lambda_arn:
# remove subscription so that this topic won't trigger lambda
self.sns_conn.unsubscribe(
subscription_arn=subscription['SubscriptionArn'])
_LOG.info(f'Lambda {lambda_name} unsubscribed '
f'from topic {target_topic}')
break
# remove sns permission to invoke lambda
# to remove this trigger from lambda triggers section
self.remove_permissions_by_resource_name(lambda_arn, target_topic)
_LOG.info(f'Removed sns topic {target_topic} permissions to invoke '
f'lambda {lambda_name}')
@retry()
# allow only sequential s3 trigger deletion because it is done via 'put'
# operation which will override any other concurrent request otherwise
@threading_lock
def _remove_s3_trigger(self, lambda_name, lambda_arn, trigger_meta):
target_bucket = trigger_meta['target_bucket']
if self.s3_conn.is_bucket_exists(target_bucket):
self.s3_conn.remove_lambda_event_source(
target_bucket, lambda_arn, trigger_meta)
_LOG.info(f'Lambda {lambda_name} unsubscribed from '
f's3 bucket {target_bucket}')
# remove s3 permission to invoke lambda
# to remove this trigger from lambda triggers section
self.remove_permissions_by_resource_name(
lambda_arn, target_bucket, all_permissions=False)
_LOG.info(f'Removed s3 bucket {target_bucket} permissions to invoke '
f'lambda {lambda_name}')
@retry()
def _remove_sqs_trigger(self, lambda_name, lambda_arn, trigger_meta):
target_queue = trigger_meta['target_queue']
self._remove_event_source(
lambda_name=lambda_name, lambda_arn=lambda_arn,
event_source_name=target_queue)
@retry()
def _remove_dynamodb_trigger(self, lambda_name, lambda_arn, trigger_meta):
target_table = trigger_meta['target_table']
self._remove_event_source(
lambda_name=lambda_name, lambda_arn=lambda_arn,
event_source_name=target_table)
@retry()
def _remove_kinesis_stream_trigger(self, lambda_name, lambda_arn,
trigger_meta):
target_stream = trigger_meta['target_stream']
self._remove_event_source(
lambda_name=lambda_name, lambda_arn=lambda_arn,
event_source_name=target_stream)
def _remove_event_source(self, lambda_name, lambda_arn, event_source_name):
""" Remove event source from lambda triggers.
SQS, DynamoDB, Kinezis, Kafka and MQ resources are considered
as event sources according to AWS """
uuid = None
event_sources = self.lambda_conn.triggers_list(lambda_name=lambda_arn)
for event_source in event_sources:
if event_source_name in event_source['EventSourceArn']:
uuid = event_source['UUID']
break
if uuid:
self.lambda_conn.remove_event_source(uuid=uuid)
_LOG.info(f'Lambda {lambda_name} unsubscribed from '
f'event source {event_source_name}')
else:
_LOG.warning(f'Could not remove event source {event_source_name} '
f'from lambda {lambda_name}.')
REMOVE_TRIGGER = {
DYNAMO_DB_TRIGGER: _remove_dynamodb_trigger,
CLOUD_WATCH_RULE_TRIGGER: _remove_cloud_watch_trigger,
EVENT_BRIDGE_RULE_TRIGGER: _remove_cloud_watch_trigger,
S3_TRIGGER: _remove_s3_trigger,
SNS_TOPIC_TRIGGER: _remove_sns_topic_trigger,
KINESIS_TRIGGER: _remove_kinesis_stream_trigger,
SQS_TRIGGER: _remove_sqs_trigger
}
def create_lambda_triggers(self, name, arn, role_name, event_sources_meta):
for event_source in event_sources_meta:
resource_type = event_source['resource_type']
func = self.CREATE_TRIGGER[resource_type]
func(self, name, arn, role_name, event_source)
def update_lambda_triggers(self, name, arn, role_name, event_sources_meta):
from syndicate.core import CONFIG, PROJECT_STATE
# load latest output to compare it with current event sources
deploy_name = PROJECT_STATE.latest_deploy.get('deploy_name')
bundle_name = PROJECT_STATE.get_latest_deployed_or_updated_bundle(
PROJECT_STATE.current_bundle, latest_if_not_found=True)
if not bundle_name:
bundle_name = PROJECT_STATE.latest_modification.get('bundle_name')
regular_key_compound = PurePath(
CONFIG.deploy_target_bucket_key_compound,
_build_output_key(
bundle_name=bundle_name, deploy_name=deploy_name,
is_regular_output=True)).as_posix()
if self.s3_conn.is_file_exists(
CONFIG.deploy_target_bucket,
regular_key_compound):
key_compound = regular_key_compound
else:
key_compound = PurePath(
CONFIG.deploy_target_bucket_key_compound,
_build_output_key(
bundle_name=bundle_name, deploy_name=deploy_name,
is_regular_output=False)).as_posix()
output_file = self.s3_conn.load_file_body(
CONFIG.deploy_target_bucket, key_compound)
latest_output = json.loads(output_file)
prev_event_sources_meta = []
for resource in latest_output:
if arn in resource:
resource_meta = latest_output[resource]['resource_meta']
prev_event_sources_meta = \
resource_meta.get('event_sources', [])
break
# remove triggers that are absent or changed in new meta
to_remove = [event_source for event_source in prev_event_sources_meta
if event_source not in event_sources_meta]
self.remove_lambda_triggers(name, arn, to_remove)
# create/update triggers
self.create_lambda_triggers(name, arn, role_name, event_sources_meta)
def remove_lambda_triggers(self, lambda_name, lambda_arn,
event_sources_meta):
for event_source in event_sources_meta:
resource_type = event_source['resource_type']
func = self.REMOVE_TRIGGER[resource_type]
func(self, lambda_name, lambda_arn, event_source)
def remove_lambdas(self, args):
return self.create_pool(self._remove_lambda, args)
@unpack_kwargs
@retry()
def _remove_lambda(self, arn, config):
# can't describe lambda event sources with $LATEST version in arn
original_arn = arn
arn = arn.replace(':$LATEST', '')
lambda_name = config['resource_name']
event_sources_meta = config['resource_meta'].get('event_sources', [])
try:
self.lambda_conn.delete_lambda(lambda_name,
log_not_found_error=False)
self.remove_lambda_triggers(lambda_name, arn, event_sources_meta)
group_names = self.cw_logs_conn.get_log_group_names()
for each in group_names:
if lambda_name == each.split('/')[-1]:
self.cw_logs_conn.delete_log_group_name(each)
_LOG.info('Lambda %s was removed.', lambda_name)
return {original_arn: config}
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
                _LOG.warning('Lambda %s is not found', lambda_name)
return {original_arn: config}
else:
raise e
def create_lambda_layer(self, args):
return self.create_pool(self.create_lambda_layer_from_meta, args)
@unpack_kwargs
    def create_lambda_layer_from_meta(self, name, meta, context=None):
        """Create a lambda layer version described by meta.
        :param name: layer name
        :param meta: layer deployment meta
        :param context: unused; kept because of usage in the 'update' flow
        :return: description object keyed by the layer version arn
        """
from syndicate.core import CONFIG
validate_params(name, meta, LAMBDA_LAYER_REQUIRED_PARAMS)
key = meta[S3_PATH_NAME]
key_compound = PurePath(CONFIG.deploy_target_bucket_key_compound,
key).as_posix()
file_name = key.split('/')[-1]
self.s3_conn.download_file(self.deploy_target_bucket, key_compound,
file_name)
if is_zip_empty(file_name):
            message = f'Cannot create layer \'{name}\' because the ' \
                      f'deployment package zip file is empty.'
_LOG.error(message)
return {}, [message]
        with open(file_name, 'rb') as file_data:
            file_body = file_data.read()
        hash_object = hashlib.sha256()
        hash_object.update(file_body)
        existing_version = self._is_equal_lambda_layer(hash_object.digest(),
                                                       name)
if existing_version:
existing_layer_arn = existing_version['LayerVersionArn']
_LOG.info(f'Layer {name} with same content already '
f'exists in layer version {existing_layer_arn}.')
return {
existing_layer_arn: build_description_obj(
response=existing_version, name=name, meta=meta
)}
_LOG.debug(f'Creating lambda layer {name}')
        args = {'layer_name': name, 'runtimes': meta['runtimes'],
                's3_bucket': self.deploy_target_bucket,
                's3_key': key_compound}
if meta.get('description'):
args['description'] = meta['description']
if meta.get('license'):
args['layer_license'] = meta['license']
if meta.get('architectures'):
args['architectures'] = meta['architectures']
response = self.lambda_conn.create_layer(**args)
_LOG.info(f'Lambda Layer {name} version {response["Version"]} '
f'was successfully created')
layer_arn = response['LayerArn'] + ':' + str(response['Version'])
del response['LayerArn']
return {
layer_arn: build_description_obj(
response, name, meta)
}
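    # An illustrative layer meta (hypothetical values): 'runtimes' and
    # 'deployment_package' are required, 'description', 'license' and
    # 'architectures' are optional:
    #   {'runtimes': ['python3.10'],
    #    'deployment_package': 'my_layer.zip',
    #    'architectures': ['x86_64']}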
def remove_lambda_layers(self, args):
return self.create_pool(self._remove_lambda_layers, args)
@unpack_kwargs
@retry()
    def _remove_lambda_layers(self, arn, config):
        layer_name = config['resource_name']
        _LOG.info('The latest lambda layer {0} version {1} was found.'.format(
            layer_name, arn.split(':')[-1]))
        layers_list = self.lambda_conn.list_lambda_layer_versions(layer_name)
        try:
            # do not shadow `arn`: it is the key of the returned mapping
            for version_arn in [layer['LayerVersionArn']
                                for layer in layers_list]:
                layer_version = version_arn.split(':')[-1]
                self.lambda_conn.delete_layer(version_arn,
                                              log_not_found_error=False)
                _LOG.info('Lambda layer {0} version {1} was removed.'.format(
                    layer_name, layer_version))
            return {arn: config}
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                _LOG.warning('Lambda Layer {} is not found'.format(layer_name))
                return {arn: config}
            else:
                raise e
def describe_lambda_layer(self, name, meta, response=None):
if not response:
layer_versions = self.lambda_conn.list_lambda_layer_versions(
name=name
)
if not layer_versions:
                _LOG.warning(f'No versions available for layer {name}')
return {}
else:
latest_version = max(
layer_versions, key=lambda x: x['Version'])
response = self.lambda_conn.get_layer_version(
name=name,
version=latest_version['Version']
)
if not response:
return {}
arn = response.pop('LayerArn')
return {
arn: build_description_obj(response, name, meta)
}
@staticmethod
def _resolve_snap_start(meta: dict) -> Optional[str]:
runtime: str = meta.get('runtime')
if not runtime:
return
runtime = runtime.lower()
snap_start = meta.get(SNAP_START, None)
        if snap_start and snap_start not in _SNAP_START_CONFIGURATIONS:
            values = ', '.join(map('"{}"'.format, _SNAP_START_CONFIGURATIONS))
            _LOG.warning(f'If given, "{SNAP_START}" must reflect one of the '
                         f'following values: {values}.')
            snap_start = None
        if snap_start and 'java' not in runtime:
            _LOG.warning(f'"{runtime}" runtime does not support '
                         f'\'{SNAP_START}\'.')
            snap_start = None
return snap_start
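    # e.g. {'runtime': 'java17', 'snap_start': 'PublishedVersions'}
    # resolves to 'PublishedVersions'; a non-java runtime or an unknown
    # value resolves to None with a warning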
@staticmethod
def _resolve_batch_size_batch_window(trigger_meta):
batch_size = trigger_meta.get('batch_size')
batch_window = trigger_meta.get('batch_window')
if batch_size:
if batch_size > 10 and not batch_window:
batch_window = 1
_LOG.info("The parameter 'batch_window' is set to the minimum "
f"default value ({batch_window}) because "
f"'batch_size' is greater than 10")
return batch_size, batch_window
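    # e.g. {'batch_size': 100} with no 'batch_window' resolves to (100, 1);
    # a minimum 1-second window is applied whenever batch_size exceeds 10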
def _resolve_log_group(self, lambda_name: str, meta: dict):
log_group = self.cw_logs_conn.get_log_group_by_lambda_name(
lambda_name=lambda_name
)
if log_group:
log_group_name = log_group.get("logGroupName")
_LOG.info(f"CloudWatch log group with name: {log_group_name}"
f" for lambda: {lambda_name} already exists.")
        else:
            _LOG.info(f"CloudWatch log group for lambda: {lambda_name} does"
                      f" not exist. Creating a new log group with name:"
                      f" /aws/lambda/{lambda_name}")
possible_retention = meta.get(
'logs_expiration', DEFAULT_LOGS_EXPIRATION)
try:
retention = int(possible_retention)
except (TypeError, ValueError):
                _LOG.warning(
                    f"Can't parse logs_expiration `{possible_retention}`"
                    f" as int. Setting the default {DEFAULT_LOGS_EXPIRATION}"
                )
retention = DEFAULT_LOGS_EXPIRATION
if retention:
self.cw_logs_conn.create_log_group_with_retention_days(
group_name=lambda_name,
retention_in_days=retention,
tags=meta.get('tags')
)
def _resolve_env_variables(self, env_vars):
required_params = ['resource_name', 'resource_type', 'parameter']
for key, value in env_vars.items():
if isinstance(value, dict):
resource_name = value.get('resource_name')
resource_type = value.get('resource_type')
parameter = value.get('parameter')
                if not all([resource_name, resource_type, parameter]):
                    missed_params = [p for p in required_params if
                                     value.get(p) is None]
                    env_vars[key] = NOT_AVAILABLE
                    USER_LOG.warning(
                        f"Unable to resolve a value for the environment "
                        f"variable '{key}' because of missing parameters. "
                        f"Required parameters: {required_params}; missing: "
                        f"{missed_params}. "
                        f"The environment variable '{key}' will be configured "
                        f"with the value '{NOT_AVAILABLE}'."
                    )
                    continue
_LOG.debug(
f"Going to resolve the value for the environment variable "
f"'{key}' by the parameter '{parameter}' of the resource "
f"type '{resource_type}' with the name '{resource_name}'.")
resolver = self.dynamic_params_resolvers.get(
(resource_type, parameter)
)
                if resolver is None:
                    USER_LOG.warning(
                        f"Resolving parameter '{parameter}' for the resource "
                        f"type '{resource_type}' is not currently supported.")
env_vars[key] = NOT_AVAILABLE
else:
env_vars[key] = (resolver(resource_name) or NOT_AVAILABLE)
                    if env_vars[key] == NOT_AVAILABLE:
                        USER_LOG.warning(
                            f"Unable to resolve parameter '{parameter}' for "
                            f"the resource type '{resource_type}' with name "
                            f"'{resource_name}'.")
_LOG.debug(
f"The environment variable '{key}' will be configured "
f"with the value '{env_vars[key]}'."
)
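    # An illustrative env_variables entry this resolver handles; the pair
    # ('cognito_idp', 'id') is one of those registered in
    # dynamic_params_resolvers (names are hypothetical):
    #   env_vars = {'USER_POOL_ID': {'resource_type': 'cognito_idp',
    #                                'resource_name': 'my-pool',
    #                                'parameter': 'id'}}
    #   self._resolve_env_variables(env_vars)
    #   # env_vars['USER_POOL_ID'] is now the pool id, or 'N/A' if not found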