in src/run.py [0:0]
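def upload_to_siem(tenant: Tenant, collection: ShardsCollection,
                   job: AmbiguousJob, platform: Platform | None = None):
    """
    Push the findings in ``collection`` to the SIEM integrations of ``tenant``.

    Defect Dojo adapters are processed first, then Chronicle adapters. Both
    lists come from ``SP.integration_service``. A Dojo import failure is
    logged and does not stop the loop. A Chronicle adapter whose credentials
    cannot be resolved is skipped with a warning. A falsy result from the
    entities push is logged as a warning instead of being dropped.

    :param tenant: tenant whose adapters and customer metadata are used
    :param collection: shards collection converted into each SIEM's format
    :param job: job used to substitute Dojo configuration fields and to
        build Dojo tags
    :param platform: optional platform passed to the field substitution
    :return: None
    """
    metadata = SP.license_service.get_customer_metadata(tenant.customer_name)

    dojo_adapters = SP.integration_service.get_dojo_adapters(tenant, True)
    for dojo, configuration in dojo_adapters:
        # NOTE(review): only import_scan (and the conversion evaluated as its
        # argument) is guarded below. An exception from from_scan_type,
        # substitute_fields or get_api_key still propagates to the caller.
        convertor = ShardCollectionDojoConvertor.from_scan_type(
            configuration.scan_type,
            metadata
        )
        configuration = configuration.substitute_fields(job, platform)
        client = DojoV2Client(
            url=dojo.url,
            api_key=SP.defect_dojo_service.get_api_key(dojo)
        )
        try:
            client.import_scan(
                scan_type=configuration.scan_type,
                scan_date=utc_datetime(),
                product_type_name=configuration.product_type,
                product_name=configuration.product,
                engagement_name=configuration.engagement,
                test_title=configuration.test,
                data=convertor.convert(collection),
                tags=SP.integration_service.job_tags_dojo(job)
            )
        except Exception:
            # Best effort: one failing Dojo instance must not block the others.
            _LOG.exception('Unexpected error occurred pushing to dojo')

    mcs = SP.modular_client.maestro_credentials_service()
    chronicle_adapters = SP.integration_service.get_chronicle_adapters(
        tenant, True
    )
    for chronicle, configuration in chronicle_adapters:
        # NOTE(review): unlike the Dojo push above, nothing in this loop is
        # wrapped in try/except, so an exception from the credentials lookup,
        # the conversion or the Chronicle client propagates to the caller and
        # the remaining adapters are not processed. Confirm this is intended.
        _LOG.debug('Going to push data to Chronicle')
        creds = mcs.get_by_application(
            chronicle.credentials_application_id,
            tenant
        )
        if not creds:
            # Make the skipped push visible; otherwise a missing or revoked
            # credentials application silently stops findings reaching SIEM.
            _LOG.warning(
                'Cannot resolve credentials for application %s, '
                'skipping push to Chronicle',
                chronicle.credentials_application_id
            )
            continue
        client = ChronicleV2Client(
            url=chronicle.endpoint,
            credentials=creds.GOOGLE_APPLICATION_CREDENTIALS,
            customer_id=chronicle.instance_customer_id
        )
        match configuration.converter_type:
            case ChronicleConverterType.EVENTS:
                _LOG.debug('Converting our collection to UDM events')
                convertor = ShardCollectionUDMEventsConvertor(
                    metadata, tenant=tenant
                )
                client.create_udm_events(events=convertor.convert(collection))
            case _:  # ENTITIES
                _LOG.debug('Converting our collection to UDM entities')
                convertor = ShardCollectionUDMEntitiesConvertor(
                    metadata, tenant=tenant
                )
                # todo use a generic log type or smt: the value is hard-coded,
                # so every entity is ingested under this log type whatever
                # the source of the findings is.
                success = client.create_udm_entities(
                    entities=convertor.convert(collection),
                    log_type='AWS_API_GATEWAY'
                )
                # presumably a truthy result means Chronicle accepted the
                # batch; verify against ChronicleV2Client
                if not success:
                    _LOG.warning('Pushing UDM entities to Chronicle failed')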