# src/services/report_convertors.py
import csv
import io
from abc import ABC, abstractmethod
from base64 import b64encode
from datetime import datetime, timezone
from functools import partial
from typing import TYPE_CHECKING, Literal, TypedDict
import msgspec
from typing_extensions import NotRequired
from xlsxwriter.workbook import Workbook
from helpers import filter_dict, hashable
from helpers.constants import REPORT_FIELDS
from services.metadata import Metadata
from services.xlsx_writer import CellContent, Table, XlsxRowsWriter
if TYPE_CHECKING:
from services.sharding import ShardsCollection
class ShardCollectionConvertor(ABC):
def __init__(self, metadata: Metadata) -> None:
self.meta = metadata
@abstractmethod
def convert(self, collection: 'ShardsCollection') -> dict | list:
"""
Must convert the given shards collection to some other report
:param collection:
:return:
"""
class ShardCollectionDojoConvertor(ShardCollectionConvertor):
"""
Subclass only for defect dojo convertors
"""
@abstractmethod
def convert(self, collection: 'ShardsCollection') -> dict | list: ...
@classmethod
def from_scan_type(
cls, scan_type: str, metadata: Metadata, **kwargs
) -> 'ShardCollectionDojoConvertor':
"""
Returns a generic dojo convertor by default
:param scan_type:
:param kwargs:
:return:
"""
match scan_type:
case 'Generic Findings Import':
return ShardsCollectionGenericDojoConvertor(metadata, **kwargs)
case 'Cloud Custodian Scan':
return ShardsCollectionCloudCustodianDojoConvertor(
metadata, **kwargs
)
case _:
return ShardsCollectionGenericDojoConvertor(metadata, **kwargs)
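
# Usage sketch (an assumption, not part of the module's public contract):
# given a Metadata instance and a populated ShardsCollection from the
# surrounding services, a convertor is resolved from the Dojo scan type:
#
#     convertor = ShardCollectionDojoConvertor.from_scan_type(
#         'Cloud Custodian Scan', metadata, resource_per_finding=True
#     )
#     payload = convertor.convert(collection)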
# Typed payloads for the generic Dojo parser
class FindingFile(TypedDict):
title: str
data: str
class Finding(TypedDict):
title: str
    date: str  # discovery date, ISO 8601
    severity: str  # Info, Low, Medium, High or Critical; Info when unknown
    description: str
    mitigation: str | None
    impact: str | None
    references: str  # rendered markdown list of standards (MITRE to be added)
    tags: list[str]
    vuln_id_from_tool: str  # rule id
    service: str  # cloud service the rule targets
files: NotRequired[list[FindingFile]]
class Findings(TypedDict):
findings: list[Finding]
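
# A minimal example of the payload shape above (illustrative values only):
#
#     {
#         'findings': [
#             {
#                 'title': 'S3 bucket allows public read access',
#                 'date': '2024-01-01T00:00:00+00:00',
#                 'severity': 'High',
#                 'description': '...',
#                 'mitigation': None,
#                 'impact': None,
#                 'references': '#### Standards\n* CIS **1.4**\n',
#                 'tags': ['eu-west-1', 's3'],
#                 'vuln_id_from_tool': 'ecc-aws-001-s3_public_read',
#                 'service': 's3',
#             }
#         ]
#     }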
class ShardsCollectionGenericDojoConvertor(ShardCollectionDojoConvertor):
def __init__(
self,
metadata: Metadata,
attachment: Literal['json', 'xlsx', 'csv'] | None = None,
**kwargs,
):
"""
In case attachment is provided, findings data will be attached as file
in that format. Otherwise, table will be drawn directly inside
description
:param attachment:
:param kwargs:
"""
super().__init__(metadata)
self._attachment = attachment
@staticmethod
def _make_table(resources: list[dict]) -> str:
"""
In case resource have arn, we don't show id and name and namespace
(cause arn can be really long and can break dojo description),
otherwise -> id, name, namespace
:param resources:
:return:
"""
from tabulate import tabulate
if resources[0].get('arn'): # can be sure IndexError won't occur
# all resources within a table are similar
headers = ('arn',)
else: # id name, namespace
headers = ('id', 'name', 'namespace')
return tabulate(
tabular_data=[[res.get(h) for h in headers] for res in resources],
headers=map(str.title, headers), # type: ignore
tablefmt='rounded_grid',
stralign='center',
numalign='center',
showindex='always',
missingval='-',
disable_numparse=True,
)
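
    # The rendered table looks roughly like this (rounded_grid format,
    # centered cells, '-' for missing values; exact borders depend on the
    # tabulate version):
    #
    #     ╭────┬─────────────────────────╮
    #     │    │           Arn           │
    #     ├────┼─────────────────────────┤
    #     │ 0  │ arn:aws:s3:::my-bucket  │
    #     ╰────┴─────────────────────────╯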
@staticmethod
    def _make_references(standards: dict) -> str:
        """
        Renders the standards mapping to a markdown list
        """
        data = bytearray(b'#### Standards\n')
for name in standards:
for version in standards[name]:
data.extend(f'* {name} **{version}**\n'.encode())
# TODO: check and fix null version
# TODO: add mitre here
return data.decode('utf-8')
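
    # Example output for {'CIS': {'1.4': ..., '2.0': ...}} (shape assumed
    # from the iteration above):
    #
    #     #### Standards
    #     * CIS **1.4**
    #     * CIS **2.0**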
@staticmethod
def _make_json_file(resources: list[dict]) -> str:
"""
Dumps resources to json and encodes to base64 as dojo expects
:return:
"""
return b64encode(msgspec.json.encode(resources)).decode()
@staticmethod
def _make_xlsx_file(resources: list[dict]) -> str:
"""
Dumps resources to xlsx file and encodes to base64 as dojo expects
:param resources:
:return:
"""
buffer = io.BytesIO()
with Workbook(buffer) as wb:
bold = wb.add_format({'bold': True})
table = Table()
table.new_row()
headers = ('arn', 'id', 'name', 'namespace')
for h in ('№',) + headers:
table.add_cells(CellContent(h.title(), bold))
for i, r in enumerate(resources, 1):
table.new_row()
table.add_cells(CellContent(i))
for h in headers:
table.add_cells(CellContent(r.get(h)))
wsh = wb.add_worksheet('resources')
XlsxRowsWriter().write(wsh, table)
return b64encode(buffer.getvalue()).decode()
@staticmethod
def _make_csv_file(resources: list[dict]) -> str:
"""
Dumps resources to csv file and encodes to base64 as dojo expects
:param resources:
:return:
"""
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(('№', 'Arn', 'Id', 'Name', 'Namespace'))
writer.writerows(
(
i,
res.get('arn'),
res.get('id'),
res.get('name'),
res.get('namespace'),
)
for i, res in enumerate(resources, 1)
)
return b64encode(buffer.getvalue().encode()).decode()
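
    # All three helpers above return base64-encoded bytes, which is what
    # Dojo's file attachment field expects. A quick sanity check (hedged,
    # illustrative only; needs `from base64 import b64decode`):
    #
    #     data = ShardsCollectionGenericDojoConvertor._make_csv_file(
    #         [{'id': 'i-123', 'name': 'web'}]
    #     )
    #     csv_text = b64decode(data).decode()  # header row + one data row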
def convert(self, collection: 'ShardsCollection') -> Findings:
findings = []
meta = collection.meta
for part in collection.iter_parts():
if not part.resources:
continue
            pm = meta.get(part.policy) or {}  # raw part meta
            p = part.policy
            # enriched rule metadata (severity, article, remediation, ...)
            pm2 = self.meta.rule(
                p, comment=pm.get('comment'), resource=pm.get('resource')
            )
            tags = [part.location, pm.get('resource')]
if service_section := pm2.service_section:
tags.append(service_section)
            match self._attachment:
                case 'xlsx' | 'json' | 'csv' as fmt:
                    maker = {
                        'xlsx': self._make_xlsx_file,
                        'json': self._make_json_file,
                        'csv': self._make_csv_file,
                    }[fmt]
                    extra = {
                        'description': pm2.article,
                        'files': [
                            {
                                'title': f'{p}.{fmt}',
                                'data': maker(part.resources),
                            }
                        ],
                    }
                case _:  # None or something unexpected: inline the table
                    table = self._make_table(part.resources)
                    extra = {'description': f'{pm2.article}\n{table}'}
findings.append(
{
                    'title': pm.get('description', p),
'date': datetime.fromtimestamp(
part.timestamp, tz=timezone.utc
).isoformat(),
'severity': pm2.severity.value,
'mitigation': pm2.remediation,
'impact': pm2.impact,
'references': self._make_references(pm2.standard),
'tags': tags,
'vuln_id_from_tool': p,
'service': pm2.service,
**extra,
}
)
return {'findings': findings}
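
# A hedged end-to-end sketch: converting a collection and serializing the
# payload for upload (the upload call itself lives outside this module):
#
#     convertor = ShardsCollectionGenericDojoConvertor(
#         metadata, attachment='csv'
#     )
#     body = msgspec.json.encode(convertor.convert(collection))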
class ShardsCollectionCloudCustodianDojoConvertor(
ShardCollectionDojoConvertor
):
"""
Converts existing shards collection to the format that is accepted by
Cloud Custodian dojo parser
"""
class Model(TypedDict):
"""
Parser expects a list of such items
"""
description: str
        # a single dict instead of a list when resource_per_finding is on
        resources: list[dict] | dict
remediation: str | None
impact: str | None
standard: dict
severity: str | None
article: str | None
service: str | None
vuln_id_from_tool: str | None
tags: list[str]
    def __init__(
        self, metadata: Metadata, resource_per_finding: bool = False, **kwargs
    ):
        """
        :param metadata: rules metadata
        :param resource_per_finding: if True, emit one finding per violating
        resource instead of one finding per rule
        :param kwargs: ignored
        """
        super().__init__(metadata)
        self._rpf = resource_per_finding
@staticmethod
def _convert_standards(standards: dict) -> dict:
res = {}
for name in standards:
for version in standards[name]:
res.setdefault(name, []).append(version)
return res
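
    # Example (input shape assumed from the iteration above): a mapping of
    # name -> iterable of versions becomes name -> list of versions:
    #
    #     {'CIS': {'1.4': [...], '2.0': [...]}} -> {'CIS': ['1.4', '2.0']}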
@staticmethod
def _prepare_resources(resources: list[dict]) -> list[dict]:
"""
Keeps only report fields and sorts by
:param resources:
:return:
"""
skey = 'id'
ftr = partial(filter_dict, keys=REPORT_FIELDS)
return sorted(
map(ftr, resources), key=lambda r: r.get(skey) or chr(123)
)
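
    # For example, assuming REPORT_FIELDS includes 'id' and 'name' and
    # filter_dict keeps only those keys:
    #
    #     _prepare_resources([{'id': 'b', 'extra': 1}, {'name': 'x'}])
    #     # -> [{'id': 'b'}, {'name': 'x'}]  (id-less resources sort last)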
def convert(self, collection: 'ShardsCollection') -> list[Model]:
result = []
meta = collection.meta
for part in collection.iter_parts():
if not part.resources:
continue
            rule = part.policy
            raw = meta.get(rule) or {}  # raw rule meta from the collection
            pm = self.meta.rule(
                rule,
                comment=raw.get('comment'),
                resource=raw.get('resource'),
            )
            base = {
                'description': raw.get('description'),
'remediation': pm.remediation,
'impact': pm.impact,
'severity': pm.severity.value,
'standard': self._convert_standards(pm.standard),
'article': pm.article,
'service': pm.service,
'vuln_id_from_tool': rule,
'tags': [part.location],
}
if self._rpf:
for res in part.resources:
result.append(
{**base, 'resources': filter_dict(res, REPORT_FIELDS)}
)
else:
base['resources'] = self._prepare_resources(part.resources)
result.append(base)
return result
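
# A hedged usage sketch of the resource-per-finding mode (assumes collection
# and metadata exist as above):
#
#     convertor = ShardsCollectionCloudCustodianDojoConvertor(
#         metadata, resource_per_finding=True
#     )
#     items = convertor.convert(collection)  # one item per violating resource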
class ShardsCollectionDigestConvertor(ShardCollectionConvertor):
class DigestsReport(TypedDict):
total_checks: int
successful_checks: int
failed_checks: dict
violating_resources: int
def convert(self, collection: 'ShardsCollection') -> DigestsReport:
total_checks = 0
successful_checks = 0
total_resources = set()
failed_checks = {'total': 0}
        failed_by_severity = {}
        keep_report_fields = partial(filter_dict, keys=REPORT_FIELDS)
        for part in collection.iter_parts():
            total_checks += 1
            sev = self.meta.rule(part.policy).severity.value
            if part.resources:
                failed_checks['total'] += 1
                failed_by_severity.setdefault(sev, 0)
                failed_by_severity[sev] += 1
            else:
                successful_checks += 1
            total_resources.update(
                map(hashable, map(keep_report_fields, part.resources))
            )
failed_checks['severity'] = failed_by_severity
return {
'total_checks': total_checks,
'successful_checks': successful_checks,
'failed_checks': failed_checks,
'violating_resources': len(total_resources),
}
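
# Example of the resulting digest (illustrative numbers):
#
#     {
#         'total_checks': 120,
#         'successful_checks': 100,
#         'failed_checks': {'total': 20, 'severity': {'High': 5, 'Low': 15}},
#         'violating_resources': 37,
#     }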
class ShardsCollectionDetailsConvertor(ShardCollectionConvertor):
    def __init__(self) -> None:
        # this convertor does not need rules metadata
        pass
    def convert(self, collection: 'ShardsCollection') -> dict[str, list[dict]]:
        """
        Groups parts by location: each region maps to a list of
        {'policy': {...}, 'resources': [...]} items
        """
        res = {}
for part in collection.iter_parts():
res.setdefault(part.location, []).append(
{
'policy': {
'name': part.policy,
**(collection.meta.get(part.policy) or {}),
},
'resources': part.resources,
}
)
return res
class ShardsCollectionFindingsConvertor(ShardCollectionConvertor):
    def __init__(self) -> None:
        # this convertor does not need rules metadata
        pass
def convert(self, collection: 'ShardsCollection') -> dict[str, dict]:
"""
Can't be two parts with the same policy and region
:param collection:
:return:
"""
res = {}
meta = collection.meta
for part in collection.iter_parts():
inner = res.setdefault(
part.policy, {'resources': {}, **(meta.get(part.policy) or {})}
)
inner['resources'][part.location] = part.resources
return res
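
# Example of the resulting mapping (illustrative values):
#
#     {
#         'ecc-aws-001-s3_public_read': {
#             'description': '...',
#             'resources': {
#                 'eu-west-1': [{'id': '...', 'name': '...'}],
#                 'us-east-1': [],
#             },
#         }
#     }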