private void fetchOffsetsIfNeeded()

in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/ConsumerGroupOffsetThresholdProvider.java [92:141]


    private void fetchOffsetsIfNeeded(KafkaConsumer<?, ?> consumer) {
        if (offsets != null) {
            return;
        }

        if (AdminClientUtils.consumerGroupExists(adminClientProperties, consumerGroup)) {
            List<TopicPartition> partitions = KafkaUtils.getTopicPartitionsAsList(consumer, topic);
            Map<TopicPartition, OffsetRange> partitionRanges = TopicOffsetRangeFetcher.
                    with(consumer).
                    fetchForPartitions(partitions);
            Map<TopicPartition, OffsetAndMetadata> partitionOffsets = AdminClientUtils.listConsumerGroupOffsets(
                    adminClientProperties,
                    consumerGroup,
                    partitions.toArray(new TopicPartition[0])
            );

            Map<TopicPartition, Long> result = new HashMap<>();
            // Fix obsolete offsets (out of range) and filter out empty partitions
            partitionOffsets.forEach((partition, meta) -> {
                OffsetRange range = partitionRanges.get(partition);
                if (meta != null && range != null) {
                    if (range.getSize() > 0) {
                        long offset = fixOffsetBoundaries(meta, range);
                        if (offset != -1) {
                            result.put(partition, offset);
                        }
                    } else {
                        log.debug(
                                "Consumer group offset '{}' for partition '{}' does not fit " +
                                        "partition range '{}'. Offset is ignored",
                                meta.offset(),
                                partition,
                                range
                        );
                    }
                }
            });

            log.info(
                    "Fetched consumer group '{}' offsets for topic '{}': {}",
                    consumerGroup,
                    topic,
                    result
            );
            offsets = result;
        } else {
            log.info("Consumer group '{}' does not exist. Thresholds are empty.", consumerGroup);
            offsets = Collections.emptyMap();
        }
    }