in src/lambdas/r8s_api_handler/processors/policies_processor.py [0:0]
def post(self, event):
    """Create and persist a policy from the request ``event``.

    The event must carry ``NAME_ATTR`` and a truthy value under either
    ``PERMISSIONS_ATTR`` (an explicit permission collection) or
    ``PERMISSIONS_ADMIN_ATTR`` (use the service's admin permission set).
    When both are supplied, the explicit permissions are used.

    Returns the result of ``build_response``: ``RESPONSE_BAD_REQUEST_CODE``
    when neither permission attribute is set, the name is already taken
    or a listed permission does not exist; otherwise ``RESPONSE_OK_CODE``
    with the saved policy's DTO.

    NOTE(review): this method performs no caller authorization itself,
    yet it can attach the full admin permission set to a new policy.
    Presumably access is enforced before dispatch; confirm.
    """
    # NOTE(review): the whole request event is written to the debug log;
    # confirm it cannot carry credentials or tokens.
    _LOG.debug(f'Create policy event: {event}')
    validate_params(event, (NAME_ATTR,))

    requested = event.get(PERMISSIONS_ATTR)
    if not (requested or event.get(PERMISSIONS_ADMIN_ATTR)):
        required = ", ".join((PERMISSIONS_ATTR, PERMISSIONS_ADMIN_ATTR))
        message = f"One of the attributes '{required}' must be specified"
        _LOG.debug(message)
        return build_response(code=RESPONSE_BAD_REQUEST_CODE,
                              content=message)

    service = self.access_control_service
    policy_name = event.get(NAME_ATTR)
    # NOTE(review): the existence check and the save below are separate
    # calls; whether save() itself rejects a duplicate name is not
    # visible here, so concurrent requests may need a closer look.
    if service.policy_exists(name=policy_name):
        message = f"Policy with name '{policy_name}' already exists."
        _LOG.debug(message)
        return build_response(code=RESPONSE_BAD_REQUEST_CODE,
                              content=message)

    permissions = requested
    if permissions:
        # Refuse the request when any listed permission is unknown.
        unknown = service.get_non_existing_permissions(
            permissions=permissions)
        if unknown:
            listed = ", ".join(unknown)
            message = (
                f"Some of the specified permissions don't exist: {listed}"
            )
            _LOG.debug(message)
            return build_response(code=RESPONSE_BAD_REQUEST_CODE,
                                  content=message)
    elif event.get(PERMISSIONS_ADMIN_ATTR):
        # No explicit list was given: fall back to the admin set.
        permissions = service.get_admin_permissions()

    policy_data = {NAME_ATTR: policy_name, PERMISSIONS_ATTR: permissions}
    _LOG.debug(f'Going to create policy with data: {policy_data}')
    policy = service.create_policy(policy_data=policy_data)

    _LOG.debug('Saving policy')
    service.save(policy)

    policy_dto = policy.get_dto()
    _LOG.debug(f'Response: {policy_dto}')
    return build_response(code=RESPONSE_OK_CODE, content=policy_dto)