protected RecordFetchResult toFetchResult()

in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [455:490]


    protected RecordFetchResult<K, V> toFetchResult(
            Map<TopicPartition, ? extends RecordCollector> recordCollectors,
            Map<TopicPartition, OffsetRange> offsetRanges,
            Function<OffsetRange,Long> boundIfRecordsNotExists ) {

        RecordFetchResult.Builder<K, V> builder = RecordFetchResult.builder();

        for (Entry<TopicPartition, ? extends RecordCollector> entry : recordCollectors.entrySet()) {
            TopicPartition partition = entry.getKey();
            RecordCollector recordCollector = entry.getValue();

            if(recordCollector.getSmallestScannedOffset()==-1 && recordCollector.getLargestScannedOffset()==-1) {
                builder.result( PartitionRecordFetchResult.<K, V> builder()
                                     .partition(partition)
                                     .addRecords(Collections.emptyList())
                                     .partitionOffsets(offsetRanges.get(partition))
                                     .scannedOffsets( OffsetRange.with(
                                             boundIfRecordsNotExists.apply(offsetRanges.get(partition)),
                                             false,
                                             boundIfRecordsNotExists.apply(offsetRanges.get(partition)),
                                             false)).build());
            } else {
                builder.result(
                        PartitionRecordFetchResult.<K, V> builder()
                                     .partition(partition)
                                     .addRecords(recordCollector.getRecords())
                                     .partitionOffsets(offsetRanges.get(partition))
                                     .scannedOffsets(OffsetRange.with(
                                             recordCollector.getSmallestScannedOffset(),
                                             true,
                                             recordCollector.getLargestScannedOffset(),
                                             true)).build());
            }
        }
        return builder.build();
    }