modular_api/services/policy_service.py (101 lines of code) (raw):

import json from typing import Iterable from pynamodb.pagination import ResultIterator from modular_api.helpers.constants import ACTIVATED_STATE from modular_api.helpers.date_utils import utc_time_now from modular_api.helpers.exceptions import ModularApiBadRequestException from modular_api.helpers.log_helper import get_logger from modular_api.helpers.password_util import secure_string from modular_api.models.policy_model import Policy _LOG = get_logger(__name__) def validate_policy_item(item: dict) -> str | None: """ Returns str in case there is an error :param item: :return: """ if not isinstance(item.get('Effect'), str): return ('field \'Effect\' of type string is ' 'required for each policy item') if not isinstance(item.get('Module'), str): return ('field \'Module\' of type string is ' 'required for each policy item') if not isinstance(item.get('Resources'), list) or not all( [isinstance(v, str) for v in item.get('Resources')]): return ('field \'Resources\' of type list is ' 'required for each policy item') effect = item['Effect'] effect_allowed = ('Allow', 'Deny') if effect not in effect_allowed: return (f'incorrect \'{effect}\' value provided for \'Effect\' key. ' f'Allowed value: {", ".join(effect_allowed)}') resources = item['Resources'] if not resources: # empty list return ('resources property in policy can not be empty. To mark all ' 'resources use "*" symbol') for value in resources: if value.startswith('/'): return ( f'resource name started with \'/\' not allowed. ' f'Incorrect value: {value}' ) if value.startswith(':'): return ( f'resource name started with \':\' not allowed. ' f'Incorrect value: {value}' ) if value.startswith('*') and value != '*': return ( f'resource name started with \'*\' not allowed. ' f'Incorrect value: {value}. To Allow/Deny all in module`s ' f'content use \'*\' or \'group:*\' or \'group\\subgroup:*\'' ) def policy_validation(policy_content: list[dict]) -> None: """ Raises ModularApiBadRequestException in case value is invalid :param policy_content: :return: """ if not isinstance(policy_content, list) or not all( [isinstance(v, dict) for v in policy_content]): raise ModularApiBadRequestException( 'Policy content should be a list of objects' ) for i, item in enumerate(policy_content, start=1): error = validate_policy_item(item) if error: raise ModularApiBadRequestException( f'Invalid policy item number by index {i}: {error}' ) class PolicyService: @staticmethod def create_policy_entity(policy_name: str, policy_content: dict, state: str = ACTIVATED_STATE): _LOG.info(f'Creating policy \'{policy_name}\'') return Policy( policy_name=policy_name, policy_content=json.dumps(policy_content, sort_keys=True, separators=(',', ':')), state=state, creation_date=utc_time_now().isoformat() ) @staticmethod def calculate_policy_hash(policy_item: Policy): _LOG.info(f'Calculating \'{policy_item.policy_name}\' policy hash ') prepared_user_to_be_hashed = json.dumps( policy_item.response_object_without_hash(), sort_keys=True ) user_hash_sum = secure_string(prepared_user_to_be_hashed) return user_hash_sum @staticmethod def get_policies_by_name(policy_names: Iterable[str]) -> list[Policy]: _LOG.info(f'Batch get policies by provided names: ' f'{policy_names}') return list(Policy.batch_get(items=policy_names)) @staticmethod def save_policy(policy_item: Policy) -> None: _LOG.info(f'Saving policy \'{policy_item.policy_name}\'') policy_item.save() @staticmethod def scan_policies() -> ResultIterator[Policy]: _LOG.info('Scanning policies') return Policy.scan() @staticmethod def describe_policy(policy_name: str) -> Policy | None: _LOG.info(f'Describing policy \'{policy_name}\'') return Policy.get_nullable(hash_key=policy_name) @staticmethod def delete_policy(policy_item: Policy) -> None: _LOG.info(f'Deleting policy \'{policy_item.policy_name}\'') policy_item.delete()