in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [455:490]
protected RecordFetchResult<K, V> toFetchResult(
Map<TopicPartition, ? extends RecordCollector> recordCollectors,
Map<TopicPartition, OffsetRange> offsetRanges,
Function<OffsetRange,Long> boundIfRecordsNotExists ) {
RecordFetchResult.Builder<K, V> builder = RecordFetchResult.builder();
for (Entry<TopicPartition, ? extends RecordCollector> entry : recordCollectors.entrySet()) {
TopicPartition partition = entry.getKey();
RecordCollector recordCollector = entry.getValue();
if(recordCollector.getSmallestScannedOffset()==-1 && recordCollector.getLargestScannedOffset()==-1) {
builder.result( PartitionRecordFetchResult.<K, V> builder()
.partition(partition)
.addRecords(Collections.emptyList())
.partitionOffsets(offsetRanges.get(partition))
.scannedOffsets( OffsetRange.with(
boundIfRecordsNotExists.apply(offsetRanges.get(partition)),
false,
boundIfRecordsNotExists.apply(offsetRanges.get(partition)),
false)).build());
} else {
builder.result(
PartitionRecordFetchResult.<K, V> builder()
.partition(partition)
.addRecords(recordCollector.getRecords())
.partitionOffsets(offsetRanges.get(partition))
.scannedOffsets(OffsetRange.with(
recordCollector.getSmallestScannedOffset(),
true,
recordCollector.getLargestScannedOffset(),
true)).build());
}
}
return builder.build();
}