private void assignAndInitPartitionsIfNeeded()

in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/BootstrapConsumer.java [232:252]


    private void assignAndInitPartitionsIfNeeded() {
        if (partitions != null) { // already done
            return;
        }

        List<TopicPartition> partitionsAll = KafkaUtils.getTopicPartitionsAsList(consumer, topicName);
        if (instanceCount > partitionsAll.size()) {
            throw new RuntimeException(
                    String.format(
                            "Instance count %d is larger than actual number of topic [%s] partitions %d",
                            instanceCount, topicName, partitionsAll.size()));
        }

        partitions = partitionsAll.
                stream().
                filter(partition -> partition.partition() % instanceCount == instanceIndex).
                collect(Collectors.toSet());

        consumer.assign(partitions);
        offsetInitializer.init(consumer, partitions);
    }