private static Map describeInParallel()

in src/main/java/com/epam/eco/commons/kafka/AdminClientUtils.java [1176:1233]


    private static <D> Map<String, D> describeInParallel(
            Map<String, Object> clientConfig,
            String label,
            Collection<String> resourceNames,
            DescribeFunction<D> describeFunction) {
        label = StringUtils.isNotBlank(label) ? label : "resource";

        List<List<String>> resourceNamePartitions = ListUtils.partition(
                new ArrayList<>(resourceNames),
                PARALLELISM_THRESHOLD);

        int numClients = Math.min(
                Runtime.getRuntime().availableProcessors(),
                resourceNamePartitions.size());

        LOGGER.info("Initiating parallel description of {} {}s using {} clients", resourceNames.size(), label, numClients);

        List<AdminClient> clients = new ArrayList<>(numClients);
        try {
            for (int i = 0; i < numClients; i++) {
                clients.add(initClient(clientConfig));
            }

            Map<String, KafkaFuture<D>> futures = new HashMap<>();

            for (int i = 0; i < resourceNamePartitions.size(); i++) {
                List<String> resourceNamePartition = resourceNamePartitions.get(i);

                AdminClient client = clients.get(i % numClients);
                futures.putAll(describeFunction.apply(client, resourceNamePartition));
            }

            int describedCount = 0;

            Map<String, D> descriptions = new HashMap<>((int) (resourceNames.size() / 0.75));
            for (Entry<String, KafkaFuture<D>> entry : futures.entrySet()) {
                descriptions.put(entry.getKey(), entry.getValue().get());

                describedCount++;

                if (describedCount % PARALLELISM_THRESHOLD == 0) {
                    LOGGER.info("Described {} {}s out of {}", describedCount, label, resourceNames.size());
                }
            }

            if (describedCount % PARALLELISM_THRESHOLD != 0) {
                LOGGER.info("Described {} {}s out of {}", describedCount, label, resourceNames.size());
            }

            return descriptions;
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException("Failed to describe " + label + "s in parallel", ex);
        } finally {
            for (AdminClient client : clients) {
                client.close();
            }
        }
    }