in src/main/java/com/epam/eco/commons/kafka/KafkaUtils.java [280:298]
public static long calculateConsumerLag(OffsetRange topicOffsetRange, long consumerOffset) {
Validate.notNull(topicOffsetRange, "Offset range is null");
Validate.isTrue(consumerOffset >= 0, "Consumer offset is invalid");
if (consumerOffset < topicOffsetRange.getSmallest()) {
return topicOffsetRange.getSize();
}
long largestConsumableOffset =
topicOffsetRange.isLargestInclusive() ?
topicOffsetRange.getLargest() + 1 :
topicOffsetRange.getLargest();
if (consumerOffset > largestConsumableOffset) {
return -1;
}
return largestConsumableOffset - consumerOffset;
}