in src/main/java/com/epam/eco/commons/kafka/helpers/GroupOffsetResetter.java [183:203]
private Map<TopicPartition, Long> filterOutUselessAndAdjustOutOfRangeOffsets(
Map<TopicPartition, Long> offsets,
Map<TopicPartition, OffsetRange> offsetRanges) {
Set<Entry<TopicPartition, Long>> entries = offsets.entrySet();
Map<TopicPartition, Long> resultOffsets = new HashMap<>((int) (entries.size() / 0.75));
for (Entry<TopicPartition, Long> entry : entries) {
TopicPartition partition = entry.getKey();
long offset = entry.getValue();
OffsetRange range = offsetRanges.get(partition);
if (range == null) {
continue;
}
offset = offset < range.getSmallest() ? range.getSmallest() : offset;
offset = offset > range.getLargest() + 1 ? range.getLargest() + 1 : offset;
resultOffsets.put(partition, offset);
}
return resultOffsets;
}