src/services/rbac/access_control_service.py (144 lines of code) (raw):

from datetime import datetime, timedelta from typing import Union from commons.constants import USER_ID_ATTR, CUSTOMER_ATTR from commons.log_helper import get_logger from models.policy import Policy from models.role import Role from models.user import User from services.rbac.iam_service import IamService from services.setting_service import SettingsService from services.user_service import CognitoUserService _LOG = get_logger('access-control-service') PARAM_NAME = 'name' PARAM_PERMISSIONS = 'permissions' PARAM_EXPIRATION = 'expiration' PARAM_REQUEST_PATH = 'request_path' PARAM_TARGET_USER = 'target_user' PARAM_USER_CUSTOMER = 'user_customer' class AccessControlService: def __init__(self, iam_service: IamService, user_service: CognitoUserService, setting_service: SettingsService): self.iam_service = iam_service self.user_service = user_service self.setting_service = setting_service def is_allowed_to_access(self, event: dict, target_permission: str) -> bool: user_id = event.get(USER_ID_ATTR) user = self.user_service.get_user(user_id=user_id) if not user: _LOG.debug(f'User with id: {user_id} does not exist') return False if isinstance(user, dict): user = user.get('Username') if isinstance(user, User): user = user.user_id _LOG.debug(f'Checking user permissions ' f'on \'{target_permission}\' action') role_name = self.user_service.get_user_role_name(user=user) role = self.iam_service.role_get(role_name=role_name) user_customer = self.user_service.get_user_customer(user=user) event[PARAM_USER_CUSTOMER] = user_customer event_customer = event.get(CUSTOMER_ATTR) if user_customer != 'admin' and event_customer \ and event_customer != user_customer: _LOG.warning(f'User \'{user_id}\' is not authorized to access ' f'\'{event_customer}\' customer.') return False if not role: _LOG.debug(f'Specified role with name: {role_name} does not exist') return False if AccessControlService.is_role_expired(role=role): _LOG.debug(f'Specified role with name: {role_name} is expired') return False user_policies = self.iam_service.policy_batch_get( keys=role.policies) user_permissions = [] for policy in user_policies: user_permissions.extend(policy.permissions) if target_permission in user_permissions: target_user = event.get(PARAM_TARGET_USER) if target_user and not AccessControlService.is_allowed_target_user( role=role, user_id=user_id, target_user=target_user): return False _LOG.debug(f'Permission for user \'{user_id}\' on action: ' f'{target_permission} is granted') return True return False def get_role(self, name: str): return self.iam_service.role_get(role_name=name) def get_policy(self, name: str): return self.iam_service.policy_get(policy_name=name) def policy_exists(self, name: str) -> bool: # todo return bool(self.get_policy(name=name)) def role_exists(self, name: str) -> bool: # todo return bool(self.get_role(name=name)) @staticmethod def get_role_dto(role: Role): return role.get_json() @staticmethod def get_policy_dto(policy: Policy): return policy.get_json() @staticmethod def delete_entity(entity: Union[Role, Policy]): return entity.delete() def create_policy(self, policy_data: dict): name = policy_data.get(PARAM_NAME) if self.policy_exists(name=name): _LOG.warning(f'Policy with name \'{name}\' already exists') return Policy(**policy_data) def create_role(self, role_data: dict): name = role_data.get(PARAM_NAME) if self.role_exists(name=name): _LOG.warning(f'Role with name \'{name}\' already exists') return Role(**role_data) @staticmethod def save(access_conf_object: Union[Role, Policy]): access_conf_object.save() @staticmethod def is_role_expired(role: Role): role_expiration_datetime = role.expiration if isinstance(role_expiration_datetime, str): role_expiration_datetime = datetime.fromisoformat( role_expiration_datetime) now = datetime.now() return now >= role_expiration_datetime def get_non_existing_permissions(self, permissions: list): permissions_mapping = self.setting_service.get_iam_permissions() nonexistent = [] for permission in permissions: args = permission.split(':') if len(args) != 3: # permission don't match SERVICE:PERMISSION_GROUP:ACTION format nonexistent.append(permission) continue service, permission_group, action = args group = f'{service}:{permission_group}' if action not in permissions_mapping.get(group, []): nonexistent.append(permission) return nonexistent def get_non_existing_policies(self, policies: list): nonexistent = [] for policy in policies: if not self.policy_exists(name=policy): nonexistent.append(policy) return nonexistent @staticmethod def is_allowed_target_user(role, user_id, target_user): resource = role.resource if resource and resource == '*': return True if user_id == target_user: return True return False def get_admin_permissions(self): permission_groups_mapping = self.setting_service.get_iam_permissions() permissions_list = [] for group, available_actions in permission_groups_mapping.items(): permissions_list.extend( [f'{group}:{action}' for action in available_actions]) return permissions_list @staticmethod def get_role_default_expiration(): current = datetime.now() expiration = current + timedelta(3 * 30) return expiration.isoformat()