in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/BootstrapConsumer.java [232:252]
/**
 * Lazily picks this instance's share of the topic partitions, assigns them to the
 * consumer and hands them to the offset initializer.
 *
 * <p>Does nothing when {@code partitions} is already set. Otherwise all partitions of
 * {@code topicName} are looked up and those whose partition number satisfies
 * {@code partition % instanceCount == instanceIndex} are kept.
 *
 * <p>{@code partitions} doubles as the "already done" marker, so it is reset to
 * {@code null} when assignment or offset initialization fails; a subsequent call then
 * repeats the whole sequence instead of silently skipping the initialization.
 *
 * @throws RuntimeException if {@code instanceCount} is larger than the number of
 *         partitions of the topic
 */
private void assignAndInitPartitionsIfNeeded() {
    if (partitions != null) { // already done
        return;
    }

    List<TopicPartition> partitionsAll = KafkaUtils.getTopicPartitionsAsList(consumer, topicName);
    if (instanceCount > partitionsAll.size()) {
        throw new RuntimeException(
                String.format(
                        "Instance count %d is larger than actual number of topic [%s] partitions %d",
                        instanceCount, topicName, partitionsAll.size()));
    }

    // Keep only the partitions that belong to this instance index.
    partitions = partitionsAll.
            stream().
            filter(partition -> partition.partition() % instanceCount == instanceIndex).
            collect(Collectors.toSet());

    boolean initialized = false;
    try {
        consumer.assign(partitions);
        offsetInitializer.init(consumer, partitions);
        initialized = true;
    } finally {
        if (!initialized) {
            // Roll back the marker so that the next call retries assign + init
            // rather than treating a half-finished setup as complete.
            partitions = null;
        }
    }
}