in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [151:169]
/**
 * Fetches records from the given topics, starting from one caller-supplied offset.
 *
 * <p>A dedicated {@code KafkaConsumer} is built from {@code consumerConfig} for this call and
 * is closed when the method exits. The topic partitions are obtained through
 * {@code KafkaUtils.getTopicPartitionsAsList}, combined with {@code offset} by
 * {@code toOffsets}, and the fetching itself is delegated to {@code doFetchByOffsets}.
 *
 * @param topicNames topics to read from; must be non-empty and must not contain null elements
 * @param offset offset handed to {@code toOffsets} together with the resolved partitions;
 *        must be {@code >= 0} (presumably applied to each partition - confirm in
 *        {@code toOffsets})
 * @param limit limit forwarded to {@code doFetchByOffsets}; must be {@code > 0}
 * @param filter predicate forwarded unchanged to {@code doFetchByOffsets}; it is not
 *        validated here
 * @param timeoutInMs timeout in milliseconds forwarded to {@code doFetchByOffsets};
 *        must be {@code > 0}
 * @return whatever {@code doFetchByOffsets} produces for the computed offsets
 */
public RecordFetchResult<K, V> fetchByOffsets(
        String[] topicNames,
        long offset,
        long limit,
        FilterClausePredicate<K,V> filter,
        long timeoutInMs) {
    // All arguments are checked before a consumer is created, so invalid input
    // never opens a Kafka consumer.
    Validate.notEmpty(topicNames, "Array of topic names is null or empty");
    Validate.noNullElements(topicNames, "Array of topic names contains null elements");
    Validate.isTrue(offset >= 0, "Offset is invalid");
    Validate.isTrue(limit > 0, "Limit is invalid");
    Validate.isTrue(timeoutInMs > 0, "Timeout is invalid");

    // try-with-resources closes the consumer on both normal return and exception.
    try (KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(consumerConfig)) {
        final Map<TopicPartition, Long> startingOffsets =
                toOffsets(KafkaUtils.getTopicPartitionsAsList(kafkaConsumer, topicNames), offset);

        return doFetchByOffsets(kafkaConsumer, startingOffsets, limit, filter, timeoutInMs);
    }
}