in src/main/java/com/epam/aidial/kubernetes/KubernetesClient.java [93:128]
/**
 * Creates {@code job} in {@code namespace} and waits until it is reported as completed.
 *
 * <p>The watch on the job (selected by name) is opened before the create request is sent,
 * presumably so that no early job events are missed. The blocking wait runs on the
 * bounded-elastic scheduler because the watch API used here is synchronous.
 *
 * @param namespace namespace the job is created and watched in
 * @param job job definition; its metadata name is used for the watch field selector
 * @param imageBuildTimeoutSec value passed as the watch's {@code timeoutSeconds}
 * @return a {@link Mono} that completes empty once
 *         {@code KubernetesUtils.extractJobCompletionStatus} returns {@code true} for an observed
 *         job state, and fails with {@link IllegalStateException} if the job is deleted before
 *         completing or the watch ends without a completed state being observed
 */
public Mono<Void> createJob(String namespace, V1Job job, int imageBuildTimeoutSec) {
    // Currently there is no asynchronous Watch api
    return Mono.<Void>fromCallable(() -> {
        String name = job.getMetadata().getName();
        BatchV1Api batchApi = new BatchV1Api(apiClient);
        Call call = batchApi.listNamespacedJob(namespace)
                .watch(true)
                .fieldSelector(NAME_SELECTOR_PREFIX + name)
                .timeoutSeconds(imageBuildTimeoutSec)
                .buildCall(null);
        try (Watch<V1Job> watch = Watch.createWatch(batchApi.getApiClient(), call, JOB_TYPE_TOKEN.getType())) {
            log.info("Creating job {}", name);
            batchApi.createNamespacedJob(namespace, job)
                    .execute();
            log.info("Waiting for job {} to complete", name);
            for (Watch.Response<V1Job> item : watch) {
                V1Job jobState = item.object;
                if (jobState == null) {
                    // Events without a job payload carry a status instead; just log it.
                    logStatus(item.status);
                    continue;
                }
                Validate.isTrue(name.equals(jobState.getMetadata().getName()));
                if (KubernetesUtils.extractJobCompletionStatus(jobState)) {
                    log.info("Job {} has completed successfully", name);
                    return null;
                }
                // A deleted job can never complete: fail right away instead of blocking this
                // thread until the watch times out. Checked after the completion test so that a
                // DELETED event carrying a completed state is still treated as success.
                if ("DELETED".equals(item.type)) {
                    throw new IllegalStateException(
                            "Job %s was deleted before it completed".formatted(name));
                }
            }
        }
        // The watch ended (e.g. timeoutSeconds elapsed) without a completed state being observed.
        throw new IllegalStateException("Subscription to job %s events expired".formatted(name));
    })
    .subscribeOn(Schedulers.boundedElastic());
}