public static long calculateConsumerLag()

in src/main/java/com/epam/eco/commons/kafka/KafkaUtils.java [280:298]


    /**
     * Computes how far a consumer position is behind the end of a topic offset range.
     *
     * <p>The result is determined as follows:
     * <ul>
     *   <li>if {@code consumerOffset} is below {@code topicOffsetRange.getSmallest()},
     *       the value of {@code topicOffsetRange.getSize()} is returned;</li>
     *   <li>if {@code consumerOffset} is greater than the end position of the range,
     *       {@code -1} is returned;</li>
     *   <li>otherwise the difference between the end position and
     *       {@code consumerOffset} is returned.</li>
     * </ul>
     *
     * <p>The end position is {@code getLargest() + 1} when the range reports its largest
     * offset as inclusive, and {@code getLargest()} otherwise.
     *
     * @param topicOffsetRange offset range of the topic partition; checked with
     *        {@code Validate.notNull}
     * @param consumerOffset current consumer offset; checked with {@code Validate.isTrue}
     *        to be non-negative
     * @return the lag as described above, or {@code -1} when the consumer offset lies
     *         beyond the end position of the range
     */
    public static long calculateConsumerLag(OffsetRange topicOffsetRange, long consumerOffset) {
        Validate.notNull(topicOffsetRange, "Offset range is null");
        Validate.isTrue(consumerOffset >= 0, "Consumer offset is invalid");

        // Consumer position precedes the range: report the size of the range as the lag.
        boolean beforeRangeStart = consumerOffset < topicOffsetRange.getSmallest();
        if (beforeRangeStart) {
            return topicOffsetRange.getSize();
        }

        // Position one past the last offset of the range.
        final long endOffset;
        if (topicOffsetRange.isLargestInclusive()) {
            endOffset = topicOffsetRange.getLargest() + 1;
        } else {
            endOffset = topicOffsetRange.getLargest();
        }

        // A consumer position past the end cannot be expressed as a lag; signal it with -1.
        return consumerOffset > endOffset ? -1 : endOffset - consumerOffset;
    }