modular_api_cli/modular_handler/policy_handler.py (188 lines of code) (raw):
import json
import os
import click
from modular_api.helpers.log_helper import get_logger
from modular_api.models.group_model import Group
from modular_api.helpers.constants import REMOVED_STATE, ACTIVATED_STATE
from modular_api.services.group_service import GroupService
from modular_api.services.policy_service import policy_validation, PolicyService
from modular_api.helpers.date_utils import (
convert_datetime_to_human_readable, utc_time_now,
)
from modular_api.helpers.decorators import CommandResponse
from modular_api.helpers.exceptions import (
ModularApiBadRequestException, ModularApiConflictException,
)
from modular_api.helpers.file_helper import open_json_file
line_sep = os.linesep
_LOG = get_logger(__name__)
class PolicyHandler:
def __init__(self, policy_service, group_service):
self.policy_service: PolicyService = policy_service
self.group_service: GroupService = group_service
def add_policy_handler(self, policy: str, policy_path: str) -> CommandResponse:
"""
Add policy to ModularPolicy table
:param policy: Policy name
:param policy_path: Path to file which contains allowed/denied actions
:return: CommandResponse
"""
_LOG.info(f'Going to add policy named \'{policy}\' from \'{policy_path}\'')
existing_policy = self.policy_service.describe_policy(
policy_name=policy
)
if existing_policy:
if existing_policy.state == REMOVED_STATE:
_LOG.error('Policy already exists')
raise ModularApiBadRequestException(
f'Policy with name \'{policy}\' already exists and marked as '
f'\'{REMOVED_STATE}\'.')
elif existing_policy.state == ACTIVATED_STATE:
_LOG.error('Policy already exists')
raise ModularApiBadRequestException(
f'Policy with name \'{policy}\' already exists. Please '
f'change name to another one or update existing '
f'\'{policy}\' policy with new permissions by command:'
f'{line_sep}modular policy update --policy {policy} '
f'--policy_path {policy_path}\'')
_LOG.error('Policy already exists')
raise ModularApiConflictException(
f'Policy with name \'{policy}\' already exists with invalid '
f'\'{existing_policy.state}\' state')
policy_content = open_json_file(file_path=policy_path)
policy_validation(policy_content=policy_content)
policy_item = self.policy_service.create_policy_entity(
policy_name=policy,
policy_content=policy_content
)
policy_hash_sum = self.policy_service.calculate_policy_hash(
policy_item=policy_item
)
policy_item.hash = policy_hash_sum
self.policy_service.save_policy(policy_item=policy_item)
_LOG.info('Policy successfully added')
return CommandResponse(
message=f'Policy with name \'{policy}\' successfully added')
def update_policy_handler(self, policy: str, policy_path: str) -> CommandResponse:
"""
Updates policy content in ModularPolicy table
:param policy: Policy name which will be updated
:param policy_path: Path to file which contains allowed/denied actions
:return: CommandResponse
"""
_LOG.info(f'Going to update policy \'{policy}\' from \'{policy_path}\'')
policy_item = self.policy_service.describe_policy(policy_name=policy)
if not policy_item:
_LOG.error('Policy does not exist')
raise ModularApiBadRequestException(
f'Policy with name \'{policy}\' does not exist. Please check '
f'policy name spelling or add new policy with name \'{policy}\''
f' by command:{line_sep} modular policy add '
f'--policy {policy} --policy_path {policy_path}')
if policy_item.state != ACTIVATED_STATE:
_LOG.error('Policy blocked or deleted')
raise ModularApiBadRequestException(
f'Policy with name \'{policy}\' is blocked or deleted. Can not '
f'update policy content. To get more detailed info please '
f'execute command:{line_sep} modular policy describe --policy '
f'{policy}')
if self.policy_service.calculate_policy_hash(policy_item) != \
policy_item.hash:
click.confirm(
'Provided policy compromised. Command execution leads to policy '
'entity hash sum recalculation. Are you sure?', abort=True)
policy_content = open_json_file(file_path=policy_path)
policy_validation(policy_content=policy_content)
policy_item.content = policy_content
policy_item.last_modification_date = utc_time_now().isoformat()
policy_hash_sum = self.policy_service.calculate_policy_hash(
policy_item=policy_item)
policy_item.hash = policy_hash_sum
self.policy_service.save_policy(policy_item=policy_item)
_LOG.info('Policy successfully updated')
return CommandResponse(
message=f'Policy with name \'{policy}\' successfully updated')
def describe_policy_handler(
self,
policy: str,
expand_view: bool = False,
) -> CommandResponse:
"""
Describes policy content from ModularPolicy table for specified name
or list all existed policies
:param expand_view: output will have more policy content
:param policy: Optional. Policy name which will be described
:return: CommandResponse
"""
_LOG.info(f'Going to describe policy \'{policy}\'')
invalid = 0
if policy:
item = self.policy_service.describe_policy(policy_name=policy)
policy_items = (item, ) if item else ()
else:
policy_items = tuple(self.policy_service.scan_policies())
if not policy_items:
raise ModularApiBadRequestException(
'Policy(ies) not exists. To add policy please execute '
'\'modular policy add\' command')
pretty_policies = []
for policy in policy_items:
is_compromised = self.policy_service.calculate_policy_hash(
policy_item=policy) != policy.hash
if is_compromised:
invalid += 1
pretty_user_item = {
'Policy Name': policy.policy_name,
'Policy Content': policy.content,
'State': policy.state,
'Last Modification Date': convert_datetime_to_human_readable(
datetime_object=policy.last_modification_date
),
'Creation Date': convert_datetime_to_human_readable(
datetime_object=policy.creation_date
),
'Consistency status': 'Compromised' if is_compromised else 'OK'
}
if not expand_view:
del pretty_user_item['Policy Content']
pretty_policies.append(pretty_user_item)
valid_title = 'Policy(ies) description'
compromised_title = f'Policy(ies) description{os.linesep}WARNING! ' \
f'{invalid} compromised policy(ies) have been detected.'
return CommandResponse(
table_title=compromised_title if invalid else valid_title,
items=pretty_policies,
)
def delete_policy_handler(self, policy: str) -> CommandResponse:
"""
Delete policy from ModularPolicy table
:param policy: Policy name to delete
:return: CommandResponse
"""
_LOG.info(f'Going to delete policy \'{policy}\'')
policy_item = self.policy_service.describe_policy(policy_name=policy)
if not policy_item:
_LOG.error('Policy does not exist')
raise ModularApiBadRequestException(
f'Policy with name \'{policy}\' does not exist. Nothing to '
f'delete')
if policy_item.state != ACTIVATED_STATE:
_LOG.error('Policy blocked or deleted')
raise ModularApiBadRequestException(
f'Policy with name \'{policy}\' already blocked or deleted. '
f'To get more detailed information please execute command:'
f'{line_sep}modular policy describe --policy {policy}')
if self.policy_service.calculate_policy_hash(policy_item) != \
policy_item.hash:
click.confirm(
f'Policy with name {policy} is compromised. Command execution '
f'leads to policy entity hash sum recalculation. Are you sure?',
abort=True)
self._check_policy_in_groups(policy_name=policy)
policy_item.state = REMOVED_STATE
policy_item.last_modification_date = utc_time_now().isoformat()
policy_hash_sum = self.policy_service.calculate_policy_hash(
policy_item=policy_item)
policy_item.hash = policy_hash_sum
self.policy_service.save_policy(policy_item=policy_item)
_LOG.info('Policy successfully deleted')
return CommandResponse(
message=f'Policy with name \'{policy}\' successfully deleted')
def _check_policy_in_groups(self, policy_name: str) -> None:
all_groups_with_policy = list(self.group_service.scan_groups(
filter_condition=Group.policies.contains(policy_name)
))
groups_to_pay_attention = list()
del_commands_list = str()
for group in all_groups_with_policy:
if group.state != ACTIVATED_STATE:
continue
groups_to_pay_attention.append(group.group_name)
del_commands_list += f'modular group delete_policy ' \
f'--policy {policy_name} --group ' \
f'{group.group_name}{line_sep}'
if groups_to_pay_attention:
raise ModularApiBadRequestException(
f'Policy with name \'{policy_name}\' can not be deleted due to '
f'it existence in the following activated group(s): '
f'{groups_to_pay_attention}.{line_sep}'
f'You should remove policy \'{policy_name}\' from each group in '
f'{groups_to_pay_attention} list first.{line_sep}'
f'Policy can be deleted from group via command(s):{line_sep}'
f'{del_commands_list}')