in src/main/java/com/epam/eco/commons/kafka/helpers/GroupOffsetResetter.java [146:177]
/**
 * Commits new offsets for the given partitions through the supplied consumer.
 *
 * <p>The requested offsets are first passed, together with the offset ranges fetched
 * for the same partitions, through {@code filterOutUselessAndAdjustOutOfRangeOffsets};
 * if nothing remains, the method returns without touching the consumer. Otherwise the
 * consumer is assigned the remaining partitions, and a new offset is committed only for
 * those partitions for which the consumer reports a non-null committed offset.
 *
 * @param consumer consumer used to assign partitions, look up committed offsets and commit
 * @param offsets requested offset per partition
 */
private void doReset(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) {
    Map<TopicPartition, Long> effectiveOffsets =
            filterOutUselessAndAdjustOutOfRangeOffsets(offsets, fetchOffsetRanges(offsets.keySet()));
    if (effectiveOffsets.isEmpty()) {
        return;
    }

    consumer.assign(new ArrayList<>(effectiveOffsets.keySet()));

    Map<TopicPartition, OffsetAndMetadata> committed;
    try {
        committed = consumer.committed(effectiveOffsets.keySet());
    } catch (Exception ignored) {
        // Lookup failure is treated as "nothing committed", so no partition is reset below.
        // NOTE(review): this makes the reset a silent no-op on failure - confirm intended.
        committed = Collections.emptyMap();
    }

    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    for (Entry<TopicPartition, Long> requested : effectiveOffsets.entrySet()) {
        // Skip partitions that have no committed offset reported by the consumer.
        if (committed.get(requested.getKey()) == null) {
            continue;
        }
        toCommit.put(requested.getKey(), new OffsetAndMetadata(requested.getValue()));
    }

    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
}