in src/main/java/com/epam/eco/commons/kafka/AdminClientUtils.java [1176:1233]
/**
 * Describes the given resources using several admin clients at once.
 *
 * <p>The resource names are split into partitions (sized by {@code PARALLELISM_THRESHOLD}),
 * each partition is handed to one of the clients in round-robin order, and the resulting
 * futures are then awaited one by one. Every client created here is closed before the
 * method returns or throws.
 *
 * @param clientConfig configuration passed to {@code initClient} for each created client
 * @param label human-readable resource kind used in log and error messages; a blank value
 *        falls back to {@code "resource"}
 * @param resourceNames names of the resources to describe
 * @param describeFunction function that issues the describe request for one partition of
 *        names on the given client and returns the per-name futures
 * @param <D> type of a single description
 * @return map from resource name to its description
 * @throws RuntimeException if waiting for any description is interrupted or fails; when
 *         interrupted, the calling thread's interrupt status is restored
 */
private static <D> Map<String, D> describeInParallel(
        Map<String, Object> clientConfig,
        String label,
        Collection<String> resourceNames,
        DescribeFunction<D> describeFunction) {
    final String effectiveLabel = StringUtils.isNotBlank(label) ? label : "resource";
    final int totalCount = resourceNames.size();

    List<List<String>> resourceNamePartitions = ListUtils.partition(
            new ArrayList<>(resourceNames),
            PARALLELISM_THRESHOLD);

    // Never create more clients than there are partitions to hand out.
    int numClients = Math.min(
            Runtime.getRuntime().availableProcessors(),
            resourceNamePartitions.size());

    LOGGER.info(
            "Initiating parallel description of {} {}s using {} clients",
            totalCount, effectiveLabel, numClients);

    List<AdminClient> clients = new ArrayList<>(numClients);
    // Remembered so the interrupt status can be restored once the clients are closed.
    boolean interrupted = false;
    try {
        for (int i = 0; i < numClients; i++) {
            clients.add(initClient(clientConfig));
        }

        // Submit all partitions first, spreading them round-robin over the clients,
        // so the requests are in flight before we start blocking on results.
        Map<String, KafkaFuture<D>> futures = new HashMap<>();
        for (int i = 0; i < resourceNamePartitions.size(); i++) {
            AdminClient client = clients.get(i % numClients);
            futures.putAll(describeFunction.apply(client, resourceNamePartitions.get(i)));
        }

        int describedCount = 0;
        Map<String, D> descriptions = new HashMap<>((int) (totalCount / 0.75));
        for (Entry<String, KafkaFuture<D>> entry : futures.entrySet()) {
            descriptions.put(entry.getKey(), entry.getValue().get());
            describedCount++;
            // Periodic progress report, once per PARALLELISM_THRESHOLD results.
            if (describedCount % PARALLELISM_THRESHOLD == 0) {
                LOGGER.info("Described {} {}s out of {}", describedCount, effectiveLabel, totalCount);
            }
        }
        // Final report for the trailing, incomplete batch (if any).
        if (describedCount % PARALLELISM_THRESHOLD != 0) {
            LOGGER.info("Described {} {}s out of {}", describedCount, effectiveLabel, totalCount);
        }
        return descriptions;
    } catch (InterruptedException ex) {
        interrupted = true;
        throw new RuntimeException("Failed to describe " + effectiveLabel + "s in parallel", ex);
    } catch (ExecutionException ex) {
        throw new RuntimeException("Failed to describe " + effectiveLabel + "s in parallel", ex);
    } finally {
        // Close every client even if one of them fails to close, and do not let a
        // close failure replace the exception (if any) thrown from the try block.
        for (AdminClient client : clients) {
            try {
                client.close();
            } catch (RuntimeException closeEx) {
                LOGGER.warn("Failed to close admin client after describing {}s", effectiveLabel, closeEx);
            }
        }
        // Restored only after the close loop, so the close() calls above do not run
        // with a pending interrupt.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}