in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/ConsumerGroupOffsetThresholdProvider.java [92:141]
private void fetchOffsetsIfNeeded(KafkaConsumer<?, ?> consumer) {
if (offsets != null) {
return;
}
if (AdminClientUtils.consumerGroupExists(adminClientProperties, consumerGroup)) {
List<TopicPartition> partitions = KafkaUtils.getTopicPartitionsAsList(consumer, topic);
Map<TopicPartition, OffsetRange> partitionRanges = TopicOffsetRangeFetcher.
with(consumer).
fetchForPartitions(partitions);
Map<TopicPartition, OffsetAndMetadata> partitionOffsets = AdminClientUtils.listConsumerGroupOffsets(
adminClientProperties,
consumerGroup,
partitions.toArray(new TopicPartition[0])
);
Map<TopicPartition, Long> result = new HashMap<>();
// Fix obsolete offsets (out of range) and filter out empty partitions
partitionOffsets.forEach((partition, meta) -> {
OffsetRange range = partitionRanges.get(partition);
if (meta != null && range != null) {
if (range.getSize() > 0) {
long offset = fixOffsetBoundaries(meta, range);
if (offset != -1) {
result.put(partition, offset);
}
} else {
log.debug(
"Consumer group offset '{}' for partition '{}' does not fit " +
"partition range '{}'. Offset is ignored",
meta.offset(),
partition,
range
);
}
}
});
log.info(
"Fetched consumer group '{}' offsets for topic '{}': {}",
consumerGroup,
topic,
result
);
offsets = result;
} else {
log.info("Consumer group '{}' does not exist. Thresholds are empty.", consumerGroup);
offsets = Collections.emptyMap();
}
}