modular_api_cli/modular_handler/user_handler.py (761 lines of code) (raw):

import json
import os

import click

from modular_api.helpers.log_helper import get_logger
from modular_api.helpers.request_processor import generate_route_meta_mapping
from modular_api.helpers.utilities import validate_meta_keys
from modular_api.web_service.iam import filter_commands_by_permissions
from modular_api.helpers.constants import (
    BLOCKED_STATE,
    REMOVED_STATE,
    ACTIVATED_STATE,
    ALLOWED_VALUES,
    AUX_DATA,
    MODULAR_USER_META_TYPES,
)
from modular_api.services.group_service import GroupService
from modular_api.services.permissions_cache_service import (
    permissions_handler_instance,
    PermissionsService,
)
from modular_api.services.policy_service import PolicyService
from modular_api.helpers.date_utils import (
    convert_datetime_to_human_readable,
    utc_time_now,
)
from modular_api.helpers.password_util import (
    generate_password,
    validate_password,
    secure_string,
)
from modular_api.services.user_service import UserService
from modular_api.helpers.decorators import CommandResponse
from modular_api.helpers.exceptions import (
    ModularApiConfigurationException,
    ModularApiBadRequestException,
    ModularApiConflictException,
    ModularApiUnauthorizedException,
)

# Hint shown to the operator whenever a meta command finds nothing to act on.
SET_META_CMD = ('modular user set_meta_attribute --meta_type allowed_values|'
                'aux_data --key $parameter_name --value $parameter_value')

line_sep = os.linesep
_LOG = get_logger(__name__)


