in src/handlers/ruleset_handler.py [0:0]
def create_ruleset(self, event: RulesetPostModel):
    """
    Create a new ruleset from the incoming request and respond with its DTO.

    The flow:

    1. The customer is taken from the event, falling back to
       ``SYSTEM_CUSTOMER``. If ``rule_source_id`` is given, the rule source
       is loaded and must belong to that customer.
    2. The version of the new ruleset is resolved and checked
       (see ``_resolve_ruleset_version``).
    3. The rules are collected and filtered
       (see ``_collect_ruleset_rules``).
    4. The ruleset item is created with ``event_driven=False`` and
       ``licensed=False``, its policy is uploaded and the item is saved.

    :param event: incoming request model
    :return: result of ``build_response`` with the ruleset DTO (without
        ``RULES_ATTR`` and ``S3_PATH_ATTR``) and ``HTTPStatus.CREATED``
    :raises: the exception built by ``ResponseFactory(...).exc()`` with
        ``NOT_FOUND`` if the rule source does not exist or belongs to
        another customer, ``BAD_REQUEST`` if no rules are left after
        filtering, plus anything raised by the two helpers
    """
    customer = event.customer or SYSTEM_CUSTOMER
    rs: RuleSource | None = None

    if event.rule_source_id:
        rs = self.rule_source_service.get_nullable(event.rule_source_id)
        # a rule source of another customer is reported as missing
        if not rs or rs.customer != customer:
            raise ResponseFactory(HTTPStatus.NOT_FOUND).message(
                self.rule_source_service.not_found_message(event.rule_source_id)
            ).exc()

    desired_version = self._resolve_ruleset_version(event, customer, rs)
    _LOG.info(f'Resolved version for the next ruleset: {desired_version}')

    rules = self._collect_ruleset_rules(event, customer, rs)
    if not rules:
        _LOG.warning('No rules found by filters')
        raise ResponseFactory(HTTPStatus.BAD_REQUEST).message(
            'No rules left after filtering'
        ).exc()

    ruleset = self.ruleset_service.create(
        customer=customer,
        name=event.name,
        version=desired_version.to_str(),
        cloud=event.cloud,
        rules=[rule.name for rule in rules],
        event_driven=False,
        licensed=False,
    )
    # TODO add changelog or metadata from release
    # The item is saved only after the policy upload call has returned, so
    # an exception raised by the upload leaves no saved ruleset behind.
    self.upload_ruleset(ruleset, self.build_policy(rules))
    self.ruleset_service.save(ruleset)
    return build_response(self.ruleset_service.dto(
        ruleset, params_to_exclude={RULES_ATTR, S3_PATH_ATTR}
    ), code=HTTPStatus.CREATED)


def _resolve_ruleset_version(self, event: RulesetPostModel, customer: str,
                             rs: 'RuleSource | None') -> 'Version':
    """
    Resolve the version for the ruleset that is about to be created.

    Resolution order:

    1. ``event.version`` if the user provided it.
    2. The release tag of the latest sync if ``rs`` is a
       ``GITHUB_RELEASE`` rule source and the tag is a valid ``Version``.
    3. The next major version after the latest existing ruleset with the
       same name, or ``Version.first_version()`` if there is none.

    :param event: incoming request model
    :param customer: already resolved customer name
    :param rs: rule source resolved by the caller, if any
    :return: version that the new ruleset should get
    :raises: the exception built by ``ResponseFactory(...).exc()`` with
        ``CONFLICT`` if a ruleset with the resolved explicit/release
        version already exists, ``BAD_REQUEST`` if the latest existing
        ruleset has a different cloud than the requested one
    """
    if event.version:
        _LOG.debug('User specified the version of ruleset '
                   'he wants to create. Checking whether we can')
        # NOTE(review): the original comment says the value is already
        # validated, presumably by the request model - confirm
        desired_version = Version(event.version)
        existing = self.ruleset_service.get_standard(
            customer=customer,
            name=event.name,
            version=desired_version.to_str()
        )
        if existing:
            raise ResponseFactory(HTTPStatus.CONFLICT).message(
                f'Ruleset {event.name} {desired_version} already exists'
            ).exc()
        return desired_version

    _LOG.debug('User did not provide the version. ')
    release_version = None
    if rs and rs.type == RuleSourceType.GITHUB_RELEASE:
        tag = rs.latest_sync.release_tag or ''
        try:
            release_version = Version(tag)
        except ValueError:
            # Best effort: a missing or non-version tag is not an error,
            # the version is derived from the existing rulesets instead.
            _LOG.debug(f'Release tag {tag!r} of the rule source is not a '
                       f'valid version. Ignoring it')

    if release_version:
        existing = self.ruleset_service.get_standard(
            customer=customer,
            name=event.name,
            version=release_version.to_str()
        )
        if existing:
            raise ResponseFactory(HTTPStatus.CONFLICT).message(
                f'Ruleset {event.name} for rules release '
                f'{release_version} already exists'
            ).exc()
        return release_version

    latest = self.ruleset_service.get_latest(
        customer=customer,
        name=event.name
    )
    if not latest:
        _LOG.debug('The previous ruleset not found. Creating the '
                   'first version')
        return Version.first_version()

    # a new version of an existing ruleset must stay within the same cloud
    if latest.cloud != event.cloud:
        raise ResponseFactory(HTTPStatus.BAD_REQUEST).message(
            'Cannot create a new version of ruleset '
            'for different cloud'
        ).exc()
    _LOG.debug('The previous ruleset found. Creating the '
               'next version')
    return Version(latest.version).next_major()


