in src/handlers/compliance_handler.py [0:0]
def get_by_job(self, event: JobComplianceReportGetModel, job_id: str):
    """
    Produce a compliance report for a single job.

    NOTE: the endpoint is currently switched off. The very first
    statement raises the default ``NOT_IMPLEMENTED`` response, so none
    of the code after it is executed. The rest of the body is kept as
    the parked implementation.

    :param event: request model; ``job_type``, ``customer``, ``format``
        and ``href`` are read from it
    :param job_id: identifier of the job the report is requested for
    :return: the value of ``build_response(...)`` (unreachable for now)
    :raises: whatever exception
        ``ResponseFactory(HTTPStatus.NOT_IMPLEMENTED).default().exc()``
        builds; it is raised on every call
    """
    raise ResponseFactory(HTTPStatus.NOT_IMPLEMENTED).default().exc()

    # ------------------------------------------------------------------
    # Everything below is unreachable while the raise above is in place.
    # ------------------------------------------------------------------

    # Guard clause: the job must be resolvable for the given customer.
    found_job = self._ambiguous_job_service.get_job(
        job_id=job_id, typ=event.job_type, customer=event.customer
    )
    if not found_job:
        return build_response(
            code=HTTPStatus.NOT_FOUND,
            content='The request job not found',
        )

    # Guard clause: the tenant the job refers to must exist as well.
    job_tenant = self._tenant_service.get(found_job.tenant_name)
    if not job_tenant:
        return build_response(
            content='Job tenant not found',
            code=HTTPStatus.NOT_FOUND,
        )

    # TODO api implement for platform
    # Event-driven jobs use their own collection factory.
    collection_factory = (
        self._report_service.ed_job_collection
        if found_job.is_ed_job
        else self._report_service.job_collection
    )
    shards = collection_factory(job_tenant, found_job.job)
    shards.fetch_all()

    coverage_data = self._coverage_service.coverage_from_collection(
        shards, modular_helpers.tenant_cloud(job_tenant)
    )
    report = ReportResponse(found_job, coverage_data, fmt=event.format)

    requested_format = event.format
    if requested_format == ReportFormat.JSON:
        # JSON content is replaced with a link only when href is set.
        if event.href:
            report.content = self._report_service.one_time_url_json(
                coverage_data, f'{job_tenant.name}-compliance.json'
            )
    elif requested_format == ReportFormat.XLSX:
        # The workbook is rendered into an in-memory buffer; the buffer
        # is rewound before it is handed over to build the link.
        xlsx_stream = io.BytesIO()
        with Workbook(xlsx_stream) as workbook:
            xlsx_writer = ComplianceReportXlsxWriter(coverage_data)
            xlsx_writer.write(
                wb=workbook,
                wsh=workbook.add_worksheet('Compliance'),
            )
        xlsx_stream.seek(0)
        report.content = self._report_service.one_time_url(
            xlsx_stream, f'{found_job.id}-compliance.xlsx'
        )

    return build_response(content=report.dict())