in src/handlers/ruleset_handler.py [0:0]
def post_event_driven_ruleset(self, event: EventDrivenRulesetPostModel):
_LOG.debug('Create event-driven rulesets')
customer = event.customer or SYSTEM_CUSTOMER
rs: RuleSource | None = None
desired_version: Version
if event.rule_source_id:
rs = self.rule_source_service.get_nullable(event.rule_source_id)
if not rs or rs.customer != customer:
raise ResponseFactory(HTTPStatus.NOT_FOUND).message(
self.rule_source_service.not_found_message(event.rule_source_id)
).exc()
if event.version:
_LOG.debug('User specified the version of ruleset '
'he wants to create. Checking whether we can')
desired_version = Version(event.version) # validated
ruleset = self.ruleset_service.get_event_driven(
cloud=event.cloud,
version=desired_version.to_str()
)
if ruleset:
raise ResponseFactory(HTTPStatus.CONFLICT).message(
f'Event driven ruleset {desired_version} already exists'
).exc()
else:
_LOG.debug('User did not provide the version. ')
release_version = None
if rs and rs.type == RuleSourceType.GITHUB_RELEASE:
try:
release_version = Version(
rs.latest_sync.release_tag or ''
)
except ValueError:
pass
if release_version:
ruleset = self.ruleset_service.get_event_driven(
cloud=event.cloud,
version=release_version.to_str()
)
if ruleset:
raise ResponseFactory(HTTPStatus.CONFLICT).message(
f'Event driven ruleset for rules release '
f'{release_version} already exists'
).exc()
desired_version = release_version
else:
latest = self.ruleset_service.get_latest_event_driven(
cloud=event.cloud
)
if latest:
_LOG.debug('The previous ruleset found. Creating the '
'next version')
desired_version = Version(latest.version).next_major()
else:
_LOG.debug('The previous ruleset not found. Creating the '
'first version')
desired_version = Version.first_version()
if event.rule_source_id:
_LOG.debug('Querying rules by rule source')
rs = cast(RuleSource, rs)
rules = self.rule_service.get_by_rule_source(
rule_source=rs,
cloud=event.cloud
)
else:
_LOG.debug('Querying all the rules for cloud')
rules = self.rule_service.get_by_id_index(customer, event.cloud)
rules = list(self.rule_service.without_duplicates(rules=rules))
if not rules:
_LOG.warning('No rules by given parameters were found')
return build_response(
code=HTTPStatus.NOT_FOUND,
content='No rules found'
)
ruleset = self.ruleset_service.create_event_driven(
version=desired_version.to_str(),
cloud=event.cloud,
rules=[rule.name for rule in rules],
)
self.upload_ruleset(ruleset, self.build_policy(rules))
self.ruleset_service.save(ruleset)
return build_response(self.ruleset_service.dto(
ruleset, params_to_exclude={RULES_ATTR}
), code=HTTPStatus.CREATED)