in syndicate/core/resources/lambda_resource.py [0:0]
def _update_lambda(self, name, meta, context):
from syndicate.core import CONFIG
_LOG.info('Updating lambda: {0}'.format(name))
req_params = ['runtime', 'memory', 'timeout', 'func_name']
validate_params(name, meta, req_params)
key = meta[S3_PATH_NAME]
key_compound = PurePath(CONFIG.deploy_target_bucket_key_compound,
key).as_posix()
if not self.s3_conn.is_file_exists(self.deploy_target_bucket,
key_compound):
raise AssertionError(
'Deployment package {0} does not exist '
'in {1} bucket'.format(key_compound,
self.deploy_target_bucket))
response = self.lambda_conn.get_function(name)
if not response:
raise AssertionError('{0} lambda does not exist.'.format(name))
old_conf = response['Configuration']
publish_version = meta.get('publish_version', False)
self.lambda_conn.update_code_source(
lambda_name=name,
s3_bucket=self.deploy_target_bucket,
s3_key=key_compound,
publish_version=publish_version)
role_name = meta['iam_role_name']
role_arn = self.iam_conn.check_if_role_exists(role_name)
if not role_arn:
_LOG.warning('Execution role does not exist. Keeping the old one')
role_arn = if_updated(role_arn, old_conf.get('Role'))
handler = if_updated(meta.get('func_name'), old_conf.get('Handler'))
env_vars = meta.get('env_variables', {})
timeout = if_updated(meta.get('timeout'), old_conf.get('Timeout'))
memory_size = if_updated(meta.get('memory_size'),
old_conf.get('MemorySize'))
old_subnets, old_security_groups, _ = self.lambda_conn.retrieve_vpc_config(
old_conf)
vpc_subnets = if_updated(set(meta.get('subnet_ids') or []),
old_subnets)
vpc_security_group = if_updated(
set(meta.get('security_group_ids') or []), old_security_groups)
runtime = if_updated(meta.get('runtime'), old_conf.get('Runtime'))
ephemeral_storage = if_updated(
meta.get('ephemeral_storage'),
self.lambda_conn.retrieve_ephemeral_storage(old_conf))
dl_type = meta.get('dl_resource_type')
if dl_type:
dl_type = dl_type.lower()
dl_name = meta.get('dl_resource_name')
dl_target_arn = 'arn:aws:{0}:{1}:{2}:{3}'.format(
dl_type,
self.region,
self.account_id,
dl_name) if dl_type and dl_name else None
lambda_layers_arns = []
layer_meta = meta.get('layers')
if layer_meta:
if 'dotnet' in meta['runtime'].lower():
env_vars.update(_DOTNET_LAMBDA_SHARED_STORE_ENV)
meta['env_variables'] = env_vars
for layer_name in layer_meta:
layer_arn = self.lambda_conn.get_lambda_layer_arn(layer_name)
if not layer_arn:
raise AssertionError(
'Could not link lambda layer {} to lambda {} '
'due to layer absence!'.format(layer_name, name))
lambda_layers_arns.append(layer_arn)
if env_vars:
self._resolve_env_variables(env_vars)
_LOG.info(f'Updating lambda {name} configuration')
self.lambda_conn.update_lambda_configuration(
lambda_name=name, role=role_arn, handler=handler,
env_vars=env_vars,
timeout=timeout, memory_size=memory_size, runtime=runtime,
vpc_sub_nets=vpc_subnets, vpc_security_group=vpc_security_group,
dead_letter_arn=dl_target_arn, layers=lambda_layers_arns,
ephemeral_storage=ephemeral_storage,
snap_start=self._resolve_snap_start(meta=meta)
)
_LOG.info(f'Lambda configuration has been updated')
# It seems to me that the waiter is not necessary here, the method
# lambda_conn.update_lambda_configuration is the one that actually
# waits. But still it does not make it worse :)
_LOG.info(f'Initializing function updated waiter for {name}')
waiter = self.lambda_conn.get_waiter('function_updated_v2')
waiter.wait(FunctionName=name)
_LOG.info(f'Waiting has finished')
self._resolve_log_group(lambda_name=name, meta=meta)
response = self.lambda_conn.get_function(name)
_LOG.info(f'Lambda describe result: {response}')
code_sha_256 = response['Configuration']['CodeSha256']
publish_ver_response = self.lambda_conn.publish_version(
function_name=name,
code_sha_256=code_sha_256)
updated_version = publish_ver_response['Version']
_LOG.info(
f'Version {updated_version} for lambda {name} published')
alias_name = meta.get('alias')
aliases = list(
self.lambda_conn.get_aliases(function_name=name).keys()
)
if alias_name:
if alias_name in aliases:
self.lambda_conn.update_alias(
function_name=name,
alias_name=alias_name,
function_version=updated_version)
_LOG.info(
f'Alias {alias_name} has been updated for lambda {name}')
else:
self.lambda_conn.create_alias(
function_name=name,
name=alias_name,
version=updated_version)
_LOG.info(
f'Alias {alias_name} has been created for lambda {name}')
for alias in aliases:
if alias != alias_name:
self.lambda_conn.delete_alias(
function_name=name,
name=alias
)
aliases.remove(alias)
url_config = meta.get('url_config')
if url_config:
_LOG.info('URL config is found. Setting the function URL')
url = self.lambda_conn.set_url_config(
function_name=name, auth_type=url_config.get('auth_type'),
qualifier=alias_name, cors=url_config.get('cors'),
principal=url_config.get('principal'),
source_arn=url_config.get('source_arn')
)
print(f'{name}:{alias_name if alias_name else ""}: {url}')
else:
existing_url = self.lambda_conn.get_url_config(
function_name=name, qualifier=alias_name)
if existing_url:
_LOG.info('Going to delete existing URL config that is not '
'described in the lambda_config file')
self.lambda_conn.delete_url_config(
function_name=name, qualifier=alias_name)
arn = self.build_lambda_arn_with_alias(response, alias_name) \
if publish_version or alias_name else \
response['Configuration']['FunctionArn']
_LOG.debug(f'Resolved lambda arn: {arn}')
event_sources_meta = meta.get('event_sources', [])
self.update_lambda_triggers(name, arn, role_name, event_sources_meta)
if meta.get('max_retries') is not None:
_LOG.debug('Updating lambda event invoke config')
function_name = (name + ":" + alias_name) if alias_name else name
try:
_LOG.debug('Updating lambda event invoke config')
invoke_config = \
self.lambda_conn.update_function_event_invoke_config(
function_name=function_name,
max_retries=meta.get('max_retries')
)
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
_LOG.debug('Lambda event invoke config is absent. '
'Creating a new one')
invoke_config = \
self.lambda_conn.put_function_event_invoke_config(
function_name=function_name,
max_retries=meta.get('max_retries')
)
else:
raise e
_LOG.debug(invoke_config)
req_max_concurrency = meta.get(LAMBDA_MAX_CONCURRENCY)
existing_max_concurrency = self.lambda_conn. \
describe_function_concurrency(name=name)
if req_max_concurrency and existing_max_concurrency:
if existing_max_concurrency != req_max_concurrency:
self._set_function_concurrency(name=name, meta=meta)
elif not req_max_concurrency and existing_max_concurrency:
self.lambda_conn.delete_function_concurrency_config(name=name)
elif req_max_concurrency and not existing_max_concurrency:
self._set_function_concurrency(name=name, meta=meta)
self._manage_provisioned_concurrency_configuration(function_name=name,
meta=meta,
lambda_def=context)
_LOG.info(f'Updating has finished for lambda {name}')
return self.describe_lambda(name, meta, response)