src/lambdas/r8s_api_handler/processors/policies_processor.py (204 lines of code) (raw):

from commons import RESPONSE_BAD_REQUEST_CODE, raise_error_response, \ build_response, RESPONSE_RESOURCE_NOT_FOUND_CODE, RESPONSE_OK_CODE, \ validate_params from commons.abstract_lambda import PARAM_HTTP_METHOD from commons.constants import GET_METHOD, POST_METHOD, DELETE_METHOD, \ PATCH_METHOD, NAME_ATTR, PERMISSIONS_ATTR, \ PERMISSIONS_ADMIN_ATTR, PERMISSIONS_TO_ATTACH, PERMISSIONS_TO_DETACH from commons.log_helper import get_logger from lambdas.r8s_api_handler.processors.abstract_processor import \ AbstractCommandProcessor from services.rbac.access_control_service import AccessControlService from services.rbac.iam_service import IamService from services.user_service import CognitoUserService _LOG = get_logger('r8s-policy-processor') class PolicyProcessor(AbstractCommandProcessor): def __init__(self, user_service: CognitoUserService, access_control_service: AccessControlService, iam_service: IamService): self.user_service = user_service self.access_control_service = access_control_service self.iam_service = iam_service self.method_to_handler = { GET_METHOD: self.get, POST_METHOD: self.post, PATCH_METHOD: self.patch, DELETE_METHOD: self.delete, } def process(self, event) -> dict: method = event.get(PARAM_HTTP_METHOD) command_handler = self.method_to_handler.get(method) if not command_handler: message = f'Unable to handle command {method} in ' \ f'policy processor' _LOG.error(f'status code: {RESPONSE_BAD_REQUEST_CODE}, ' f'process error: {message}') raise_error_response(message, RESPONSE_BAD_REQUEST_CODE) return command_handler(event=event) def get(self, event): _LOG.debug(f'Get policy event: {event}') policy_name = event.get(NAME_ATTR) if policy_name: _LOG.debug(f'Extracting policy with name \'{policy_name}\'') policies = [self.iam_service.policy_get(policy_name=policy_name)] else: _LOG.debug(f'Extracting all available policies') policies = self.iam_service.list_policies() if not policies or policies \ and all([policy is None for policy in policies]): _LOG.debug('No policies found matching given query.') return build_response( code=RESPONSE_RESOURCE_NOT_FOUND_CODE, content='No policies found matching given query.' ) policies_dto = [policy.get_dto() for policy in policies] _LOG.debug(f'Policies to return: {policies_dto}') return build_response( code=RESPONSE_OK_CODE, content=policies_dto ) def post(self, event): _LOG.debug(f'Create policy event: {event}') validate_params(event, (NAME_ATTR,)) if not event.get(PERMISSIONS_ATTR) \ and not event.get(PERMISSIONS_ADMIN_ATTR): required = ", ".join((PERMISSIONS_ATTR, PERMISSIONS_ADMIN_ATTR)) _LOG.debug(f'One of the attributes \'{required}\' must be ' f'specified') return build_response( code=RESPONSE_BAD_REQUEST_CODE, content=f'One of the attributes \'{required}\' must be ' f'specified' ) policy_name = event.get(NAME_ATTR) if self.access_control_service.policy_exists(name=policy_name): _LOG.debug(f'Policy with name \'{policy_name}\' already exists.') return build_response( code=RESPONSE_BAD_REQUEST_CODE, content=f'Policy with name \'{policy_name}\' already exists.' ) permissions = event.get(PERMISSIONS_ATTR) if permissions: non_existing = self.access_control_service. \ get_non_existing_permissions(permissions=permissions) if non_existing: _LOG.debug(f'Some of the specified permissions don\'t exist: ' f'{", ".join(non_existing)}') return build_response( code=RESPONSE_BAD_REQUEST_CODE, content=f'Some of the specified permissions don\'t exist: ' f'{", ".join(non_existing)}' ) elif event.get(PERMISSIONS_ADMIN_ATTR, None): permissions = self.access_control_service.get_admin_permissions() policy_data = { NAME_ATTR: policy_name, PERMISSIONS_ATTR: permissions } _LOG.debug(f'Going to create policy with data: {policy_data}') policy = self.access_control_service.create_policy( policy_data=policy_data) _LOG.debug(f'Saving policy') self.access_control_service.save(policy) policy_dto = policy.get_dto() _LOG.debug(f'Response: {policy_dto}') return build_response( code=RESPONSE_OK_CODE, content=policy_dto ) def patch(self, event): _LOG.debug(f'Update policy event: {event}') validate_params(event, (NAME_ATTR,)) policy_name = event.get(NAME_ATTR) permissions = event.get(PERMISSIONS_ATTR) to_attach = event.get(PERMISSIONS_TO_ATTACH) to_detach = event.get(PERMISSIONS_TO_DETACH) if not any(i for i in (permissions, to_attach, to_detach)): required = ', '.join((PERMISSIONS_ATTR, PERMISSIONS_TO_ATTACH, PERMISSIONS_TO_DETACH)) _LOG.debug(f'One of the following arguments \'{required}\' must ' f'be provided.') return build_response( code=RESPONSE_BAD_REQUEST_CODE, content=f'One of the following arguments \'{required}\' must ' f'be provided.' ) if not self.access_control_service.policy_exists(name=policy_name): _LOG.debug(f'Policy with name \'{policy_name}\' does not exist.') return build_response( code=RESPONSE_RESOURCE_NOT_FOUND_CODE, content=f'Policy with name \'{policy_name}\' does not exist.' ) policy = self.access_control_service.get_policy(name=policy_name) if permissions: _LOG.debug(f'Going to reset permissions for policy with name ' f'\'{policy_name}\'. Permissions: {permissions}') non_existing = self.access_control_service. \ get_non_existing_permissions(permissions=permissions) if non_existing: _LOG.debug(f'Some of the specified permissions don\'t exist: ' f'{", ".join(non_existing)}') return build_response( code=RESPONSE_BAD_REQUEST_CODE, content=f'Some of the specified permissions don\'t exist: ' f'{", ".join(non_existing)}' ) policy.permissions = permissions else: if to_attach: _LOG.debug(f'going to attach permissions to policy: ' f'\'{to_attach}\'') non_existing = self.access_control_service.\ get_non_existing_permissions(permissions=to_attach) if non_existing: _LOG.debug( f'Some of the specified permissions don\'t exist: ' f'{", ".join(non_existing)}') return build_response( code=RESPONSE_BAD_REQUEST_CODE, content=f'Some of the specified permissions don\'t ' f'exist: {", ".join(non_existing)}' ) policy_permissions = policy.get_json().get(PERMISSIONS_ATTR) policy_permissions.extend(to_attach) policy_permissions = list(set(policy_permissions)) policy.permissions = policy_permissions if to_detach: _LOG.debug(f'going to detach permissions from policy: ' f'\'{to_detach}\'') policy_permissions = policy.get_json().get(PERMISSIONS_ATTR) for permission in to_detach: if permission in policy_permissions: _LOG.debug(f'Removing permission: {permission}') policy_permissions.remove(permission) else: _LOG.debug(f'Permission \'{permission}\' does not ' f'exist in policy.') policy.permissions = policy_permissions _LOG.debug(f'Saving policy') self.access_control_service.save(policy) policy_dto = policy.get_dto() _LOG.debug(f'Response: {policy_dto}') return build_response( code=RESPONSE_OK_CODE, content=policy_dto ) def delete(self, event): _LOG.debug(f'Delete policy event: {event}') validate_params(event, (NAME_ATTR,)) policy_name = event.get(NAME_ATTR) if not self.access_control_service.policy_exists(name=policy_name): _LOG.debug(f'Policy with name \'{policy_name}\' does not exist.') return build_response( code=RESPONSE_OK_CODE, content=f'Policy with name \'{policy_name}\' does not exist.' ) _LOG.debug(f'Extracting policy with name \'{policy_name}\'') policy = self.access_control_service.get_policy(name=policy_name) _LOG.debug(f'Deleting policy') self.access_control_service.delete_entity(policy) _LOG.debug(f'Policy with name \'{policy_name}\' has been deleted.') return build_response( code=RESPONSE_OK_CODE, content=f'Policy with name \'{policy_name}\' has been deleted.' )