in src/main/java/com/epam/aidial/kubernetes/KubernetesClient.java [224:260]
/**
 * Creates the given Knative service and waits until a URL can be extracted from its state.
 *
 * <p>A watch filtered by the service name is opened on the namespace before the create request
 * is sent, then the watch events are consumed until {@link KubernetesUtils#extractServiceUrl}
 * yields a non-null value. All of this is blocking, so it is executed on
 * {@code Schedulers.boundedElastic()}.
 *
 * @param namespace namespace the service is created in and the watch is scoped to
 * @param service service definition to create; its metadata name and API version are read here
 * @param serviceSetupTimeoutSec value passed as {@code timeoutSeconds} of the watch request
 * @return a {@link Mono} emitting the extracted service URL; it fails with
 *         {@link IllegalStateException} if the watch ends before a URL is seen, or with whatever
 *         exception the blocking calls raise
 */
public Mono<String> createKnativeService(String namespace, V1Service service, int serviceSetupTimeoutSec) {
    // The client offers no asynchronous Watch API, hence the blocking callable.
    return Mono.fromCallable(() -> {
                final String name = service.getMetadata().getName();
                final ServiceVersion version = ServiceVersion.parse(service.getApiVersion());
                final CustomObjectsApi customObjectsApi = new CustomObjectsApi(apiClient);

                // Watch request restricted to the single service being created.
                final Call call = customObjectsApi
                        .listNamespacedCustomObject(version.group(), version.version(), namespace, SERVICES)
                        .watch(true)
                        .fieldSelector(NAME_SELECTOR_PREFIX + name)
                        .timeoutSeconds(serviceSetupTimeoutSec)
                        .buildCall(null);

                // The watch is opened before the create request is issued.
                try (Watch<V1Service> watch = Watch.createWatch(
                        customObjectsApi.getApiClient(), call, SERVICE_TYPE_TOKEN.getType())) {
                    log.info("Creating service {}", name);
                    customObjectsApi
                            .createNamespacedCustomObject(
                                    version.group(), version.version(), namespace, SERVICES, service)
                            .execute();

                    for (Watch.Response<V1Service> event : watch) {
                        final V1Service current = event.object;
                        if (current == null) {
                            // No object attached to the event: only its status is reported.
                            logStatus(event.status);
                            continue;
                        }

                        Validate.isTrue(name.equals(current.getMetadata().getName()));
                        final String url = KubernetesUtils.extractServiceUrl(current);
                        if (url == null) {
                            // No URL yet; keep consuming events.
                            continue;
                        }

                        log.info("Service {} has been set up", name);
                        return url;
                    }
                }

                // The event stream ended (presumably on the watch timeout) without producing a URL.
                throw new IllegalStateException("Subscription to service %s events expired".formatted(name));
            })
            .subscribeOn(Schedulers.boundedElastic());
}