in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [408:432]
/**
 * Splits a total record limit across the given partitions as evenly as possible.
 *
 * <p>Every partition is offered {@code limit / partitions.size()} records. The
 * remainder ({@code limit % partitions.size()}) is handed out one record at a time
 * to the partitions that come first in the iteration order of {@code partitions}.
 * Distribution stops at the first partition whose share would be zero or negative,
 * so such partitions are absent from the result; in particular, a non-positive
 * {@code limit} produces an empty map.
 *
 * @param partitions partitions to distribute the limit over
 * @param limit total number of records to distribute
 * @return per-partition limits; an empty {@link HashMap} when {@code partitions} is
 *         empty, otherwise a {@link TreeMap} ordered by
 *         {@code TopicPartitionComparator.INSTANCE} containing only positive limits
 */
protected Map<TopicPartition, Long> calculateLimitsByPartition(
        Collection<TopicPartition> partitions,
        long limit) {
    if (partitions.isEmpty()) {
        return new HashMap<>();
    }

    Map<TopicPartition, Long> result = new TreeMap<>(TopicPartitionComparator.INSTANCE);
    long baseShare = limit / partitions.size();
    long remainder = limit % partitions.size();

    for (TopicPartition topicPartition : partitions) {
        long share = baseShare;
        // Hand out the leftover records one by one to the earliest partitions.
        if (remainder > 0) {
            share++;
            remainder--;
        }
        // Once a share is exhausted every later partition would get nothing as well.
        if (share <= 0) {
            break;
        }
        result.put(topicPartition, share);
    }
    return result;
}