in src/services/udm_generator.py [0:0]
def convert(self, collection: 'ShardsCollection') -> list[dict]:
"""
:param collection:
:return:
"""
meta = collection.meta
datas = {}
policies_results = {}
for part in collection.iter_parts():
for res in part.resources:
unique = hashable((
filter_dict(res, REPORT_FIELDS),
part.location,
meta.get(part.policy, {}).get('resource')
))
inner = datas.setdefault(unique, [set(), res, part.timestamp])
inner[0].add(part.policy)
inner[1].update(res)
inner[2] = max(inner[2], part.timestamp)
if part.policy not in policies_results:
policies_results[part.policy] = UDMVulnerabilityBuilder(
policy=part.policy,
description=meta.get(part.policy, {}).get('description'),
metadata=self.meta.rule(part.policy),
).build()
events = []
for unique, inner in datas.items():
policies, full_res, ts = inner
res, region, rt = unique
resource_id = res.get('arn') or res.get('id') or res.get('name')
resource_name = res.get('name') or res.get('id') or res.get('arn')
resource_type = from_cc_resource_type(rt)
if resource_type is UDMResourceType.UNSPECIFIED: # todo currently api does not accept this one(
resource_type = UDMResourceType.CLOUD_PROJECT
event = UDMEvent(
metadata=UDMEventMetadata(
collected_timestamp=datetime.fromtimestamp(ts, tz=timezone.utc),
description='Syndicate Rule Engine scanned target product',
event_type=UDMEventType.SCAN_VULN_HOST,
product_name=self._tenant.name,
vendor_name=self._tenant.customer_name
),
principal=UDMNoun(
application='Syndicate Rule Engine', # todo maybe add other data
hostname=CAASEnv.API_GATEWAY_HOST.get('rule-engine'), # todo maybe get from ec2 metadata
),
target=UDMNoun(
location=UDMLocation(region),
resource=UDMResource(
product_object_id=resource_id,
name=resource_name,
resource_subtype=rt,
resource_type=resource_type,
attribute=UDMAttribute(
cloud=UDMCloud(
environment=UDMCloudEnvironment.from_local_cloud(self._tenant.cloud),
),
labels=[]
)
)
),
extensions=UDMExtensions(
vulns=UDMVulnerabilities(
vulnerabilities=[policies_results.get(p) for p in policies]
)
)
)
if service := self.meta.rule(next(iter(policies))).service:
event.target.application = service
if date := full_res.get('date'):
dt = self._parse_date(date)
if dt:
event.target.resource.attribute.creation_time = dt
if tags := full_res.get('Tags'):
event.target.resource.attribute.labels.extend(
UDMLabel(key=t['Key'], value=t['Value']) for t in tags
)
events.append(msgspec.to_builtins(event))
return events