public Mono<Void> createJob(String namespace, V1Job job, int imageBuildTimeoutSec)

in src/main/java/com/epam/aidial/kubernetes/KubernetesClient.java [93:128]


    /**
     * Creates the given job in {@code namespace} and waits until a watch event reports it as
     * completed, as decided by {@code KubernetesUtils.extractJobCompletionStatus}.
     *
     * <p>The work is blocking, so it is wrapped in a callable and subscribed on
     * {@link Schedulers#boundedElastic()}.
     *
     * @param namespace namespace in which the job is listed (watched) and created
     * @param job job to create; its metadata name is used to filter the watched events
     * @param imageBuildTimeoutSec value passed as {@code timeoutSeconds} of the watch request
     * @return a {@link Mono} that completes empty once the job is reported as completed; it fails
     *         with {@link IllegalStateException} if the watch event stream ends first
     */
    public Mono<Void> createJob(String namespace, V1Job job, int imageBuildTimeoutSec) {
        // No asynchronous Watch API is available at the moment, hence the blocking callable
        return Mono.<Void>fromCallable(() -> {
            String jobName = job.getMetadata().getName();

            BatchV1Api api = new BatchV1Api(apiClient);
            // Watch request restricted to the single job with this name
            Call watchCall = api.listNamespacedJob(namespace)
                    .watch(true)
                    .fieldSelector(NAME_SELECTOR_PREFIX + jobName)
                    .timeoutSeconds(imageBuildTimeoutSec)
                    .buildCall(null);

            // The watch is opened before the job is submitted
            // (presumably so that no event of the new job is missed - TODO confirm)
            try (Watch<V1Job> events = Watch.createWatch(api.getApiClient(), watchCall, JOB_TYPE_TOKEN.getType())) {
                log.info("Creating job {}", jobName);
                api.createNamespacedJob(namespace, job).execute();

                log.info("Waiting for job {} to complete", jobName);
                for (Watch.Response<V1Job> event : events) {
                    V1Job observed = event.object;
                    if (observed == null) {
                        // The event carries no job object: log its status and keep waiting
                        logStatus(event.status);
                        continue;
                    }

                    // The field selector should only ever deliver events of the requested job
                    Validate.isTrue(jobName.equals(observed.getMetadata().getName()));
                    if (KubernetesUtils.extractJobCompletionStatus(observed)) {
                        log.info("Job {} has completed successfully", jobName);
                        return null;
                    }
                }
            }

            // The event stream ended without a completion event being observed
            throw new IllegalStateException(String.format("Subscription to job %s events expired", jobName));
        }).subscribeOn(Schedulers.boundedElastic());
    }