private R fetchBootstrap()

in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/BootstrapConsumer.java [141:213]


    private R fetchBootstrap() {
        long bootstrapStartTs = System.currentTimeMillis();
        long bootstrapRecordCount = 0;

        LOGGER.info("Topic [{}]: starting bootstrap", topicName);

        try {
            Map<TopicPartition, Long> endOffsets = offsetThresholdProvider.getOffsetThreshold(
                    consumer,
                    partitions
            );
            if (endOffsets.isEmpty()) {
                LOGGER.info("Topic [{}]: finishing bootstrap, no records to fetch", topicName);
            } else {
                Map<TopicPartition, Long> consumedOffsets = new HashMap<>();
                long statusLogInterval = bootstrapTimeoutInMs / 100;
                long lastStatusLogTs = bootstrapStartTs;
                while (true) {
                    ConsumerRecords<K,V> records = consumer.poll(BOOTSTRAP_POLL_TIMEOUT);

                    long batchRecordCount = records.count();
                    if (batchRecordCount > 0) {
                        bootstrapRecordCount += batchRecordCount;

                        if (System.currentTimeMillis() - lastStatusLogTs > statusLogInterval) {
                            LOGGER.info(
                                    "Topic [{}]: {} bootstrap records fetched",
                                    topicName,
                                    bootstrapRecordCount);
                            lastStatusLogTs = System.currentTimeMillis();
                        }

                        recordCollector.collect(records);

                        consumedOffsets.putAll(KafkaUtils.getConsumerPositions(consumer));

                        if (compareOffsetsGreaterOrEqual(consumedOffsets, endOffsets)) {
                            LOGGER.info(
                                    "Topic [{}]: finishing bootstrap, received offsets have met expected threshold",
                                    topicName);
                            break;
                        }
                    }

                    if (System.currentTimeMillis() - bootstrapStartTs > bootstrapTimeoutInMs) {
                        if (failOnTimeout) {
                            throw new TimeoutException(
                                    "Topic [%s]: bootstrap timeout has exceeded".formatted(topicName)
                            );
                        }

                        LOGGER.info(
                                "Topic [{}]: finishing bootstrap, timeout has exceeded", topicName);
                        break;
                    }
                }
            }
        } catch (WakeupException wue) {
            LOGGER.warn("Topic [{}]: bootstrap aborted (woken up)", topicName);
        } catch (Exception ex) {
            throw new BootstrapException("Topic [%s]: bootstrap failed".formatted(topicName), ex);
        } finally {
            setBootstrapDone();
        }

        LOGGER.info(
                "Topic [{}]: bootstrap done in {}, {} records fetched",
                topicName,
                DurationFormatUtils.formatDurationHMS(System.currentTimeMillis() - bootstrapStartTs),
                bootstrapRecordCount);

        return recordCollector.result();
    }