in src/handlers/ruleset_handler.py [0:0]
def update_ruleset(self, event: RulesetPatchModel):
customer = event.customer or SYSTEM_CUSTOMER
if not event.version:
_LOG.debug('Ruleset version was not specified. '
'Updating the latest one')
ruleset = self.ruleset_service.get_latest(
customer=customer,
name=event.name
)
if not ruleset:
raise ResponseFactory(HTTPStatus.NOT_FOUND).message(
f'There not any rulesets with name {event.name}'
).exc()
else:
_LOG.debug('Ruleset version was provided. Trying to resolve')
ruleset = self.ruleset_service.get_standard(
customer=customer,
name=event.name,
version=Version(event.version).to_str()
)
if not ruleset:
raise ResponseFactory(HTTPStatus.NOT_FOUND).message(
f'Ruleset {event.name} with version '
f'{event.version} not found'
).exc()
# by here we have the ruleset we want to update
s3_path = ruleset.s3_path.as_dict()
if not s3_path:
return build_response(code=HTTPStatus.BAD_REQUEST,
content='Cannot update empty ruleset')
content = self.s3_client.gz_get_json(
bucket=s3_path.get('bucket_name'),
key=s3_path.get('path')
)
name_body = self._rule_name_to_body(content)
hash_before = self.ruleset_service.hash_from_name_to_body(name_body)
if event.rules_to_detach:
resolved = RuleNamesResolver(name_body.keys()).resolved_names(
event.rules_to_detach
)
for to_detach in resolved:
name_body.pop(to_detach, None)
for rule in event.rules_to_attach:
item = self.rule_service.resolve_rule(
customer=customer,
name_prefix=rule,
cloud=ruleset.cloud
)
if not item:
raise ResponseFactory(HTTPStatus.NOT_FOUND).message(
self.rule_service.not_found_message(rule)
).exc()
name_body[item.name] = item.build_policy()
hash_after = self.ruleset_service.hash_from_name_to_body(name_body)
if not event.force and hash_before == hash_after:
raise ResponseFactory(HTTPStatus.CONFLICT).message(
'Ruleset would remain the same after changes.'
).exc()
new_version = Version(ruleset.version).next_minor()
ruleset.version = new_version.to_str()
ruleset.rules = list(name_body)
ruleset.created_at = utc_iso()
self.upload_ruleset(ruleset,
{'policies': list(name_body.values())})
self.ruleset_service.save(ruleset)
return build_response(
code=HTTPStatus.OK,
content=self.ruleset_service.dto(
ruleset,
{S3_PATH_ATTR, RULES_ATTR, EVENT_DRIVEN_ATTR}
)
)