in modular_api_cli/modular_handler/user_handler.py [0:0]
def manage_user_groups_handler(self, username, groups, action) -> \
CommandResponse:
"""
Add or remove user to the group(s)
:param username: Username
:param groups: Group name(s) that will be attached or detached to user
:param action: attach or detach group
:return: CommandResponse
"""
_LOG.info(f'Going to {action} user \'{username}\' in group(s)')
groups = list(set(groups))
existing_user = self.user_service.describe_user(username=username)
if not existing_user:
_LOG.error('User does not exists')
raise ModularApiConfigurationException(
f'User \'{username}\' does not exist. Please check spelling')
if existing_user.state != ACTIVATED_STATE:
_LOG.error('User blocked or deleted')
raise ModularApiBadRequestException(
f'User {username} is blocked or deleted{line_sep}To get more '
f'detailed information please execute command:{line_sep}'
f'modular user describe --username {username}')
if self.user_service.calculate_user_hash(existing_user) != \
existing_user.hash:
click.confirm(
f'User \'{username}\' is compromised. Command execution leads '
f'to user entity hash sum recalculation. Are you sure?',
abort=True)
requested_group_items = self.group_service.get_groups_by_name(
group_names=groups)
if not requested_group_items:
raise ModularApiBadRequestException(
f'One or all from requested group(s) does not exist. '
f'Group(s):{line_sep}{groups}')
if len(groups) != len(requested_group_items):
retrieved_group_names = [group.group_name
for group in requested_group_items]
not_existed_policies = [group for group in groups
if group not in retrieved_group_names]
if not_existed_policies:
raise ModularApiBadRequestException(
f'The following groups does not exists: '
f'{", ".join(not_existed_policies)}')
invalid_groups = []
for group in requested_group_items:
if self.group_service.calculate_group_hash(group) != group.hash \
and group.state != ACTIVATED_STATE:
invalid_groups.append(group.group_name)
if invalid_groups:
raise ModularApiBadRequestException(
f'The following group(s) compromised or deleted:{line_sep}'
f'{", ".join(invalid_groups)}.{line_sep}To get more detailed'
f' information please execute command:{line_sep}'
f'modular group describe')
warnings_list = []
user_groups = existing_user.groups
if action == 'add':
existed_group_in_user = set(groups).intersection(user_groups)
if existed_group_in_user:
warnings_list.append(
f'User already attached to the following groups: '
f'{", ".join(existed_group_in_user)}')
existing_user.groups = list(set(user_groups).union(set(groups)))
elif action == 'remove':
not_existed_group_in_user = set(groups).difference(user_groups)
if not_existed_group_in_user:
warnings_list.append(
f'The following groups does not attached to the user: '
f'{", ".join(not_existed_group_in_user)}')
existing_user.groups = list(set(user_groups) - set(groups))
else:
raise ModularApiBadRequestException('Invalid action requested')
existing_user.last_modification_date = utc_time_now().isoformat()
user_hash_sum = self.user_service.calculate_user_hash(existing_user)
existing_user.hash = user_hash_sum
self.user_service.save_user(user_item=existing_user)
_LOG.info('User entity updated')
if warnings_list:
_LOG.warning(f'Command executed with following warning(s): '
f'{warnings_list}')
return CommandResponse(
message=f'User \'{username}\' has been updated',
warnings=warnings_list)