in src/handlers/resource_report_handler.py [0:0]
def get_specific_job(self, event: ResourceReportJobGetModel, job_id: str):
    """
    Build the resources report for a single job and wrap it in a response.

    A 404 response is returned when the job lookup yields nothing and a
    501 response when the job is a platform job. Otherwise the job's
    collection is fetched (one region or every shard), matched resources
    are turned into a report, and the report is returned either inline
    or as a one-time URL, depending on ``event.href``.

    :param event: request parameters; this method reads ``job_type``,
        ``customer``, ``customer_id``, ``region``, ``resource_type``,
        ``exact_match``, ``search_by_all``, ``extras``, ``full``,
        ``obfuscated`` and ``href`` from it
    :param job_id: identifier of the job to report on
    :return: the value produced by ``build_response``
    """
    target_job = self.ajs.get_job(
        job_id=job_id, typ=event.job_type, customer=event.customer
    )
    if not target_job:
        return build_response(
            code=HTTPStatus.NOT_FOUND, content=f'Job {job_id} not found'
        )
    if target_job.is_platform_job:
        return build_response(
            content='Platform job resources report is not available now',
            code=HTTPStatus.NOT_IMPLEMENTED,
        )

    tenant = self._tenant_service.get(target_job.tenant_name)
    tenant = modular_helpers.assert_tenant_valid(tenant, event.customer)

    # Manual jobs use one collection factory, every other type the "ed" one.
    if target_job.type == JobType.MANUAL:
        make_collection = self._report_service.job_collection
    else:
        make_collection = self._report_service.ed_job_collection
    shards = make_collection(tenant, target_job.job)

    region = event.region
    if not region:
        _LOG.debug('Region is not provided. Fetching all shards')
        shards.fetch_all()
    else:
        _LOG.debug('Region is provided. Fetching only shard with this region')
        shards.fetch(region=region)
    shards.meta = self._report_service.fetch_meta(tenant)

    metadata = self._ls.get_customer_metadata(event.customer_id)

    # The mapping is handed to the iterator only when obfuscation is
    # requested; otherwise the iterator receives None.
    obfuscated = event.obfuscated
    dictionary = {}
    findings = MatchedResourcesIterator(
        collection=shards,
        resource_type=event.resource_type,
        region=region,
        exact_match=event.exact_match,
        search_by_all=event.search_by_all,
        search_by=event.extras,
        dictionary_out=dictionary if obfuscated else None,
    )
    report = ResourceReportBuilder(
        matched_findings_iterator=findings,
        entity=tenant,
        full=event.full,
        metadata=metadata,
    ).build()

    dictionary_url = None
    if obfuscated:
        # The return value of flip_dict is ignored, so it presumably
        # inverts the mapping in place -- TODO confirm against the helper.
        flip_dict(dictionary)
        dictionary_url = self._report_service.one_time_url_json(
            dictionary, 'dictionary.json'
        )

    # NOTE(review): dictionary_url is only surfaced in the href response
    # below; the inline response does not carry it.
    if not event.href:
        return build_response(content=self.dto(target_job, report))

    report_url = self._report_service.one_time_url_json(
        report, f'{job_id}-resources.json'
    )
    href_payload = ReportResponse(
        target_job, report_url, dictionary_url, ReportFormat.JSON
    ).dict()
    return build_response(content=href_payload)