in syndicate/core/resources/iam_resource.py [0:0]
def _update_role_from_meta(self, name, meta, context):
_LOG.info(f'Updating iam role: {name}')
existing_role = self.iam_conn.get_role(name)
if not existing_role:
_LOG.warn(f'IAM role {name} does not exist.')
raise AssertionError(f'{name} role does not exist.')
custom_policies = meta.get('custom_policies', [])
predefined_policies = meta.get('predefined_policies', [])
policies = set(custom_policies + predefined_policies)
allowed_accounts = meta.get('allowed_accounts', [])
principal_service = meta.get('principal_service')
instance_profile = meta.get('instance_profile')
external_id = meta.get('external_id')
trust_rltn = meta.get('trusted_relationships')
permissions_boundary = meta.get('permissions_boundary')
if principal_service and '{region}' in principal_service:
principal_service = principal_service.format(region=self.region)
response = self.iam_conn.update_custom_role(
role_name=name,
allowed_account=list(allowed_accounts),
allowed_service=principal_service,
external_id=external_id,
trusted_relationships=trust_rltn,
role=existing_role)
if not response:
raise AssertionError(f'Can not update role \'{name}\': '
f'role does not exist.')
if instance_profile:
profiles = self.iam_conn.get_instance_profiles_for_role(
role_name=name)
if not profiles:
try:
self.iam_conn.create_instance_profile(name)
except ClientError as e:
if 'EntityAlreadyExists' in str(e):
_LOG.warn(f'Instance profile {name} exists')
else:
raise e
self.iam_conn.add_role_to_instance_profile(name, name)
else:
profiles = self.iam_conn.get_instance_profiles_for_role(
role_name=name)
if profiles:
try:
self.iam_conn.remove_instance_profile(name)
except ClientError as e:
if 'NoSuchEntityException' in str(e):
_LOG.warn(f'Instance profile {name} does not exist.')
else:
raise e
# attach policies
existing_policies = self.iam_conn.get_role_attached_policies(
role_name=name)
for existing_policy in existing_policies:
self.iam_conn.detach_policy(role_name=name,
policy_arn=existing_policy.arn)
if policies:
for policy in policies:
arn = self.iam_conn.get_policy_arn(policy)
if not arn:
raise AssertionError(f'Can not get policy arn: {policy}')
self.iam_conn.attach_policy(name, arn)
_LOG.info(f'Updated IAM role {name}.')
if permissions_boundary:
self._attach_permissions_boundary_to_role(permissions_boundary,
name)
else:
_LOG.warn(f'Permissions boundary is not specified in meta. '
f'Updating role \'{name}\', removing boundary policy')
self.iam_conn.delete_role_permissions_boundary(role_name=name)
return self.describe_role(name=name, meta=meta)