modular_api_cli/modular_handler/group_handler.py (310 lines of code) (raw):

import json
import os

import click

from modular_api.helpers.log_helper import get_logger
from modular_api.models.user_model import User
from modular_api.helpers.constants import REMOVED_STATE, ACTIVATED_STATE
from modular_api.services.group_service import GroupService
from modular_api.services.policy_service import PolicyService
from modular_api.services.user_service import UserService
from modular_api.helpers.date_utils import (
    convert_datetime_to_human_readable,
    utc_time_now,
)
from modular_api.helpers.decorators import CommandResponse
from modular_api.helpers.exceptions import (
    ModularApiBadRequestException,
    ModularApiConflictException,
)

line_sep = os.linesep
_LOG = get_logger(__name__)

# Actions understood by GroupHandler.manage_group_policies_handler.
_ADD_ACTION = 'add'
_REMOVE_ACTION = 'remove'


class GroupHandler:
    """
    Command handlers for group management: add a group, attach/detach
    policies, describe and delete groups.
    """

    def __init__(self, group_service, policy_service, user_service):
        self.user_service: UserService = user_service
        self.group_service: GroupService = group_service
        self.policy_service: PolicyService = policy_service

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _find_invalid_policies(self, policy_items) -> list:
        """
        Return names of policies whose stored hash differs from the
        recalculated one or whose state is not activated.
        """
        return [
            policy.policy_name for policy in policy_items
            if self.policy_service.calculate_policy_hash(policy) != policy.hash
            or policy.state != ACTIVATED_STATE
        ]

    def _rehash_and_save(self, group_item) -> None:
        """Recalculate the group hash, store it on the item and save it."""
        group_item.hash = self.group_service.calculate_group_hash(
            group_item=group_item)
        self.group_service.save_group(group_item=group_item)

    def _confirm_if_compromised(self, group_item, group: str) -> None:
        """
        Ask the user for confirmation when the stored group hash does not
        match the recalculated one. click aborts the command on refusal.
        """
        if self.group_service.calculate_group_hash(group_item) != \
                group_item.hash:
            click.confirm(
                f'Group with name \'{group}\' is compromised. Command '
                f'execution leads to group entity hash sum recalculation. '
                f'Are you sure?', abort=True)

    def _detach_dropped_policies(self, group_item,
                                 dropped: set) -> CommandResponse:
        """
        Detach policies that are listed in the group but have no policy
        entity anymore, then recalculate and save the group hash.
        The user is asked for confirmation first (abort on refusal).
        """
        click.confirm(
            'Provided policy attached to group, but policy entity '
            'does not exists. Possible reason is ModularPolicy '
            f'collection compromised and the following policy entities'
            f' dropped from DB: {dropped}. Are you about '
            f'group hash recalculation?', abort=True
        )
        group_item.policies = [
            name for name in group_item.policies if name not in dropped
        ]
        # The entity content changes here, so the modification date is
        # refreshed just like on every other mutating path.
        group_item.last_modification_date = utc_time_now().isoformat()
        self._rehash_and_save(group_item)
        return CommandResponse(
            message='Group item hash successfully recalculated. '
                    'Please execute command again'
        )

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------
    def add_group_handler(self, group: str, policies: list) -> CommandResponse:
        """
        Add group to ModularGroup table
        :param group: group name
        :param policies: Policies list which will be attached to group
        :return: CommandResponse
        :raises ModularApiBadRequestException: group already exists in
            removed/activated state, or some policy is missing, compromised
            or not activated
        :raises ModularApiConflictException: group already exists in any
            other state
        """
        _LOG.info(f'Going to add group \'{group}\' with policies \'{policies}\'')
        # De-duplicate while keeping the order given by the caller so that
        # stored data and messages are deterministic.
        policies = list(dict.fromkeys(policies))

        existed_group = self.group_service.describe_group(group_name=group)
        if existed_group:
            _LOG.error('Specified group already exists')
            if existed_group.state == REMOVED_STATE:
                raise ModularApiBadRequestException(
                    f'Group with name {group} already exists and marked as '
                    f'\'{REMOVED_STATE}\'')
            if existed_group.state == ACTIVATED_STATE:
                raise ModularApiBadRequestException(
                    f'Group with name \'{group}\' already exists. Please '
                    f'change name to another one or configure existing '
                    f'\'{group}\' group with new policies by command:'
                    f'{line_sep}modular group add_policy --group {group} '
                    f'--policy $policy_name{line_sep}In case if you need add '
                    f'several policies at one time use the next syntax:'
                    f'{line_sep}modular group add_policy --group {group} '
                    f'--policy $policy_name_1 --policy $policy_name_2 '
                    f'--policy $policy_name_N')
            raise ModularApiConflictException(
                f'Group with name \'{group}\' already exists with invalid '
                f'\'{existed_group.state}\' state')

        existing_policies = self.policy_service.get_policies_by_name(
            policy_names=policies
        )
        if not existing_policies:
            _LOG.error('Specified policies does not exist')
            raise ModularApiBadRequestException(
                f'Policy(ies) you are trying to add to the group \'{group}\' '
                f'does not exist. Please add policy(ies) first via command:'
                f'{line_sep}modular policy add'
            )

        # Build the lookup once instead of once per requested policy.
        existing_names = {policy.policy_name for policy in existing_policies}
        skipped_policies = [
            name for name in policies if name not in existing_names
        ]
        if skipped_policies:
            _LOG.error('Specified policies does not exist')
            raise ModularApiBadRequestException(
                f'Policy(ies) you are trying to add to the group is '
                f'missing:{line_sep}{", ".join(skipped_policies)}{line_sep}'
                f'Please remove it`s name(s) from command or add this '
                f'policy(ies) first'
            )

        invalid_policies = self._find_invalid_policies(existing_policies)
        if invalid_policies:
            _LOG.error('Provided policies compromised or deleted')
            raise ModularApiBadRequestException(
                f'Provided policies compromised or deleted: '
                f'{", ".join(invalid_policies)}.{line_sep}To get more detailed '
                f'information please execute command:{line_sep}'
                f'modular policy describe')

        group_item = self.group_service.create_group_entity(
            group_name=group,
            policies=policies
        )
        self._rehash_and_save(group_item)

        _LOG.info(f'Group with name \'{group}\' successfully added')
        return CommandResponse(
            message=f'Group with name \'{group}\' successfully added. Attached '
                    f'policy(ies): {policies}')

    def manage_group_policies_handler(self, group: str, policies: list,
                                      action: str) -> CommandResponse:
        """
        Attaches policies to or detaches policies from an existing group
        :param group: group name which will be updated
        :param policies: Policies list which will be attached/detached
            to/from group
        :param action: add or remove action
        :return: CommandResponse
        :raises ModularApiBadRequestException: unknown action, group is
            missing or not activated, or some policy is missing, compromised
            or not activated
        """
        # Reject an unsupported action before any lookup or interactive
        # confirmation takes place.
        if action not in (_ADD_ACTION, _REMOVE_ACTION):
            raise ModularApiBadRequestException('Invalid action requested')

        policies = list(dict.fromkeys(policies))

        group_item = self.group_service.describe_group(group_name=group)
        if not group_item:
            _LOG.error(f'Group with name \'{group}\' does not exist')
            raise ModularApiBadRequestException(
                f'Group with name \'{group}\' does not exist. Please check '
                f'group name spelling or add group via command:{line_sep}'
                f'modular group add --group {group} --policy $policy_name_1 '
                f'--policy $policy_name_2 --policy $policy_name_N')

        if group_item.state != ACTIVATED_STATE:
            _LOG.error(f'Group with name \'{group}\' is blocked or deleted')
            raise ModularApiBadRequestException(
                f'Group with name \'{group}\' is blocked or deleted. To get '
                f'more detailed information please execute command:{line_sep}'
                f'modular group describe --group {group}')

        self._confirm_if_compromised(group_item, group)

        retrieved_policies = self.policy_service.get_policies_by_name(
            policy_names=policies
        ) or []
        retrieved_names = {
            policy.policy_name for policy in retrieved_policies
        }
        not_existed_policies = [
            name for name in policies if name not in retrieved_names
        ]
        if not_existed_policies:
            # Policies still listed in the group although their entities
            # are gone can only be cleaned up by the 'remove' action.
            dangling = set(not_existed_policies).intersection(
                group_item.policies or ())
            if dangling and action == _REMOVE_ACTION:
                return self._detach_dropped_policies(group_item, dangling)
            if not retrieved_policies:
                raise ModularApiBadRequestException(
                    f'Not existed policy(ies) requested: {policies}')
            raise ModularApiBadRequestException(
                f'Provided policies does not exist: '
                f'{", ".join(not_existed_policies)}')

        invalid_policies = self._find_invalid_policies(retrieved_policies)
        if invalid_policies:
            _LOG.error('Provided policies compromised or deleted')
            raise ModularApiBadRequestException(
                f'Provided policies compromised or deleted: '
                f'{", ".join(invalid_policies)}{line_sep}To get more detailed'
                f' information please execute command:{line_sep}'
                f'modular policy describe')

        warnings_list = []
        attached = list(dict.fromkeys(group_item.policies or ()))
        attached_lookup = set(attached)
        if action == _ADD_ACTION:
            already_attached = [
                name for name in policies if name in attached_lookup
            ]
            if already_attached:
                warnings_list.append(
                    f'The following policies already attached to \'{group}\' '
                    f'group:{line_sep}'
                    f'{", ".join(already_attached)}')
            group_item.policies = attached + [
                name for name in policies if name not in attached_lookup
            ]
        else:
            not_attached = [
                name for name in policies if name not in attached_lookup
            ]
            if not_attached:
                warnings_list.append(
                    f'The following policies does not attached to \'{group}\' '
                    f'group:{line_sep}{", ".join(not_attached)}')
            requested = set(policies)
            group_item.policies = [
                name for name in attached if name not in requested
            ]

        group_item.last_modification_date = utc_time_now().isoformat()
        self._rehash_and_save(group_item)

        result = 'added' if action == _ADD_ACTION else 'deleted'
        message = (f'Policies: {", ".join(policies)} successfully {result}. '
                   f'Updated group: \'{group}\'')
        _LOG.info(message)
        return CommandResponse(message=message, warnings=warnings_list)

    def describe_group_handler(
            self,
            group: str | None = None,
    ) -> CommandResponse:
        """
        Describes group content from ModularGroup table for specified group
        or list all existed groups
        :param group: Optional. group name which will be described
        :return: CommandResponse
        :raises ModularApiBadRequestException: no group was found
        """
        _LOG.info(f'Going to describe \'{group}\'')

        if group:
            item = self.group_service.describe_group(group)
            existed_groups = () if not item else (item, )
        else:
            existed_groups = tuple(self.group_service.scan_groups())

        if not existed_groups:
            _LOG.warning('Group(s) does not exist')
            raise ModularApiBadRequestException(
                'Group(s) does not exist. Please check spelling'
            )

        pretty_groups = []
        invalid = 0
        for group_item in existed_groups:
            is_compromised = self.group_service.calculate_group_hash(
                group_item=group_item) != group_item.hash
            if is_compromised:
                invalid += 1
            pretty_groups.append({
                'Group name': group_item.group_name,
                'State': group_item.state,
                'Policy(ies)': group_item.policies,
                'Last modification date': convert_datetime_to_human_readable(
                    datetime_object=group_item.last_modification_date
                ),
                'Creation date': convert_datetime_to_human_readable(
                    datetime_object=group_item.creation_date
                ),
                'Consistency status': 'Compromised' if is_compromised else 'OK'
            })

        valid_title = 'Group(s) description'
        compromised_title = f'Group(s) description{line_sep}WARNING! ' \
                            f'{invalid} compromised group(s) have been detected.'
        return CommandResponse(
            table_title=compromised_title if invalid else valid_title,
            items=pretty_groups,
        )

    def delete_group_handler(self, group: str) -> CommandResponse:
        """
        Delete group from ModularGroup table (the group is marked with the
        removed state and saved)
        :param group: Group name to delete
        :return: CommandResponse
        :raises ModularApiBadRequestException: group is missing, not
            activated or still attached to activated users
        """
        _LOG.info(f'Going to delete group \'{group}\'')

        group_item = self.group_service.describe_group(group_name=group)
        if not group_item:
            _LOG.error('Group does not exist')
            raise ModularApiBadRequestException(
                f'Group with name \'{group}\' does not exist')

        if group_item.state != ACTIVATED_STATE:
            _LOG.error(f'Group with name \'{group}\' is blocked or deleted')
            raise ModularApiBadRequestException(
                f'Group with name \'{group}\' is blocked or deleted. To get '
                f'more detailed information please execute command:{line_sep}'
                f'modular group describe --group {group}')

        self._check_group_in_users(group_name=group)
        self._confirm_if_compromised(group_item, group)

        group_item.state = REMOVED_STATE
        group_item.last_modification_date = utc_time_now().isoformat()
        self._rehash_and_save(group_item)

        message = f'Group with name \'{group}\' successfully deleted'
        _LOG.info(message)
        return CommandResponse(message=message)

    def _check_group_in_users(self, group_name):
        """
        Raise ModularApiBadRequestException when the group is still attached
        to at least one activated user; the message lists those users and
        the commands that detach the group from them.
        :param group_name: name of the group to look for in users
        """
        users_with_specified_group = self.user_service.scan_users(
            filter_condition=User.groups.contains(group_name))
        users_to_pay_attention = [
            user.username for user in users_with_specified_group
            if user.state == ACTIVATED_STATE
        ]
        if not users_to_pay_attention:
            return

        del_commands_list = ''.join(
            f'modular user remove_from_group '
            f'--username {username} --group '
            f'{group_name}{line_sep}'
            for username in users_to_pay_attention
        )
        raise ModularApiBadRequestException(
            f'Group with name \'{group_name}\' can not be deleted due to '
            f'it attachment to the following activated user(s): '
            f'{users_to_pay_attention}.{line_sep}'
            f'You should remove group \'{group_name}\' from each user in '
            f'{users_to_pay_attention} list first.{line_sep}'
            f'User(s) can be removed from group via command(s):{line_sep}'
            f'{del_commands_list}')