in src/main/java/com/epam/digital/data/platform/starter/kafka/service/StartupKafkaTopicsCreator.java [71:89]
private Set<String> getMissingTopicNames(AdminClient kafkaAdminClient) {
Set<String> existingTopics;
try {
existingTopics =
kafkaAdminClient
.listTopics()
.names()
.get(kafkaProperties.getTopicProperties().getCreation().getTimeoutInSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
throw new CreateKafkaTopicException(
String.format(
"Failed to retrieve existing kafka topics in %d sec",
kafkaProperties.getTopicProperties().getCreation().getTimeoutInSeconds()),
e);
}
Set<String> requiredTopics = getRequiredTopics();
requiredTopics.removeAll(existingTopics);
return requiredTopics;
}