in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/ConsumerGroupOffsetThresholdProvider.java [66:90]
/**
 * Looks up the offset threshold for each of the requested partitions.
 *
 * <p>Calls {@code fetchOffsetsIfNeeded(consumer)} first (presumably to populate
 * the {@code offsets} map on first use; confirm against that helper), then copies
 * the entry for every requested partition out of {@code offsets}. Partitions that
 * have no entry in {@code offsets} are omitted from the result rather than mapped
 * to {@code null}.
 *
 * @param consumer consumer handed to {@code fetchOffsetsIfNeeded}
 * @param partitions partitions to resolve; each must belong to this provider's topic
 * @return a new mutable map from partition to offset, containing only the
 *         partitions for which an offset is known
 * @throws IllegalArgumentException if any partition's topic differs from this
 *         provider's topic
 */
public Map<TopicPartition, Long> getOffsetThreshold(
        KafkaConsumer<?, ?> consumer,
        Collection<TopicPartition> partitions
) {
    fetchOffsetsIfNeeded(consumer);

    Map<TopicPartition, Long> result = new HashMap<>();
    for (TopicPartition partition : partitions) {
        if (!topic.equals(partition.topic())) {
            // Argument order must follow the placeholders: partition first, then topic.
            throw new IllegalArgumentException(
                    "Partition '%s' is not part of topic '%s'.".formatted(
                            partition,
                            topic
                    )
            );
        }

        // Skip partitions with no known offset instead of storing null values.
        Long offset = offsets.get(partition);
        if (offset != null) {
            result.put(partition, offset);
        }
    }
    return result;
}