in src/lambdas/r8s_api_handler/processors/role_processor.py [0:0]
def patch(self, event):
    """Update the expiration and/or the policy list of an existing role.

    The role is looked up by ``event[NAME_ATTR]``. The following optional
    keys are honoured when present and truthy:

    * ``EXPIRATION_ATTR`` - new expiration value; it is checked with
      ``self._validate_expiration`` before being assigned.
    * ``POLICIES_TO_ATTACH`` - policies to add to the role; every one of
      them must exist, otherwise nothing is saved.
    * ``POLICIES_TO_DETACH`` - policies to remove from the role; entries
      that are not attached are logged and skipped.

    :param event: request parameters, accessed through ``.get``.
    :return: the value produced by ``build_response``:
        ``RESPONSE_RESOURCE_NOT_FOUND_CODE`` if the role does not exist,
        ``RESPONSE_BAD_REQUEST_CODE`` if the expiration is rejected or
        some policy to attach does not exist, and ``RESPONSE_OK_CODE``
        with the role DTO once the role has been saved.
    """
    _LOG.debug(f'Patch role event: {event}')
    # NOTE(review): presumably aborts the request when NAME_ATTR is
    # missing - confirm against validate_params.
    validate_params(event, (NAME_ATTR,))

    role_name = event.get(NAME_ATTR)
    if not self.access_control_service.role_exists(name=role_name):
        _LOG.error(f'Role with name \'{role_name}\' does not exist.')
        return build_response(
            code=RESPONSE_RESOURCE_NOT_FOUND_CODE,
            content=f'Role with name \'{role_name}\' does not exist.'
        )

    _LOG.debug(f'Extracting role with name \'{role_name}\'')
    role = self.access_control_service.get_role(name=role_name)

    expiration = event.get(EXPIRATION_ATTR)
    if expiration:
        error = self._validate_expiration(expiration)
        if error:
            return build_response(
                code=RESPONSE_BAD_REQUEST_CODE,
                content=error
            )
        _LOG.debug(f'Setting role expiration to \'{expiration}\'')
        role.expiration = expiration

    to_attach = event.get(POLICIES_TO_ATTACH)
    if to_attach:
        _LOG.debug(f'Attaching policies \'{to_attach}\'')
        non_existing = self.access_control_service. \
            get_non_existing_policies(policies=to_attach)
        if non_existing:
            _LOG.error(f'Some of the policies provided in the request '
                       f'do not exist: \'{non_existing}\'')
            return build_response(
                code=RESPONSE_BAD_REQUEST_CODE,
                content=f'Some of the policies provided in the request '
                        f'do not exist: \'{", ".join(non_existing)}\''
            )
        # A role may have no policies stored yet; fall back to an empty
        # list instead of failing on ``None``.
        role_policies = list(role.get_json().get(POLICIES_ATTR) or [])
        role_policies.extend(to_attach)
        # De-duplicate while keeping the existing order stable (a plain
        # ``set`` would shuffle the stored list on every patch).
        role_policies = list(dict.fromkeys(role_policies))
        _LOG.debug(f'Role policies: {role_policies}')
        role.policies = role_policies

    to_detach = event.get(POLICIES_TO_DETACH)
    if to_detach:
        _LOG.debug(f'Detaching policies \'{to_detach}\'')
        role_policies = list(role.get_json().get(POLICIES_ATTR) or [])
        for policy in to_detach:
            if policy in role_policies:
                role_policies.remove(policy)
            else:
                # Report the specific policy that is not attached, not
                # the whole detach request.
                _LOG.error(f'Policy \'{policy}\' does not exist in '
                           f'role \'{role_name}\'.')
        _LOG.debug(f'Setting role policies: {role_policies}')
        role.policies = role_policies

    _LOG.debug('Saving role')
    self.access_control_service.save(role)

    _LOG.debug('Extracting role dto')
    role_dto = role.get_dto()
    _LOG.debug(f'Response: {role_dto}')
    return build_response(
        code=RESPONSE_OK_CODE,
        content=role_dto
    )