in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/TimestampOffsetInitializer.java [59:76]
/**
 * Asks the consumer for the offset matching this initializer's {@code timestamp}
 * on each of the given partitions.
 *
 * <p>Duplicate entries in {@code partitions} are collapsed before the lookup: every
 * partition is searched with the same timestamp, so a repeated partition carries no
 * extra information and must not make the lookup fail.
 *
 * @param consumer consumer used to perform the {@code offsetsForTimes} lookup
 * @param partitions partitions to resolve; may contain duplicates
 * @return a new mutable map with one entry per partition returned by the consumer;
 *         the value is the resolved offset, or {@code null} when the consumer
 *         returned no {@code OffsetAndTimestamp} for that partition
 */
private Map<TopicPartition, Long> getOffsetsForTimestamp(
        Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions) {
    // Three-argument toMap: the two-argument form throws IllegalStateException on a
    // repeated key, which a plain Collection of partitions does not rule out.
    Map<TopicPartition, Long> timestampsToSearch = partitions.stream().collect(
            Collectors.toMap(
                    Function.identity(),
                    partition -> timestamp,
                    (first, duplicate) -> first));

    Map<TopicPartition, OffsetAndTimestamp> offsetsAndTimestamps =
            consumer.offsetsForTimes(timestampsToSearch);

    // Built by hand rather than with Collectors.toMap because null values are
    // intentionally kept for partitions without a resolved offset.
    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsetsAndTimestamps.forEach((topicPartition, offsetAndTimestamp) ->
            offsets.put(
                    topicPartition,
                    offsetAndTimestamp != null ? offsetAndTimestamp.offset() : null));
    return offsets;
}