in src/main/java/com/epam/eco/commons/kafka/KafkaUtils.java [197:213]
public static Map<TopicPartition, Long> extractLargestOffsets(ConsumerRecords<?, ?> records) {
Validate.notNull(records, "ConsumerRecords object is null");
Set<TopicPartition> partitions = records.partitions();
Map<TopicPartition, Long> largestOffsets = new HashMap<>((int) (partitions.size() / 0.75));
for (TopicPartition partition : partitions) {
List<ConsumerRecord<?, ?>> recordsPerPartition =
((ConsumerRecords)records).records(partition);
if (recordsPerPartition.isEmpty()) {
continue;
}
ConsumerRecord<?, ?> lastRecordInPartition =
recordsPerPartition.get(recordsPerPartition.size() - 1);
largestOffsets.put(partition, lastRecordInPartition.offset());
}
return largestOffsets;
}