syndicate/connection/iam_connection.py (539 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from json import dumps
from functools import lru_cache
from boto3 import client, resource
from botocore.exceptions import ClientError
from syndicate.commons.log_helper import get_logger
from syndicate.connection.helper import apply_methods_decorator, retry
_LOG = get_logger(__name__)
def get_account_role_arn(account_number):
    """Build the ARN of the root principal of an AWS account.

    :param account_number: account id to embed in the ARN
    :return: string of the form ``arn:aws:iam::<account_number>:root``
    """
    return f"arn:aws:iam::{account_number}:root"
@apply_methods_decorator(retry())
class IAMConnection(object):
""" IAM connection class."""
def build_role_arn(self, role_name: str) -> str:
    """Compose a role ARN from the project configuration.

    The role name is wrapped with ``CONFIG.resources_prefix`` and
    ``CONFIG.resources_suffix`` and placed under ``CONFIG.account_id``.

    :param role_name: bare role name, without prefix and suffix
    :return: ARN string ``arn:aws:iam::<account>:role/<prefix><name><suffix>``
    """
    # Imported at call time, as in the original
    # (presumably to avoid an import cycle - TODO confirm).
    from syndicate.core import CONFIG
    return 'arn:aws:iam::{}:role/{}{}{}'.format(
        CONFIG.account_id,
        CONFIG.resources_prefix,
        role_name,
        CONFIG.resources_suffix,
    )
def __init__(self, region=None, aws_access_key_id=None,
             aws_secret_access_key=None, aws_session_token=None):
    """Open a boto3 IAM client and an IAM resource with the same settings.

    :param region: passed as the second positional argument to boto3
    :param aws_access_key_id: forwarded to boto3 unchanged
    :param aws_secret_access_key: forwarded to boto3 unchanged
    :param aws_session_token: forwarded to boto3 unchanged
    """
    credentials = {
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
        'aws_session_token': aws_session_token,
    }
    self.client = client('iam', region, **credentials)
    self.resource = resource('iam', region, **credentials)
    _LOG.debug('Opened new IAM connection.')
def check_if_role_exists(self, role_name):
    """Look a role up by exact name among all listed roles.

    :param role_name: name compared with each listed ``RoleName``
    :return: ``Arn`` of the first matching role, or None when none matches
    """
    known_roles = self.get_roles() or ()
    return next(
        (entry['Arn'] for entry in known_roles
         if role_name == entry['RoleName']),
        None,
    )
def get_role(self, role_name):
    """Fetch the description of a single role.

    :param role_name: name of the role to fetch
    :return: the ``Role`` entry of the response, or None when the error
        text contains ``NoSuchEntity``
    :raises ClientError: for any other client error
    """
    try:
        response = self.client.get_role(RoleName=role_name)
    except ClientError as e:
        if 'NoSuchEntity' not in str(e):
            raise e
        return None  # a missing role is an expected outcome
    return response['Role']
def get_missing_roles(self, required_roles):
    """Report which of the required role names do not exist yet.

    :param required_roles: iterable of role names (strings)
    :return: list of the names absent from the account, in the order
        given; when no roles are listed at all, ``required_roles`` itself
        is returned unchanged
    """
    list_roles = self.get_roles()
    if not list_roles:
        return required_roles
    # A set gives constant-time membership tests instead of scanning the
    # full role list once per required role.
    existing = {role['RoleName'] for role in list_roles}
    return [role for role in required_roles if role not in existing]
def get_roles(self):
    """Collect the roles from every page of ``list_roles``.

    :return: list with the ``Roles`` entries of all pages, in page order
    """
    collected = []
    paging = {}
    while True:
        page = self.client.list_roles(**paging)
        collected.extend(page.get('Roles'))
        marker = page.get('Marker')
        if not marker:
            return collected
        # Ask for the page that follows the one just read.
        paging['Marker'] = marker
@lru_cache()
def get_policies(self, scope='All', only_attached=False):
    """Collect the policies from every page of ``list_policies``.

    The result is memoised by ``lru_cache`` on the call arguments
    (including ``self``), so repeated calls with the same arguments
    return the same list object without querying again.

    :param scope: 'All'|'AWS'|'Local'
    :type only_attached: bool
    :return: list with the ``Policies`` entries of all pages
    """
    query = dict(Scope=scope, OnlyAttached=only_attached)
    collected = []
    while True:
        page = self.client.list_policies(**query)
        collected.extend(page.get('Policies'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def get_role_attached_policies(self, role_name):
    """Return the managed policies attached to a role.

    :param role_name: name of the role
    :return: whatever ``attached_policies.all()`` of the role resource yields
    """
    return self.resource.Role(role_name).attached_policies.all()
def get_role_inline_policies(self, role_name):
    """Return the inline policies of a role.

    :param role_name: name of the role
    :return: whatever ``policies.all()`` of the role resource yields
    """
    return self.resource.Role(role_name).policies.all()
def get_role_names(self):
    """Return the ``RoleName`` of every role reported by ``get_roles``.

    :return: list of role names, in listing order
    """
    names = []
    for role in self.get_roles():
        names.append(role['RoleName'])
    return names
def get_attached_policy_content(self, policy_arn):
    """Return the document of the first listed version of a policy.

    NOTE(review): the original docstring calls this the latest version;
    that holds only if ``list_policy_versions`` lists newest first -
    confirm.

    :type policy_arn: str
    :return: ``Document`` of the selected policy version
    """
    listed = self.client.list_policy_versions(PolicyArn=policy_arn)
    first_version_id = listed['Versions'][0]['VersionId']
    response = self.client.get_policy_version(
        PolicyArn=policy_arn, VersionId=first_version_id)
    return response['PolicyVersion']['Document']
def create_custom_policy(self, policy_name, policy_document, tags):
    """Create a managed policy and return its description.

    :type policy_name: str
    :type policy_document: dict or str
    :param policy_document: a dict is serialised to JSON; anything else
        is sent as given
    :type tags: list of dict
    :param tags: sent as ``Tags`` only when truthy
    :return: the ``Policy`` entry of the ``create_policy`` response
    """
    document = (dumps(policy_document)
                if isinstance(policy_document, dict) else policy_document)
    request = {'PolicyName': policy_name, 'PolicyDocument': document}
    if tags:
        request['Tags'] = tags
    return self.client.create_policy(**request)['Policy']
def create_custom_role(self, role_name, allowed_account=None,
                       allowed_service=None, trusted_relationships=None,
                       external_id=None, permissions_boundary=None,
                       tags=None):
    """ Create custom role with trusted relationships. You can specify
    custom policy, or set principal_account and principal_service params
    to use default.

    :type role_name: str
    :type allowed_account: str (acc id)
    :type allowed_service: str
    :type trusted_relationships: dict
    :param trusted_relationships: if not specified will use default
    :param external_id: added as an ``sts:ExternalId`` condition to the
        account statement when given
    :param permissions_boundary: sent as ``PermissionsBoundary`` when truthy
    :param tags: list of dict: List of resource tags key-value pairs
    :return: the ``Role`` entry of the created role, or of the existing
        role when creation fails with ``EntityAlreadyExists``
    :raises ClientError: for any other client error
    """
    if not trusted_relationships:
        trusted_relationships = IAMConnection.empty_trusted_relationships()
    elif isinstance(trusted_relationships, dict) and isinstance(
            trusted_relationships.get('Statement'), list):
        # Work on a copy so the statements appended below do not leak
        # into the document object owned by the caller.
        trusted_relationships = dict(
            trusted_relationships,
            Statement=list(trusted_relationships['Statement']))
    if allowed_account:
        trusted_accounts = IAMConnection.set_allowed_account(
            allowed_account, external_id, 'create')
        trusted_relationships['Statement'].append(trusted_accounts)
    if allowed_service:
        trusted_services = IAMConnection.set_allowed_service(
            allowed_service, 'create')
        trusted_relationships['Statement'].append(trusted_services)
    if isinstance(trusted_relationships, dict):
        trusted_relationships = dumps(trusted_relationships)
    params = dict(RoleName=role_name,
                  AssumeRolePolicyDocument=trusted_relationships)
    if permissions_boundary:
        params['PermissionsBoundary'] = permissions_boundary
    if tags:
        params['Tags'] = tags
    try:
        return self.client.create_role(**params)['Role']
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            # boto3 client methods accept keyword arguments only; the
            # original positional call raised TypeError here.
            return self.client.get_role(RoleName=role_name)['Role']
        raise
@staticmethod
def empty_trusted_relationships():
trusted_relationships = {
"Version": "2012-10-17",
"Statement": []
}
return trusted_relationships
def attach_policy(self, role_name, policy_arn):
    """Attach a managed policy to a role.

    :param role_name: sent as ``RoleName``
    :param policy_arn: sent as ``PolicyArn``
    """
    request = dict(RoleName=role_name, PolicyArn=policy_arn)
    self.client.attach_role_policy(**request)
def attach_inline_policy(self, role_name, policy_name, policy_document):
    """Put an inline policy on a role.

    :param role_name: sent as ``RoleName``
    :param policy_name: sent as ``PolicyName``
    :param policy_document: a dict is serialised to JSON; anything else
        is sent as given
    """
    document = (dumps(policy_document)
                if isinstance(policy_document, dict) else policy_document)
    request = dict(RoleName=role_name,
                   PolicyName=policy_name,
                   PolicyDocument=document)
    self.client.put_role_policy(**request)
def detach_policy(self, role_name, policy_arn):
    """Detach a managed policy from a role.

    :param role_name: sent as ``RoleName``
    :param policy_arn: sent as ``PolicyArn``
    """
    request = dict(RoleName=role_name, PolicyArn=policy_arn)
    self.client.detach_role_policy(**request)
def put_role_permissions_boundary(self, role_name, policy_arn):
    """Set a policy as the permissions boundary of a role.

    :param role_name: sent as ``RoleName``
    :param policy_arn: sent as ``PermissionsBoundary``
    """
    message = (f'Attaching permissions boundary policy: \'{policy_arn}\''
               f' to role: \'{role_name}\'')
    _LOG.info(message)
    request = dict(RoleName=role_name, PermissionsBoundary=policy_arn)
    self.client.put_role_permissions_boundary(**request)
def delete_role_permissions_boundary(self, role_name):
    """Remove the permissions boundary of a role, tolerating its absence.

    An error whose text contains ``NoSuchEntity`` is logged as a warning
    and swallowed; any other client error is logged and re-raised.

    :param role_name: sent as ``RoleName``
    :raises ClientError: for errors other than ``NoSuchEntity``
    """
    _LOG.info(f'Removing permissions boundary policy from \'{role_name}\'')
    try:
        self.client.delete_role_permissions_boundary(RoleName=role_name)
    except ClientError as e:
        if 'NoSuchEntity' not in str(e):
            _LOG.error(str(e))
            raise e
        _LOG.warn(f'Role \'{role_name}\' doesn\'t have permissions '
                  f'boundary policy. Skipping...')
def get_policy_arn(self, name, policy_scope='All'):
    """Find the ARN of a policy by exact name among the listed policies.

    :type name: str
    :param name: compared with each listed ``PolicyName``
    :type policy_scope: str
    :param policy_scope: 'All'|'AWS'|'Local', narrows the listing
    :return: ``Arn`` of the first matching policy, or None
    """
    # TODO this method is highly time-ineffective especially if we, for
    #  instance, perform `syndicate transform` on a big meta, where
    #  there is a huge amount of policies names.
    #  lru_cache for self.get_policies makes the situation better but in
    #  general it should be refactored.
    listed = self.get_policies(policy_scope)
    return next(
        (policy['Arn'] for policy in listed if policy['PolicyName'] == name),
        None,
    )
def get_policy(self, arn):
    """Fetch the description of a managed policy.

    :param arn: sent as ``PolicyArn``
    :return: the ``Policy`` entry of the response, or None when the error
        text contains ``NoSuchEntity``
    :raises ClientError: for any other client error
    """
    try:
        response = self.client.get_policy(PolicyArn=arn)
    except ClientError as e:
        if 'NoSuchEntity' not in str(e):
            raise e
        return None  # a missing policy is an expected outcome
    return response['Policy']
def remove_policy_version(self, policy_arn, version_id):
    """Delete one version of a managed policy.

    :param policy_arn: sent as ``PolicyArn``
    :param version_id: sent as ``VersionId``
    """
    request = dict(PolicyArn=policy_arn, VersionId=version_id)
    self.client.delete_policy_version(**request)
def create_policy_version(self, policy_arn, policy_document,
                          set_as_default=None):
    """Create a new version of a managed policy.

    :param policy_arn: sent as ``PolicyArn``
    :param policy_document: sent as ``PolicyDocument`` exactly as given
    :param set_as_default: sent as ``SetAsDefault`` only when truthy
    """
    request = {'PolicyArn': policy_arn, 'PolicyDocument': policy_document}
    if set_as_default:
        request['SetAsDefault'] = set_as_default
    self.client.create_policy_version(**request)
def remove_policy(self, policy_arn, log_not_found_error=True):
    """Delete a managed policy together with its non-default versions.

    Every listed version that is not the default one is removed first,
    then the policy itself is deleted.

    :type policy_arn: str
    :type log_not_found_error: boolean, parameter is needed for proper log
        handling in the retry decorator; it is not read here
    """
    listed = self.client.list_policy_versions(PolicyArn=policy_arn)
    for entry in listed['Versions'] or ():
        if not entry['IsDefaultVersion']:
            self.remove_policy_version(policy_arn, entry['VersionId'])
    self.client.delete_policy(PolicyArn=policy_arn)
def remove_role(self, role_name, log_not_found_error=True):
    """Delete a role.

    :param role_name: sent as ``RoleName``
    :param log_not_found_error: needed for proper log handling in the
        retry decorator; it is not read here
    """
    request = dict(RoleName=role_name)
    self.client.delete_role(**request)
def create_instance_profile(self, profile_name):
    """Create an instance profile.

    :param profile_name: sent as ``InstanceProfileName``
    """
    request = dict(InstanceProfileName=profile_name)
    self.client.create_instance_profile(**request)
def remove_instance_profile(self, profile_name):
    """Delete an instance profile.

    :param profile_name: sent as ``InstanceProfileName``
    """
    request = dict(InstanceProfileName=profile_name)
    self.client.delete_instance_profile(**request)
def get_instance_profiles(self):
    """Collect the instance profiles from every listing page.

    :return: list with the ``InstanceProfiles`` entries of all pages
    """
    collected = []
    paging = {}
    while True:
        page = self.client.list_instance_profiles(**paging)
        collected.extend(page.get('InstanceProfiles'))
        marker = page.get('Marker')
        if not marker:
            return collected
        paging['Marker'] = marker
def is_instance_profile_exists(self, profile_name):
    """Find an instance profile by exact name.

    :param profile_name: compared with each ``InstanceProfileName``
    :return: the first matching profile entry, or None
    """
    return next(
        (profile for profile in self.get_instance_profiles()
         if profile['InstanceProfileName'] == profile_name),
        None,
    )
def add_role_to_instance_profile(self, profile_name, role_name):
    """Add a role to an instance profile.

    :param profile_name: sent as ``InstanceProfileName``
    :param role_name: sent as ``RoleName``
    """
    request = dict(InstanceProfileName=profile_name, RoleName=role_name)
    self.client.add_role_to_instance_profile(**request)
def remove_role_from_instance_profile(self, profile_name, role_name):
    """Remove a role from an instance profile.

    :param profile_name: sent as ``InstanceProfileName``
    :param role_name: sent as ``RoleName``
    """
    request = dict(InstanceProfileName=profile_name, RoleName=role_name)
    self.client.remove_role_from_instance_profile(**request)
def get_instance_profiles_for_role(self, role_name):
    """Collect the instance profiles that contain a role, across pages.

    :param role_name: sent as ``RoleName`` on every request
    :return: list with the ``InstanceProfiles`` entries of all pages
    """
    query = dict(RoleName=role_name)
    collected = []
    while True:
        page = self.client.list_instance_profiles_for_role(**query)
        collected.extend(page.get('InstanceProfiles'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def get_assume_role_policy_document(self, role_name):
    """Return the trust policy document of a role.

    :param role_name: name used to build the role resource
    :return: the ``assume_role_policy_document`` attribute of that resource
    """
    role = self.resource.Role(role_name)
    return role.assume_role_policy_document
def update_assume_role_policy_document(self, role_name, document):
    """Replace the trust policy document of a role.

    :param role_name: name used to build the ``AssumeRolePolicy`` resource
    :param document: sent as ``PolicyDocument`` exactly as given
    """
    trust_policy = self.resource.AssumeRolePolicy(role_name)
    trust_policy.update(PolicyDocument=document)
def create_user(self, name, path=None):
    """Create an IAM user.

    :param name: sent as ``UserName``
    :param path: sent as ``Path`` only when truthy
    :return: the ``create_user`` response
    """
    request = {'UserName': name}
    if path:
        request['Path'] = path
    return self.client.create_user(**request)
def delete_user(self, name):
    """Delete an IAM user.

    :param name: sent as ``UserName``
    """
    request = dict(UserName=name)
    self.client.delete_user(**request)
def attach_policy_to_user(self, name, policy_arn):
    """Attach a managed policy to a user.

    :param name: sent as ``UserName``
    :param policy_arn: sent as ``PolicyArn``
    """
    self.client.attach_user_policy(UserName=name, PolicyArn=policy_arn)
def create_access_key(self, user_name):
    """Create an access key for a user.

    :param user_name: sent as ``UserName``
    :return: the ``create_access_key`` response
    """
    response = self.client.create_access_key(UserName=user_name)
    return response
def get_users(self, path=None):
    """Collect the users from every page of ``list_users``.

    :param path: sent as ``PathPrefix`` only when truthy
    :return: list with the ``Users`` entries of all pages
    """
    query = {'PathPrefix': path} if path else {}
    collected = []
    while True:
        page = self.client.list_users(**query)
        collected.extend(page.get('Users'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def is_user_exists(self, user_name):
    """Look a user up by exact name among all listed users.

    :param user_name: compared with each listed ``UserName``
    :return: ``Arn`` of the first matching user, or None
    """
    known_users = self.get_users() or ()
    return next(
        (entry['Arn'] for entry in known_users
         if user_name == entry['UserName']),
        None,
    )
def get_access_keys(self, user_name):
    """Collect the access key metadata of a user across all pages.

    :param user_name: sent as ``UserName`` on every request
    :return: list with the ``AccessKeyMetadata`` entries of all pages
    """
    query = dict(UserName=user_name)
    collected = []
    while True:
        page = self.client.list_access_keys(**query)
        collected.extend(page.get('AccessKeyMetadata'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def delete_access_key(self, user_name, access_key):
    """Delete an access key of a user.

    :param user_name: sent as ``UserName``
    :param access_key: sent as ``AccessKeyId``
    """
    request = dict(AccessKeyId=access_key, UserName=user_name)
    self.client.delete_access_key(**request)
def get_user_attached_policies(self, user_name, path=None):
    """Collect the managed policies attached to a user across all pages.

    :param user_name: sent as ``UserName`` on every request
    :param path: sent as ``PathPrefix`` only when truthy
    :return: list with the ``AttachedPolicies`` entries of all pages
    """
    query = {'UserName': user_name}
    if path:
        query['PathPrefix'] = path
    collected = []
    while True:
        page = self.client.list_attached_user_policies(**query)
        collected.extend(page.get('AttachedPolicies'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def detach_user_policy(self, user_name, policy_arn):
    """Detach a managed policy from a user.

    :param user_name: sent as ``UserName``
    :param policy_arn: sent as ``PolicyArn``
    """
    request = dict(UserName=user_name, PolicyArn=policy_arn)
    self.client.detach_user_policy(**request)
def get_user_certificates(self, user_name):
    """Collect the signing certificates of a user across all pages.

    :param user_name: sent as ``UserName`` on every request
    :return: list with the ``Certificates`` entries of all pages
    """
    query = dict(UserName=user_name)
    collected = []
    while True:
        page = self.client.list_signing_certificates(**query)
        collected.extend(page.get('Certificates'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def delete_user_certificate(self, user_name, cert_id):
    """Delete a signing certificate of a user.

    :param user_name: sent as ``UserName``
    :param cert_id: sent as ``CertificateId``
    """
    request = dict(UserName=user_name, CertificateId=cert_id)
    self.client.delete_signing_certificate(**request)
def get_user_ssh_keys(self, user_name):
    """Collect the SSH public keys of a user across all pages.

    :param user_name: sent as ``UserName`` on every request
    :return: list with the ``SSHPublicKeys`` entries of all pages
    """
    query = dict(UserName=user_name)
    collected = []
    while True:
        page = self.client.list_ssh_public_keys(**query)
        collected.extend(page.get('SSHPublicKeys'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def delete_user_ssh_key(self, user_name, ssh_id):
    """Delete an SSH public key of a user.

    :param user_name: sent as ``UserName``
    :param ssh_id: sent as ``SSHPublicKeyId``
    """
    request = dict(UserName=user_name, SSHPublicKeyId=ssh_id)
    self.client.delete_ssh_public_key(**request)
def get_user_inline_policies(self, user_name):
    """Collect the inline policy names of a user across all pages.

    :param user_name: sent as ``UserName`` on every request
    :return: list with the ``PolicyNames`` entries of all pages
    """
    query = dict(UserName=user_name)
    collected = []
    while True:
        page = self.client.list_user_policies(**query)
        collected.extend(page.get('PolicyNames'))
        marker = page.get('Marker')
        if not marker:
            return collected
        query['Marker'] = marker
def update_custom_role(self, role, role_name, allowed_account=None,
                       allowed_service=None, trusted_relationships=None,
                       external_id=None):
    """Replace the trust (assume-role) policy of an existing role.

    The new document is built from the statements already present in
    ``role`` (only when ``trusted_relationships`` is truthy) plus one
    statement per ``allowed_account`` / ``allowed_service`` argument.
    Identical statements are sent only once.

    :param role: role description; its ``AssumeRolePolicyDocument`` is
        read but never modified
    :param role_name: sent as ``RoleName``
    :param allowed_account: account id or list of ids allowed to assume
        the role
    :param allowed_service: service name or list of names allowed to
        assume the role
    :param trusted_relationships: when truthy, the existing statements of
        the role are kept; otherwise the document starts empty.
        NOTE(review): only the truthiness of this value is used, not its
        content - confirm that is intended.
    :param external_id: added as an ``sts:ExternalId`` condition to the
        account statement when given
    :return: ``ResponseMetadata`` of the update call
    :raises ClientError: every client error is re-raised; a missing role
        is additionally logged as a warning
    """
    current_document = role['AssumeRolePolicyDocument']
    existing = current_document.get('Statement', [])
    if isinstance(existing, dict):
        # A policy may carry a single statement object instead of a list.
        existing = [existing]
    # Copy the list: the original code appended to the caller's own
    # statement list (and later pushed a JSON string into it).
    statements = list(existing) if trusted_relationships else []
    if allowed_account:
        statements.append(IAMConnection.set_allowed_account(
            allowed_account, external_id, 'update'))
    if allowed_service:
        statements.append(IAMConnection.set_allowed_service(
            allowed_service, 'update'))
    # Order-preserving de-duplication. The original computed this list
    # but never used it; statements are dicts, hence no set().
    unique_statements = []
    for statement in statements:
        if statement not in unique_statements:
            unique_statements.append(statement)
    policy_document = dumps({
        "Version": "2012-10-17",
        "Statement": unique_statements
    })
    try:
        response = self.client.update_assume_role_policy(
            RoleName=role_name,
            PolicyDocument=policy_document)
    except ClientError as e:
        # IAM reports the code as 'NoSuchEntity'; the longer spelling is
        # kept only for backward compatibility.
        if e.response['Error']['Code'] in ('NoSuchEntity',
                                           'NoSuchEntityException'):
            _LOG.warning(f'Can not update role \'{role_name}\': role does '
                         f'not exist.')
        raise
    return response['ResponseMetadata']
@staticmethod
def set_allowed_account(allowed_account, external_id, action):
    """Build a trust statement that lets AWS accounts assume a role.

    :param allowed_account: account id (str) or list of account ids
    :param external_id: when truthy, added as a ``StringEquals``
        condition on ``sts:ExternalId``
    :param action: verb used only in the error message
    :return: statement dict with ``Principal.AWS`` set to one root ARN
        (str input) or a list of root ARNs (list input)
    :raises TypeError: when ``allowed_account`` is neither str nor list
    """
    if not isinstance(allowed_account, (str, list)):
        raise TypeError(
            f'Can not {action} role. \'allowed_account\' must be list '
            f'or string. Actual type: {type(allowed_account)}')
    if isinstance(allowed_account, str):
        principal = get_account_role_arn(allowed_account)
    else:
        principal = [get_account_role_arn(account)
                     for account in allowed_account]
    statement = {
        "Sid": "",
        "Effect": "Allow",
        "Principal": {"AWS": principal},
        "Action": "sts:AssumeRole"
    }
    if external_id:
        statement['Condition'] = {
            "StringEquals": {"sts:ExternalId": external_id}
        }
    return statement
@staticmethod
def set_allowed_service(allowed_service, action):
if isinstance(allowed_service, str):
principal = "{0}.amazonaws.com".format(allowed_service)
elif isinstance(allowed_service, list):
principal = []
for each in allowed_service:
principal.append("{0}.amazonaws.com".format(each))
else:
raise TypeError(
f'Can not {action} role. \'allowed_service\' must be list '
f'or string. Actual type: {type(allowed_service)}')
trusted_services = {
"Effect": "Allow",
"Principal": {
"Service": principal
},
"Action": "sts:AssumeRole"
}
return trusted_services
def update_custom_policy_content(self, name, arn, content):
    """Publish new content as the default version of a managed policy.

    The current default document is taken as a base and its ``Statement``
    and ``Version`` are replaced with those from ``content``. Nothing is
    sent when ``content`` equals the current default document.

    :param name: policy name, used only in log messages
    :param arn: ARN of the policy to update
    :param content: dict with the desired ``Statement`` and ``Version``;
        a missing or empty ``Statement`` becomes ``[]`` and a missing or
        empty ``Version`` becomes '2012-10-17'
    :return: None
    """
    policy_resource = self.resource.Policy(arn)
    policy_json = policy_resource.default_version.document
    statement = content.get('Statement')
    if not statement:
        _LOG.warning(f'Policy \'{name}\' has no or empty \'Statement\' '
                     f'field.')
        statement = []
    policy_json['Statement'] = statement
    version = content.get('Version')
    # The original tested `statement` here, so a missing Version was sent
    # as None and a valid Version was overwritten whenever the Statement
    # was empty.
    if not version:
        _LOG.warning(f'Policy \'{name}\' has no or empty \'Version\' '
                     f'field.')
        version = '2012-10-17'
    policy_json['Version'] = version

    policy = self.get_policy(arn=arn)
    policy_version = self.client.get_policy_version(
        PolicyArn=arn, VersionId=policy['DefaultVersionId']
    )['PolicyVersion']
    if content == policy_version['Document']:
        _LOG.warning(f'No need to update policy \'{name}\': the new and the '
                     f'old contents are identical.')
        return

    versions = self.client.list_policy_versions(PolicyArn=arn)["Versions"]
    # Drop the last-listed non-default version before adding a new one
    # (presumably to stay under the per-policy version limit - confirm).
    to_remove = next(
        (v for v in reversed(versions) if not v['IsDefaultVersion']), None)
    if to_remove:
        _LOG.info(f'Old version of policy is found. Removing one: {to_remove}')
        self.remove_policy_version(policy_arn=arn,
                                   version_id=to_remove['VersionId'])
    self.create_policy_version(policy_arn=arn,
                               policy_document=dumps(policy_json),
                               set_as_default=True)
def create_group(self, name):
    """Create an IAM group.

    :param name: sent as ``GroupName``
    :return: the ``create_group`` response
    """
    response = self.client.create_group(GroupName=name)
    return response
def get_group(self, name):
    """Describe a group together with its users, following pagination.

    :param name: sent as ``GroupName``
    :return: list with one dict per response page, each being the
        ``Group`` entry extended with that page's ``Users``; an empty
        list when the group does not exist
    :raises ClientError: for any error other than a missing group
    """
    try:
        response = self.client.get_group(GroupName=name)
    except ClientError as e:
        # IAM reports the code as 'NoSuchEntity'; the longer spelling is
        # kept only for backward compatibility.
        if e.response['Error']['Code'] in ('NoSuchEntity',
                                           'NoSuchEntityException'):
            _LOG.warning(f'Group {name} is not found')
            return []
        raise
    groups = []
    while True:
        group_item = dict(response.get('Group') or {})
        group_item['Users'] = response.get('Users')
        # append, not extend: extending with a dict would add its keys
        # to the result instead of the group item itself.
        groups.append(group_item)
        token = response.get('Marker')
        if not token:
            return groups
        response = self.client.get_group(GroupName=name, Marker=token)
def add_user_to_group(self, group_name, username):
    """Add a user to a group, tolerating a missing group or user.

    :param group_name: sent as ``GroupName``
    :param username: sent as ``UserName``
    :return: the ``add_user_to_group`` response, or an empty list when
        the group or the user does not exist
    :raises ClientError: for any other client error
    """
    try:
        return self.client.add_user_to_group(GroupName=group_name,
                                             UserName=username)
    except ClientError as e:
        # IAM reports the code as 'NoSuchEntity'; with the longer
        # spelling alone this branch could never be taken.
        if e.response['Error']['Code'] in ('NoSuchEntity',
                                           'NoSuchEntityException'):
            _LOG.warning(f'Group {group_name} or username {username} is not '
                         f'found')
            return []
        raise
def remove_user_from_group(self, group_name, username):
    """Remove a user from a group.

    :param group_name: sent as ``GroupName``
    :param username: sent as ``UserName``
    :return: the ``remove_user_from_group`` response
    :raises ClientError: every client error is re-raised; a missing group
        or user is additionally logged as a warning
    """
    try:
        return self.client.remove_user_from_group(GroupName=group_name,
                                                  UserName=username)
    except ClientError as e:
        # IAM reports the code as 'NoSuchEntity'; with the longer
        # spelling alone the warning was never logged.
        if e.response['Error']['Code'] in ('NoSuchEntity',
                                           'NoSuchEntityException'):
            _LOG.warning(f'Group {group_name} or username {username} is not '
                         f'found')
        raise
def attach_group_policy(self, group_name, arn):
    """Attach a managed policy to a group.

    :param group_name: sent as ``GroupName``
    :param arn: sent as ``PolicyArn``
    :return: the ``attach_group_policy`` response
    :raises ClientError: every client error is re-raised; a missing group
        and an exceeded attachment limit are additionally logged as
        warnings
    """
    try:
        return self.client.attach_group_policy(GroupName=group_name,
                                               PolicyArn=arn)
    except ClientError as e:
        code = e.response['Error']['Code']
        # IAM reports 'NoSuchEntity' / 'LimitExceeded'; the longer
        # spellings are kept only for backward compatibility.
        if code in ('NoSuchEntity', 'NoSuchEntityException'):
            _LOG.warning(f'Group {group_name} is not found')
        elif code in ('LimitExceeded', 'LimitExceededException'):
            _LOG.warning(f'Can not attach more than 10 rules to group '
                         f'{group_name}')
        raise
def get_waiter(self, waiter_name):
    """Return a waiter of the underlying IAM client.

    :param waiter_name: passed positionally to ``client.get_waiter``
    :return: whatever ``client.get_waiter`` returns
    """
    waiter = self.client.get_waiter(waiter_name)
    return waiter