in src/handlers/errors_handler.py [0:0]
def get_by_job(self, event: JobErrorReportGetModel, job_id: str):
job = self._ambiguous_job_service.get_job(
job_id=job_id,
typ=event.job_type,
customer=event.customer
)
if not job:
return build_response(
content='The request job not found',
code=HTTPStatus.NOT_FOUND
)
statistics = self._report_service.job_statistics(job.job)
data = map(
self._report_service.format_failed,
self._report_service.only_failed(
statistic=statistics,
error_type=event.error_type
)
)
content = []
match event.format:
case ReportFormat.JSON:
if event.href:
url = self._report_service.one_time_url_json(
list(data), f'{job.id}-errors.json'
)
content = ReportResponse(job, url).dict()
else:
content = list(data)
case ReportFormat.XLSX:
buffer = io.BytesIO()
with Workbook(buffer) as wb:
ResourceReportXlsxWriter(data).write(
wb=wb,
wsh=wb.add_worksheet('Errors')
)
buffer.seek(0)
url = self._report_service.one_time_url(
buffer, f'{job.id}-errors.xlsx'
)
content = ReportResponse(job, url, fmt=ReportFormat.XLSX).dict()
return build_response(content=content)