def _collect_ruleset_rules(self, event: RulesetPostModel, customer: str,
                           rs: 'RuleSource | None') -> list:
    """
    Collect the rules for the new ruleset based on the request params.

    The initial set of rules comes from the first matching option:
    explicit ``event.rules``, the rule source, the git project, or all
    the rules of the customer for the requested cloud. After that
    duplicates are removed, ``event.excluded_rules`` are dropped and the
    mapping filters (platforms, categories, service sections, sources)
    are applied.

    :param event: incoming request model
    :param customer: already resolved customer name
    :param rs: rule source resolved by the caller; expected to be set
        whenever ``event.rule_source_id`` is set
    :return: list of rules left after all the filters, possibly empty
    :raises: the exception built by ``ResponseFactory(...).exc()`` with
        ``NOT_FOUND`` if one of the explicitly requested rules cannot be
        resolved
    """
    _LOG.debug('Collecting the list of rules based on incoming params')
    if event.rules:
        _LOG.info('Concrete rules were provided. '
                  'Assembling the ruleset using them')
        rules = []
        for rule_name in event.rules:
            rule = self.rule_service.resolve_rule(
                customer=customer,
                name_prefix=rule_name,
                cloud=event.cloud
            )
            if not rule:
                raise ResponseFactory(HTTPStatus.NOT_FOUND).message(
                    self.rule_service.not_found_message(rule_name)
                ).exc()
            rules.append(rule)
        # explicitly requested rules are still narrowed by the location
        # params of the request
        rules = self.rule_service.filter_by(
            rules=rules,
            git_project=event.git_project_id,
            rule_source_id=event.rule_source_id,
            ref=event.git_ref
        )
    elif event.rule_source_id:
        _LOG.debug('Querying rules by rule source')
        # the caller has already checked that the rule source exists
        rules = self.rule_service.get_by_rule_source(
            rule_source=cast(RuleSource, rs),
            cloud=event.cloud
        )
    elif event.git_project_id:
        _LOG.debug('Querying rules by location index')
        rules = self.rule_service.get_by(
            customer=customer,
            project=event.git_project_id,
            ref=event.git_ref,
            cloud=event.cloud
        )
    else:
        _LOG.debug('Querying all the rules for cloud')
        rules = self.rule_service.get_by_id_index(customer, event.cloud)

    _LOG.debug('Removing duplicates')
    rules = list(self.rule_service.without_duplicates(rules=rules))

    if event.excluded_rules:
        _LOG.debug('Removing excluded rules')
        resolver = RuleNamesResolver(
            resolve_from=map(operator.attrgetter('name'), rules),
        )
        excluded = set(resolver.resolved_names(event.excluded_rules))
        rules = [rule for rule in rules if rule.name not in excluded]

    _LOG.debug('Filtering rules by mappings')
    return list(self._filtered_rules(
        rules=rules,
        platforms=event.platforms,
        categories=event.categories,
        service_sections=event.service_sections,
        sources=event.sources,
    ))