class UserHandler:
    """CLI-facing handlers for the ``modular user ...`` command family.

    Every public ``*_handler`` method either returns a ``CommandResponse``
    or raises one of the ``ModularApi*Exception`` types imported above.
    """

    def __init__(self, user_service, group_service, policy_service):
        self.user_service: UserService = user_service
        self.group_service: GroupService = group_service
        self.policy_service: PolicyService = policy_service

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _confirm_if_compromised(self, user_item, username) -> None:
        """Ask for confirmation when the stored hash does not match.

        ``click.confirm(..., abort=True)`` aborts the command if the
        operator declines, so callers may simply continue afterwards.
        """
        if self.user_service.calculate_user_hash(user_item) != user_item.hash:
            click.confirm(
                f'User \'{username}\' is compromised. Command execution '
                f'leads to user entity hash sum recalculation. Are you sure?',
                abort=True)

    def _stamp_and_save(self, user_item) -> None:
        """Refresh modification date, recalculate the hash and save."""
        user_item.last_modification_date = utc_time_now().isoformat()
        user_item.hash = self.user_service.calculate_user_hash(user_item)
        self.user_service.save_user(user_item=user_item)

    @staticmethod
    def _parse_meta_value(meta_type, values, value_as_json):
        """Validate and decode the value passed to set/update meta commands.

        Exactly one of ``values`` / ``value_as_json`` must be provided.
        For ``ALLOWED_VALUES`` the result must be a list of strings, for
        ``AUX_DATA`` a dict with string keys and string values.

        :raises ModularApiBadRequestException: on any validation failure
        :return: decoded data ready to be stored
        """
        if (not values) == (not value_as_json):
            raise ModularApiBadRequestException(
                "Please provide either '--values' or '--value_as_json'"
            )
        try:
            data_to_store = json.loads(value_as_json) \
                if value_as_json else list(values)
        except json.JSONDecodeError as exc:
            raise ModularApiBadRequestException(
                "The provided JSON string could not be decoded. "
                "Please check the format and ensure it is a valid JSON"
            ) from exc

        if meta_type not in (ALLOWED_VALUES, AUX_DATA):
            raise ModularApiBadRequestException(
                f"Unsupported type provided: {meta_type}"
            )

        if meta_type == ALLOWED_VALUES:
            is_valid = (
                isinstance(data_to_store, list)
                and all(isinstance(i, str) for i in data_to_store)
            )
        else:
            is_valid = (
                isinstance(data_to_store, dict)
                and all(isinstance(i, str) for i in data_to_store.keys())
                and all(isinstance(i, str) for i in data_to_store.values())
            )
        if not is_valid:
            raise ModularApiBadRequestException(
                f"The data structure provided does not match the expected "
                f"format for type: {meta_type}"
            )
        return data_to_store

    # ------------------------------------------------------------------
    # user lifecycle
    # ------------------------------------------------------------------
    def add_user_handler(
            self,
            username: str,
            groups: str,
            password: str,
    ) -> CommandResponse:
        """
        Adds user to ModularUser table
        :param username: Username that will be set to the user
        :param groups: Group name(s) that will be attached to user. The
            value is iterated and joined as a collection of names, despite
            the ``str`` annotation
        :param password: User password; when empty a password is
            generated and echoed to the console
        :raises ModularApiBadRequestException: user already exists in a
            known state, or some group is missing/invalid
        :raises ModularApiConflictException: user exists in an unknown state
        :return: CommandResponse
        """
        _LOG.info(f'Going to add user \'{username}\' to groups {groups}')
        existing_user = self.user_service.describe_user(username=username)

        if existing_user:
            if existing_user.state == BLOCKED_STATE:
                _LOG.error('User already exists but blocked')
                raise ModularApiBadRequestException(
                    f'User \'{username}\' is blocked. To unblock user please '
                    f'execute command:{line_sep}'
                    f'modular user unblock --user {username} --reason '
                    f'<$THE USER UNBLOCKING REASON>')
            if existing_user.state == REMOVED_STATE:
                _LOG.error('User already exists but marked as removed')
                raise ModularApiBadRequestException(
                    f'User \'{username}\' already exists and marked as '
                    f'\'{REMOVED_STATE}\'')
            if existing_user.state == ACTIVATED_STATE:
                _LOG.error('User already exists')
                raise ModularApiBadRequestException(
                    f'User \'{username}\' already activated.{line_sep}')
            _LOG.error('User already exists')
            raise ModularApiConflictException(
                f'User \'{username}\' already exists with invalid '
                f'\'{existing_user.state}\' state')

        existing_groups = self.group_service.get_groups_by_name(
            group_names=groups
        )
        if not existing_groups:
            _LOG.error('Can not find all or one from provided groups')
            raise ModularApiBadRequestException(
                f'One or all provided \'{", ".join(groups)}\' group(s) does not '
                f'exist.{line_sep}To add group please execute '
                f'\'modular group add --help\' command first')

        # Build the lookup once instead of once per requested group.
        existing_group_names = {item.group_name for item in existing_groups}
        skipped_groups = [
            group for group in groups if group not in existing_group_names
        ]
        if skipped_groups:
            _LOG.error('Missing group detected')
            raise ModularApiBadRequestException(
                f'Group(s) you are trying to add to the user is '
                f'missing:{line_sep}{", ".join(skipped_groups)}{line_sep}'
                f'Please remove it`s name(s) from command or add this '
                f'group(s) first'
            )

        invalid_groups = [
            group.group_name for group in existing_groups
            if self.group_service.calculate_group_hash(group) != group.hash
            or group.state == REMOVED_STATE
        ]
        if invalid_groups:
            raise ModularApiBadRequestException(
                f'The following group(s) are invalid:{line_sep}'
                f'{", ".join(invalid_groups)}'
                f'{line_sep}To get more detailed information please execute '
                f'command:{line_sep}modular group describe')

        if password:
            user_password = validate_password(password)
            is_autogenerated = False
        else:
            user_password = generate_password()
            is_autogenerated = True

        user_item = self.user_service.create_user_entity(
            username=username,
            password=secure_string(user_password),
            group=groups
        )
        user_item.hash = self.user_service.calculate_user_hash(user_item)
        self.user_service.save_user(user_item=user_item)

        if is_autogenerated:
            click.echo(f'Autogenerated password: {user_password}')
            click.echo("PAY ATTENTION: You can get the user password only when "
                       "you add the new user. You cannot retrieve it later. "
                       "If you lose it, you must create a new user or change "
                       "password via 'modular user change_password' command")

        _LOG.info('User successfully activated')
        return CommandResponse(
            message=f'User \'{username}\' has been successfully activated.'
                    f'{line_sep}User added to the following group(s):'
                    f'{line_sep}{", ".join(groups)}')

    def delete_user_handler(self, username) -> CommandResponse:
        """
        Marks the user as removed (the entity is kept with REMOVED state)
        :param username: Username that will be marked as removed
        :raises ModularApiConfigurationException: user does not exist
        :raises ModularApiBadRequestException: user is not in activated state
        :return: CommandResponse
        """
        _LOG.info(f'Going to delete user \'{username}\'')

        existing_user = self.user_service.describe_user(username=username)
        if not existing_user:
            _LOG.error('User does not exist')
            raise ModularApiConfigurationException(
                f'User \'{username}\' does not exist. Nothing to delete')

        if existing_user.state != ACTIVATED_STATE:
            _LOG.error('User already marked as deleted or blocked')
            raise ModularApiBadRequestException(
                f'User \'{username}\' is blocked or deleted.{line_sep}To get '
                f'more detailed information please execute command:{line_sep}'
                f'modular user describe --username {username}')

        self._confirm_if_compromised(existing_user, username)

        existing_user.state = REMOVED_STATE
        self._stamp_and_save(existing_user)

        _LOG.info('User successfully deleted')
        return CommandResponse(
            message=f'User {username} has been successfully deleted')

    def block_user_handler(self, username, reason) -> CommandResponse:
        """
        Change user entity state to block
        :param username: Username
        :param reason: The textual reason of user blocking
        :raises ModularApiConfigurationException: user does not exist
        :return: CommandResponse
        """
        _LOG.info(f'Going to block user \'{username}\' due to {reason}')

        existed_user = self.user_service.describe_user(username=username)
        if not existed_user:
            _LOG.error('User does not exist')
            raise ModularApiConfigurationException(
                f'User \'{username}\' does not exist. Can not block')

        self._confirm_if_compromised(existed_user, username)

        existed_user.state = BLOCKED_STATE
        existed_user.state_reason = reason
        self._stamp_and_save(existed_user)

        _LOG.info('User successfully blocked')
        return CommandResponse(
            message=f'User \'{username}\' has been successfully blocked')

    def unblock_user_handler(self, username, reason) -> CommandResponse:
        """
        Change user entity state to activated
        :param username: Username
        :param reason: The textual reason of user returning to whitelist
        :raises ModularApiConfigurationException: user does not exist
        :return: CommandResponse
        """
        _LOG.info(f'Going to unblock user \'{username}\' due to {reason}')

        existed_user = self.user_service.describe_user(username=username)
        if not existed_user:
            _LOG.error('User does not exist')
            raise ModularApiConfigurationException(
                f'User \'{username}\' does not exist. Can not unblock')

        self._confirm_if_compromised(existed_user, username)

        existed_user.state = ACTIVATED_STATE
        existed_user.state_reason = reason
        self._stamp_and_save(existed_user)

        _LOG.info('User successfully unblocked')
        return CommandResponse(
            message=f'User \'{username}\' has been successfully unblocked')

    def change_user_password_handler(
            self,
            username: str,
            password: str,
    ) -> CommandResponse:
        """
        Change user password to administrator defined
        :param username: Username
        :param password: New user password (validated before being stored)
        :raises ModularApiConfigurationException: user does not exist
        :raises ModularApiBadRequestException: user is not in activated state
        :return: CommandResponse
        """
        _LOG.info(f'Going to change password for user \'{username}\'')

        existing_user = self.user_service.describe_user(username=username)
        if not existing_user:
            _LOG.error('User does not exists')
            raise ModularApiConfigurationException(
                f'User \'{username}\' does not exists. Can not change password')

        if existing_user.state != ACTIVATED_STATE:
            _LOG.error('User blocked or deleted')
            raise ModularApiBadRequestException(
                f'User \'{username}\' is blocked or deleted.{line_sep}To get '
                f'more detailed information please execute command:{line_sep}'
                f'modular user describe --username {username}')

        self._confirm_if_compromised(existing_user, username)

        existing_user.password = secure_string(
            string_to_secure=validate_password(password=password)
        )
        self._stamp_and_save(existing_user)

        _LOG.info('Password successfully updated')
        return CommandResponse(
            message=f'User \'{username}\' password has been updated')

    def change_user_name_handler(self, old_username, new_username) \
            -> CommandResponse:
        """
        Rename a user by saving the entity under the new name and then
        deleting the entity stored under the old name
        :param old_username: Current username
        :param new_username: Username to switch to
        :raises ModularApiConfigurationException: user does not exist
        :raises ModularApiBadRequestException: user is not in activated state
        :raises ModularApiConflictException: ``new_username`` is already
            taken (this also covers ``new_username == old_username``)
        :return: CommandResponse
        """
        _LOG.info(f'Going to change username for user \'{old_username}\'')

        existing_user = self.user_service.describe_user(username=old_username)
        if not existing_user:
            _LOG.error('User does not exists')
            raise ModularApiConfigurationException(
                f'User \'{old_username}\' does not exists. '
                f'Can not change username')

        if existing_user.state != ACTIVATED_STATE:
            _LOG.error('User blocked or deleted')
            raise ModularApiBadRequestException(
                f'User \'{old_username}\' is blocked or deleted.'
                f'{line_sep}To get more detailed information '
                f'please execute command:{line_sep}'
                f'modular user describe --username {old_username}')

        # Without this guard the save below would overwrite another user,
        # and renaming a user to its own name would end with the record
        # being deleted by the clean-up step.
        if self.user_service.describe_user(username=new_username):
            _LOG.error('Target username is already in use')
            raise ModularApiConflictException(
                f'User \'{new_username}\' already exists. '
                f'Can not change username')

        self._confirm_if_compromised(existing_user, old_username)

        existing_user.last_modification_date = utc_time_now().isoformat()
        existing_user.username = new_username
        self.user_service.save_user_with_recalculated_hash(
            user_item=existing_user
        )
        _LOG.debug(f'New user with username: {new_username} created.')

        _LOG.debug(f'Deleting old user {old_username}')
        # Point the in-memory item back at the old name so that the delete
        # call targets the old record.
        existing_user.username = old_username
        self.user_service.delete_user(user_item=existing_user)

        _LOG.info('Username successfully updated')
        return CommandResponse(
            message=f'User \'{old_username}\' name has been '
                    f'updated to {new_username}')

    def manage_user_groups_handler(self, username, groups, action) -> \
            CommandResponse:
        """
        Add or remove user to the group(s)
        :param username: Username
        :param groups: Group name(s) that will be attached or detached to user
        :param action: 'add' to attach or 'remove' to detach group(s)
        :raises ModularApiConfigurationException: user does not exist
        :raises ModularApiBadRequestException: user is not activated, a
            group is missing, compromised or not activated, or the action
            is unknown
        :return: CommandResponse
        """
        _LOG.info(f'Going to {action} user \'{username}\' in group(s)')
        groups = list(set(groups))

        existing_user = self.user_service.describe_user(username=username)
        if not existing_user:
            _LOG.error('User does not exists')
            raise ModularApiConfigurationException(
                f'User \'{username}\' does not exist. Please check spelling')

        if existing_user.state != ACTIVATED_STATE:
            _LOG.error('User blocked or deleted')
            raise ModularApiBadRequestException(
                f'User {username} is blocked or deleted{line_sep}To get more '
                f'detailed information please execute command:{line_sep}'
                f'modular user describe --username {username}')

        self._confirm_if_compromised(existing_user, username)

        requested_group_items = self.group_service.get_groups_by_name(
            group_names=groups)
        if not requested_group_items:
            raise ModularApiBadRequestException(
                f'One or all from requested group(s) does not exist. '
                f'Group(s):{line_sep}{groups}')

        retrieved_group_names = {
            group.group_name for group in requested_group_items
        }
        missing_groups = [
            group for group in groups if group not in retrieved_group_names
        ]
        if missing_groups:
            raise ModularApiBadRequestException(
                f'The following groups does not exists: '
                f'{", ".join(missing_groups)}')

        # A group is unusable when EITHER its hash mismatches OR it is not
        # activated; requiring both let compromised groups slip through.
        invalid_groups = [
            group.group_name for group in requested_group_items
            if self.group_service.calculate_group_hash(group) != group.hash
            or group.state != ACTIVATED_STATE
        ]
        if invalid_groups:
            raise ModularApiBadRequestException(
                f'The following group(s) compromised or deleted:{line_sep}'
                f'{", ".join(invalid_groups)}.{line_sep}To get more detailed'
                f' information please execute command:{line_sep}'
                f'modular group describe')

        warnings_list = []
        user_groups = existing_user.groups
        if action == 'add':
            existed_group_in_user = set(groups).intersection(user_groups)
            if existed_group_in_user:
                warnings_list.append(
                    f'User already attached to the following groups: '
                    f'{", ".join(existed_group_in_user)}')
            existing_user.groups = list(set(user_groups).union(set(groups)))
        elif action == 'remove':
            not_existed_group_in_user = set(groups).difference(user_groups)
            if not_existed_group_in_user:
                warnings_list.append(
                    f'The following groups does not attached to the user: '
                    f'{", ".join(not_existed_group_in_user)}')
            existing_user.groups = list(set(user_groups) - set(groups))
        else:
            raise ModularApiBadRequestException('Invalid action requested')

        self._stamp_and_save(existing_user)

        _LOG.info('User entity updated')
        if warnings_list:
            _LOG.warning(f'Command executed with following warning(s): '
                         f'{warnings_list}')
        return CommandResponse(
            message=f'User \'{username}\' has been updated',
            warnings=warnings_list)

    # ------------------------------------------------------------------
    # policy / group resolution
    # ------------------------------------------------------------------
    def resolve_policy_allowed_actions(self, policies):
        """Collect the content of the given policies.

        :param policies: iterable of policy names
        :return: tuple ``(actions, errors)`` - note the order, it is the
            reverse of ``validate_groups_policies_and_resolve_actions``
        """
        actions = []
        errors = []
        for policy in policies:
            received_policy = self.policy_service.describe_policy(
                policy_name=policy)
            if not received_policy:
                errors.append(f'Policy \'{policy}\' does not exist')
                continue
            if received_policy.hash != self.policy_service.calculate_policy_hash(
                    policy_item=received_policy):
                errors.append(
                    f'Policy \'{policy}\' is compromised. Calculated hash sum is '
                    f'different from existed in entity.')
            # Content of a compromised policy is still collected; callers
            # in this class act on the error list before using actions.
            actions.extend(received_policy.content)
        return actions, errors

    def validate_groups_policies_and_resolve_actions(self, groups):
        """Validate groups and their policies and gather allowed actions.

        :param groups: iterable of group names
        :return: tuple ``(errors_list, actions_list)``
        """
        errors_list = []
        actions_list = []
        for group in groups:
            received_group = self.group_service.describe_group(
                group_name=group)
            if not received_group:
                errors_list.append(f'Group \'{group}\' does not exist')
                continue
            if received_group.hash != self.group_service.calculate_group_hash(
                    group_item=received_group):
                errors_list.append(
                    f'Group {group} is compromised. Calculated hash sum is '
                    f'different from existed in entity.')
            actions, errors = self.resolve_policy_allowed_actions(
                policies=received_group.policies
            )
            errors_list.extend(errors)
            actions_list.extend(actions)
        return errors_list, actions_list

    @staticmethod
    def prettify_user_item(user, user_compromised,
                           policy_group_compromised=False):
        """Build the table row used to display a user.

        :param user: user entity
        :param user_compromised: True when the user hash mismatches
        :param policy_group_compromised: True when any attached group or
            policy reported a problem
        :return: dict with human-readable column names as keys
        """
        is_compromised = user_compromised or policy_group_compromised
        return {
            'Username': user.username,
            'Groups': user.groups,
            'State': user.state,
            'State reason': user.state_reason,
            'User meta': user.meta.as_dict() if user.meta else None,
            'Modification date': convert_datetime_to_human_readable(
                datetime_object=user.last_modification_date
            ),
            'Creation Date': convert_datetime_to_human_readable(
                datetime_object=user.creation_date
            ),
            'Consistency status': 'Compromised' if is_compromised else 'OK'
        }

    def describe_single_user(self, username):
        """Describe one user, including group/policy consistency warnings.

        :raises ModularApiBadRequestException: user does not exist
        :return: CommandResponse with a single table item
        """
        existed_user = self.user_service.describe_user(username=username)
        if not existed_user:
            raise ModularApiBadRequestException(f'No such user: \'{username}\'')

        policies_groups_warnings, _ = \
            self.validate_groups_policies_and_resolve_actions(
                groups=existed_user.groups
            )
        user_compromised = self.user_service.calculate_user_hash(
            user_item=existed_user) != existed_user.hash

        pretty_user = self.prettify_user_item(
            user=existed_user,
            user_compromised=user_compromised,
            policy_group_compromised=bool(policies_groups_warnings)
        )
        return CommandResponse(
            table_title='User description',
            warnings=policies_groups_warnings,
            items=[pretty_user])

    def policy_simulator_handler(self, user_name, user_group, policy_name,
                                 requested_command):
        """Report whether a command would be allowed for a user, group or
        policy.

        The first non-empty argument among ``user_name``, ``user_group``
        and ``policy_name`` selects what is checked.

        :param requested_command: command text; must start with
            ``'admin '`` and must not contain ``'-'``
        :raises ModularApiBadRequestException: unknown user or malformed
            command
        :return: CommandResponse with the ALLOW/DENY verdict, or with
            warnings when the selected entity is misconfigured
        """
        warnings_list = []
        policies = []
        item_name = ''
        item = ''
        if user_name:
            existed_user = self.user_service.describe_user(
                username=user_name)
            if not existed_user:
                raise ModularApiBadRequestException(
                    f'No such user: \'{user_name}\'')
            warnings_list, policies = \
                self.validate_groups_policies_and_resolve_actions(
                    groups=existed_user.groups
                )
            item, item_name = 'user', user_name
        elif user_group:
            warnings_list, policies = \
                self.validate_groups_policies_and_resolve_actions(
                    groups=[user_group]
                )
            item, item_name = 'group', user_group
        elif policy_name:
            # NOTE: this helper returns (actions, errors) - reversed order
            # compared to the group validator used above.
            policies, warnings_list = self.resolve_policy_allowed_actions(
                policies=[policy_name]
            )
            item, item_name = 'policy', policy_name

        if warnings_list:
            return CommandResponse(
                message='Any action can not be performed due to '
                        'user/group/policy invalid configuration',
                warnings=warnings_list
            )

        if not requested_command.startswith('admin '):
            raise ModularApiBadRequestException(
                'Incorrect spelling. All commands should starting '
                'with "admin ..."'
            )
        if '-' in requested_command:
            raise ModularApiBadRequestException(
                'Incorrect spelling. "-" and "--" symbols are not allowed. '
                'Check spelling'
            )

        available_commands = permissions_handler_instance().available_commands
        filtered_commands = filter_commands_by_permissions(available_commands,
                                                           policies)
        route_mapping = generate_route_meta_mapping(
            commands_meta=filtered_commands)

        init_cmd = requested_command
        # Strip only the leading 'admin' token; a blanket replace would
        # also mangle later path segments that contain 'admin'.
        path = '/'.join(requested_command.split()).removeprefix('admin')
        # for admin root commands
        # if will be added new one -> add to list:
        if path == '/get_operation_status':
            path = '/admin/get_operation_status'
        # ----------------------------------------
        effect = 'ALLOW' if path in route_mapping else 'DENY'
        return CommandResponse(
            f'Checked for {item}: {item_name}{os.linesep}'
            f'Command: {init_cmd}{os.linesep}'
            f'Status: {effect}{os.linesep}'
        )

    def describe_user_handler(
            self,
            username: str | None = None,
    ) -> CommandResponse:
        """
        Describes user entity from User model or list all existed users
        :param username: Username to describe; when omitted all users
            are listed
        :raises ModularApiBadRequestException: no users found
        :return: CommandResponse
        """
        if username:
            _LOG.info(f'Going to describe user \'{username}\'')
            return self.describe_single_user(username=username)

        _LOG.info('Going to describe all users')
        existed_users = tuple(self.user_service.scan_users())
        if not existed_users:
            raise ModularApiBadRequestException(
                'User(s) does not exist. To add user please execute '
                '\'modular user add\' command')

        pretty_users = []
        invalid = 0
        for user in existed_users:
            is_compromised = self.user_service.calculate_user_hash(
                user_item=user) != user.hash
            if is_compromised:
                invalid += 1
            pretty_users.append(self.prettify_user_item(
                user=user,
                user_compromised=is_compromised
            ))

        valid_title = 'User(s) description'
        compromised_title = f'User(s) description{os.linesep}WARNING! ' \
                            f'{invalid} compromised users have been detected.'
        return CommandResponse(
            table_title=compromised_title if invalid else valid_title,
            items=pretty_users,
        )

    # ------------------------------------------------------------------
    # user meta
    # ------------------------------------------------------------------
    def set_user_meta_handler(
            self,
            username: str,
            meta_type: str,
            key: str,
            values: tuple | None = None,
            value_as_json: str | None = None,
    ) -> CommandResponse:
        """Set (or replace) one key in the user's meta section.

        :param username: Username
        :param meta_type: ``ALLOWED_VALUES`` or ``AUX_DATA``
        :param key: parameter name inside the meta section
        :param values: plain values; mutually exclusive with
            ``value_as_json``
        :param value_as_json: JSON-encoded value
        :raises ModularApiBadRequestException: invalid arguments, unknown
            or compromised user
        :return: CommandResponse
        """
        data_to_store = self._parse_meta_value(
            meta_type, values, value_as_json)

        _LOG.info(
            f"Going to set meta with type '{meta_type}' for the user '{username}'"
        )
        user = self.check_existence_and_get_user(username)
        self.check_user_validness(user)

        user.meta = user.meta or {}
        if meta_type not in user.meta:
            user.meta[meta_type] = {}

        # Only allowed_values keys go through key validation.
        if meta_type == ALLOWED_VALUES:
            validate_meta_keys(key)
        action = 'replaced' if key in user.meta[meta_type] else 'set'
        user.meta[meta_type][key] = data_to_store

        user.last_modification_date = utc_time_now().isoformat()
        self.user_service.save_user_with_recalculated_hash(user)

        log_and_return_message = (
            f"The '{meta_type}' in the 'meta' data for the user '{username}' "
            f"has been '{action}' successfully."
        )
        _LOG.info(log_and_return_message)
        return CommandResponse(message=log_and_return_message)

    def update_user_meta_handler(
            self,
            username: str,
            meta_type: str,
            key: str,
            values: tuple | None = None,
            value_as_json: str | None = None,
    ) -> CommandResponse:
        """Merge new data into an existing key of the user's meta section.

        For ``ALLOWED_VALUES`` the stored list is extended with the new
        values (duplicates dropped); for ``AUX_DATA`` the stored dict is
        merged with the new one, new values winning.

        :raises ModularApiBadRequestException: invalid arguments, unknown
            or compromised user, or nothing to update
        :return: CommandResponse
        """
        data_to_store = self._parse_meta_value(
            meta_type, values, value_as_json)

        _LOG.info(f'Going to update user \'{username}\' meta')
        user = self.check_existence_and_get_user(username)
        self.check_user_validness(user)

        meta_section = user.meta.as_dict().get(meta_type) \
            if user.meta else None
        if not meta_section:
            _LOG.error('There is no meta to update')
            raise ModularApiBadRequestException(
                f"User '{username}' has no meta or it is empty, nothing to "
                f"update.{os.linesep}Please set meta first with the command "
                f"'{SET_META_CMD}'"
            )
        if key not in meta_section:
            _LOG.error(f'Invalid key \'{key}\' for meta updating')
            raise ModularApiBadRequestException(
                f"User '{username}' has no parameter name '{key}' in "
                f"'meta.{meta_type}' or it is empty, nothing to update."
                f"{os.linesep}"
                f"Please set meta first with the command '{SET_META_CMD}'"
            )

        stored = user.meta[meta_type].get(key)
        if meta_type == ALLOWED_VALUES:
            validate_meta_keys(key)
            # dict.fromkeys de-duplicates while keeping a stable order.
            user.meta[meta_type][key] = \
                list(dict.fromkeys([*stored, *data_to_store]))
        else:
            user.meta[meta_type][key] = stored | data_to_store

        user.last_modification_date = utc_time_now().isoformat()
        self.user_service.save_user_with_recalculated_hash(user)

        _LOG.info('User meta successfully updated')
        return CommandResponse(
            message=f'Meta information for user \'{username}\' has been '
                    f'successfully updated'
        )

    def delete_user_meta_handler(
            self,
            username: str,
            keys: tuple,
            meta_type: str,
    ) -> CommandResponse:
        """Remove the given keys from one section of the user's meta.

        Keys that are absent are skipped and reported in the response.

        :raises ModularApiBadRequestException: unknown or compromised
            user, or the meta section is empty
        :return: CommandResponse
        """
        _LOG.info(f'Going to delete meta from the user \'{username}\'')
        user = self.check_existence_and_get_user(username)
        self.check_user_validness(user)

        if not user.meta or not user.meta.as_dict().get(meta_type):
            _LOG.error('There is no meta to delete')
            raise ModularApiBadRequestException(
                f"User '{username}' has no meta or it is empty, nothing to "
                f"delete.{os.linesep}Please set meta first with the command "
                f"'{SET_META_CMD}'"
            )

        skipped_keys = []
        removed_keys = []
        for key in keys:
            if key not in user.meta[meta_type]:
                skipped_keys.append(key)
                continue
            user.meta[meta_type].pop(key)
            removed_keys.append(key)

        if removed_keys:
            user.last_modification_date = utc_time_now().isoformat()
            self.user_service.save_user_with_recalculated_hash(user)
            message = f'Deleted parameter(s) name from \'{username}\' meta: ' \
                      f'{removed_keys}'
        else:
            message = f'No parameter(s) name has been removed from ' \
                      f'\'{username}\' meta'
        if skipped_keys:
            message += f'{os.linesep}Next parameter(s) name deletion skipped ' \
                       f'due to its absence in user meta: {skipped_keys}'

        # Report what actually happened instead of claiming every
        # requested key was deleted.
        _LOG.info(f'Keys removed from user meta: {removed_keys}; '
                  f'skipped: {skipped_keys}')
        return CommandResponse(message=message)

    def reset_user_meta_handler(self, username):
        """Drop all meta stored for the user.

        :raises ModularApiBadRequestException: unknown or compromised
            user, or the user has no meta
        :return: CommandResponse
        """
        _LOG.info(f'Going to delete meta from the user \'{username}\'')
        user = self.check_existence_and_get_user(username)
        self.check_user_validness(user)

        if not user.meta or not user.meta.as_dict():
            _LOG.error('User has no meta, nothing to delete')
            raise ModularApiBadRequestException(
                f'User \'{username}\' has no meta, nothing to reset.'
                f'{os.linesep}Please set meta first with the command '
                f'\'{SET_META_CMD}\''
            )

        user.meta = {}
        user.last_modification_date = utc_time_now().isoformat()
        self.user_service.save_user_with_recalculated_hash(user)

        _LOG.info('All user meta successfully deleted')
        return CommandResponse(
            message=f'All data in user \'{username}\' meta has been deleted'
        )

    def describe_user_meta_handler(
            self,
            username: str,
    ) -> CommandResponse:
        """List the user's meta, one table row per stored parameter.

        :raises ModularApiBadRequestException: user does not exist
        :return: CommandResponse with a table, or with a message when
            there is nothing to show
        """
        _LOG.info(f'Going to describe \'{username}\' user meta')
        user = self.check_existence_and_get_user(username)

        meta_values = user.meta.as_dict() if user.meta else {}
        if not meta_values or not (
                meta_values.get(ALLOWED_VALUES) or meta_values.get(AUX_DATA)
        ):
            return CommandResponse(
                message=f'There is nothing to describe. The user \'{username}\''
                        f' has no `meta` or it is empty.'
            )

        items = []
        for section in MODULAR_USER_META_TYPES:
            section_data = meta_values.get(section)
            if section_data is None:
                continue
            for item, values in section_data.items():
                items.append(
                    {
                        "Type of meta": section,
                        "Parameter name": item,
                        "Parameter values": values,
                    }
                )

        _LOG.info('User meta successfully described')
        return CommandResponse(
            table_title=f"Meta information for user \'{username}\'",
            items=items
        )

    # ------------------------------------------------------------------
    # shared checks
    # ------------------------------------------------------------------
    def check_user_validness(self, user):
        """Reject modification of a user item that fails validation.

        :raises ModularApiBadRequestException: when the permissions
            service raises ModularApiUnauthorizedException for the item
        """
        try:
            PermissionsService(
                user_service=self.user_service,
                group_service=self.group_service,
                policy_service=self.policy_service
            ).check_user_item_is_valid(user)
        except ModularApiUnauthorizedException as exc:
            raise ModularApiBadRequestException(
                'The User item you are trying to modify is compromised. '
                'Can not perform command execution'
            ) from exc

    def check_existence_and_get_user(self, username):
        """Return the user entity or fail when it does not exist.

        :raises ModularApiBadRequestException: user does not exist
        """
        user = self.user_service.describe_user(username=username)
        if not user:
            raise ModularApiBadRequestException(
                f'User with name \'{username}\' does not exist'
            )
        return user