in src/services/rbac/access_control_service.py [0:0]
def is_allowed_to_access(self, event: dict,
                         target_permission: str) -> bool:
    """Decide whether the caller named in *event* may perform *target_permission*.

    The checks run in this order, and the first failing one denies access:

    1. the user referenced by ``USER_ID_ATTR`` must exist;
    2. the user's role name and customer are resolved; the customer is written
       to ``event[PARAM_USER_CUSTOMER]`` *before* the decision is made, so the
       caller's dict is mutated even when access is ultimately denied;
    3. a user whose customer is not ``'admin'`` is denied when the event's
       ``CUSTOMER_ATTR`` is present and differs from the user's own customer;
    4. the role must exist and must not be expired;
    5. *target_permission* must appear in the permissions of the role's
       policies, and, when ``PARAM_TARGET_USER`` is present in the event, the
       target-user check must pass.

    Args:
        event: request payload. Read for the user id, customer and target
            user; written with the resolved user customer.
        target_permission: name of the permission being requested.

    Returns:
        True when access is granted, False otherwise. Denial reasons are
        logged at debug level, except the cross-customer case, which is a
        warning.

    NOTE(review): when the event carries no ``CUSTOMER_ATTR`` the customer
    comparison in step 3 is skipped entirely — confirm that callers always
    populate it, or that downstream code relies on ``PARAM_USER_CUSTOMER``.
    """
    user_id = event.get(USER_ID_ATTR)
    user = self.user_service.get_user(user_id=user_id)
    if not user:
        _LOG.debug(f"User with id: {user_id} does not exist")
        return False

    # The lookup may return a dict, a User object or a plain identifier;
    # reduce all three to the identifier the other services expect.
    if isinstance(user, dict):
        user = user.get('Username')
    if isinstance(user, User):
        user = user.user_id

    _LOG.debug(f"Checking user permissions on '{target_permission}' action")
    role_name = self.user_service.get_user_role_name(user=user)
    role = self.iam_service.role_get(role_name=role_name)
    user_customer = self.user_service.get_user_customer(user=user)
    # Side effect visible to the caller, applied before any denial below.
    event[PARAM_USER_CUSTOMER] = user_customer

    event_customer = event.get(CUSTOMER_ATTR)
    crosses_customer = bool(event_customer) and event_customer != user_customer
    if user_customer != 'admin' and crosses_customer:
        _LOG.warning(f"User '{user_id}' is not authorized to access "
                     f"'{event_customer}' customer.")
        return False

    if not role:
        _LOG.debug(f"Specified role with name: {role_name} does not exist")
        return False
    if AccessControlService.is_role_expired(role=role):
        _LOG.debug(f"Specified role with name: {role_name} is expired")
        return False

    # Flatten every policy's permissions into one list (same as extend()).
    user_policies = self.iam_service.policy_batch_get(keys=role.policies)
    user_permissions = [
        permission
        for policy in user_policies
        for permission in policy.permissions
    ]
    if target_permission not in user_permissions:
        return False

    # An explicit target user narrows the decision further.
    target_user = event.get(PARAM_TARGET_USER)
    if target_user and not AccessControlService.is_allowed_target_user(
            role=role, user_id=user_id, target_user=target_user):
        return False

    _LOG.debug(f"Permission for user '{user_id}' on action: "
               f"{target_permission} is granted")
    return True