in src/lambdas/r8s_api_handler/processors/policies_processor.py [0:0]
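def patch(self, event):
    """Update the permission list of an existing policy.

    The event is passed to validate_params with NAME_ATTR as the required
    key and must also carry at least one of PERMISSIONS_ATTR,
    PERMISSIONS_TO_ATTACH or PERMISSIONS_TO_DETACH.

    A non-empty PERMISSIONS_ATTR replaces the stored list and takes
    precedence: the attach/detach lists are ignored in that case.
    Otherwise the attach list is applied first, then the detach list.
    Every permission that is about to be granted (reset or attach) is
    checked with get_non_existing_permissions, and the policy is saved
    only after those checks passed.

    Returns the build_response result: bad request when no change was
    requested or an unknown permission was named, not found when the
    policy does not exist, OK with the policy DTO on success.
    """
    # NOTE(review): the whole event goes to the debug log; confirm it cannot
    # carry credentials or tokens before debug logging is enabled.
    _LOG.debug(f'Update policy event: {event}')
    validate_params(event, (NAME_ATTR,))
    policy_name = event.get(NAME_ATTR)
    permissions = event.get(PERMISSIONS_ATTR)
    to_attach = event.get(PERMISSIONS_TO_ATTACH)
    to_detach = event.get(PERMISSIONS_TO_DETACH)

    if not any((permissions, to_attach, to_detach)):
        required = ', '.join((PERMISSIONS_ATTR, PERMISSIONS_TO_ATTACH,
                              PERMISSIONS_TO_DETACH))
        message = (f'One of the following arguments \'{required}\' must '
                   f'be provided.')
        _LOG.debug(message)
        return build_response(
            code=RESPONSE_BAD_REQUEST_CODE,
            content=message
        )

    if not self.access_control_service.policy_exists(name=policy_name):
        message = f'Policy with name \'{policy_name}\' does not exist.'
        _LOG.debug(message)
        return build_response(
            code=RESPONSE_RESOURCE_NOT_FOUND_CODE,
            content=message
        )
    policy = self.access_control_service.get_policy(name=policy_name)

    if permissions:
        _LOG.debug(f'Going to reset permissions for policy with name '
                   f'\'{policy_name}\'. Permissions: {permissions}')
        non_existing = self.access_control_service. \
            get_non_existing_permissions(permissions=permissions)
        if non_existing:
            message = (f'Some of the specified permissions don\'t exist: '
                       f'{", ".join(non_existing)}')
            _LOG.debug(message)
            return build_response(
                code=RESPONSE_BAD_REQUEST_CODE,
                content=message
            )
        policy.permissions = permissions
    else:
        # NOTE(review): the listing this was taken from had lost its
        # indentation; both the attach and the detach branch are assumed to
        # belong to this `else` - confirm against the repository.
        # `or []` keeps a policy without a stored permission list from
        # raising on the list operations below.
        current = list(policy.get_json().get(PERMISSIONS_ATTR) or [])
        if to_attach:
            _LOG.debug(f'going to attach permissions to policy: '
                       f'\'{to_attach}\'')
            non_existing = self.access_control_service. \
                get_non_existing_permissions(permissions=to_attach)
            if non_existing:
                message = (f'Some of the specified permissions don\'t '
                           f'exist: {", ".join(non_existing)}')
                _LOG.debug(message)
                return build_response(
                    code=RESPONSE_BAD_REQUEST_CODE,
                    content=message
                )
            # dict.fromkeys de-duplicates like set() did, but keeps a
            # stable order so the stored list is deterministic.
            current = list(dict.fromkeys([*current, *to_attach]))
        if to_detach:
            _LOG.debug(f'going to detach permissions from policy: '
                       f'\'{to_detach}\'')
            for permission in to_detach:
                if permission in current:
                    _LOG.debug(f'Removing permission: {permission}')
                    # Drop every occurrence: list.remove() deletes only the
                    # first one, so a permission stored twice (the reset
                    # path stores the list as given) stayed granted after
                    # a detach.
                    current = [p for p in current if p != permission]
                else:
                    _LOG.debug(f'Permission \'{permission}\' does not '
                               f'exist in policy.')
        policy.permissions = current

    _LOG.debug('Saving policy')
    self.access_control_service.save(policy)
    policy_dto = policy.get_dto()
    _LOG.debug(f'Response: {policy_dto}')
    return build_response(
        code=RESPONSE_OK_CODE,
        content=policy_dto
    )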