in src/handlers/push_handler.py [0:0]
def push_dojo_by_job_id(self, event: ReportPushByJobIdModel, job_id: str):
    """Push the results of one job to the Dojo linked to the job's tenant.

    The job is looked up by ``job_id`` together with ``event.type`` and
    ``event.customer``. Only the first adapter yielded by
    ``get_dojo_adapters`` for the tenant is used.

    :param event: request model; ``type`` and ``customer`` are read from it.
    :param job_id: identifier of the job whose results are pushed.
    :return: the value of ``build_response``:
        - NOT_FOUND when the job is missing, has not succeeded, or is a
          platform job whose platform cannot be resolved;
        - the dojo DTO when ``_push_dojo`` reports ``HTTPStatus.OK``;
        - SERVICE_UNAVAILABLE carrying the push message otherwise.
    :raises: the exception built by ``ResponseFactory(BAD_REQUEST)`` when
        the tenant has no linked dojo configuration.
        NOTE(review): ``assert_tenant_valid`` presumably raises for an
        invalid tenant as well - confirm against its implementation.
    """
    job = self._ambiguous_job_service.get_job(
        job_id=job_id, typ=event.type, customer=event.customer
    )
    if not job:
        return build_response(
            content='The request job not found', code=HTTPStatus.NOT_FOUND
        )
    if not job.is_succeeded:
        return build_response(
            content='Job has not succeeded yet', code=HTTPStatus.NOT_FOUND
        )

    raw_tenant = self._modular_client.tenant_service().get(job.tenant_name)
    tenant = modular_helpers.assert_tenant_valid(raw_tenant, event.customer)

    # Take the first (dojo, configuration) pair; fall back to a pair of
    # Nones when the tenant yields no adapters at all.
    adapters = self._integration_service.get_dojo_adapters(tenant)
    dojo, configuration = next(adapters, (None, None))
    if not (dojo and configuration):
        factory = ResponseFactory(HTTPStatus.BAD_REQUEST)
        raise factory.message(
            f'Tenant {tenant.name} does not have linked dojo configuration'
        ).exc()

    dojo_url = dojo.url
    api_key = self._dds.get_api_key(dojo)
    client = DojoV2Client(url=dojo_url, api_key=api_key)

    # Platform jobs read their collection and meta from the platform;
    # every other job reads them from the tenant.
    platform = None
    if job.is_platform_job:
        platform = self._platform_service.get_nullable(job.platform_id)
        if not platform:
            return build_response(
                content='Job platform not found', code=HTTPStatus.NOT_FOUND
            )
        collection = self._rs.platform_job_collection(platform, job.job)
        meta_owner = platform
    else:
        collection = self._rs.ambiguous_job_collection(tenant, job)
        meta_owner = tenant
    collection.meta = self._rs.fetch_meta(meta_owner)
    collection.fetch_all()

    metadata = self._ls.get_customer_metadata(tenant.customer_name)
    resolved_configuration = configuration.substitute_fields(job, platform)
    code, message = self._push_dojo(
        client=client,
        configuration=resolved_configuration,
        job=job,
        collection=collection,
        metadata=metadata,
    )

    if code == HTTPStatus.OK:
        dto = self.get_dojo_dto(job, dojo, resolved_configuration)
        return build_response(dto)
    # Any non-OK push outcome is reported as SERVICE_UNAVAILABLE.
    return build_response(
        content=message, code=HTTPStatus.SERVICE_UNAVAILABLE
    )