in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/BootstrapConsumer.java [141:213]
/**
 * Polls the consumer until the consumed positions reach the offset threshold
 * obtained from {@code offsetThresholdProvider}, or until the bootstrap timeout
 * elapses, feeding every non-empty batch to {@code recordCollector}.
 *
 * <p>The bootstrap is always marked as done on exit (normal or exceptional).
 * A {@link WakeupException} aborts the loop without failing: the method still
 * returns whatever the collector has gathered so far.
 *
 * @return the value produced by {@code recordCollector.result()}
 * @throws BootstrapException if any exception other than {@link WakeupException}
 *         is raised while bootstrapping, including the timeout error thrown when
 *         {@code failOnTimeout} is set
 */
private R fetchBootstrap() {
    long bootstrapStartTs = System.currentTimeMillis();
    long bootstrapRecordCount = 0;
    LOGGER.info("Topic [{}]: starting bootstrap", topicName);
    try {
        Map<TopicPartition, Long> endOffsets = offsetThresholdProvider.getOffsetThreshold(
                consumer,
                partitions
        );
        if (endOffsets.isEmpty()) {
            LOGGER.info("Topic [{}]: finishing bootstrap, no records to fetch", topicName);
        } else {
            Map<TopicPartition, Long> consumedOffsets = new HashMap<>();
            // Progress is logged at most once per 1% of the bootstrap timeout.
            long statusLogInterval = bootstrapTimeoutInMs / 100;
            long lastStatusLogTs = bootstrapStartTs;
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(BOOTSTRAP_POLL_TIMEOUT);
                long batchRecordCount = records.count();
                if (batchRecordCount > 0) {
                    bootstrapRecordCount += batchRecordCount;
                    if (System.currentTimeMillis() - lastStatusLogTs > statusLogInterval) {
                        LOGGER.info(
                                "Topic [{}]: {} bootstrap records fetched",
                                topicName,
                                bootstrapRecordCount);
                        lastStatusLogTs = System.currentTimeMillis();
                    }
                    recordCollector.collect(records);
                }

                // Compare positions after EVERY poll, not only after non-empty ones:
                // the consumer position may reach the threshold without any further
                // records being handed to the application (e.g. trailing transaction
                // markers occupy offsets but are never returned by poll). Checking
                // only on non-empty batches would make such a bootstrap wait for the
                // full timeout and, with failOnTimeout, fail spuriously.
                // NOTE(review): this invokes KafkaUtils.getConsumerPositions on empty
                // polls too - confirm it is cheap once positions are established.
                consumedOffsets.putAll(KafkaUtils.getConsumerPositions(consumer));
                if (compareOffsetsGreaterOrEqual(consumedOffsets, endOffsets)) {
                    LOGGER.info(
                            "Topic [{}]: finishing bootstrap, received offsets have met expected threshold",
                            topicName);
                    break;
                }

                if (System.currentTimeMillis() - bootstrapStartTs > bootstrapTimeoutInMs) {
                    if (failOnTimeout) {
                        // Caught by the generic handler below and rethrown wrapped
                        // in a BootstrapException.
                        throw new TimeoutException(
                                "Topic [%s]: bootstrap timeout has exceeded".formatted(topicName)
                        );
                    }
                    LOGGER.info(
                            "Topic [{}]: finishing bootstrap, timeout has exceeded", topicName);
                    break;
                }
            }
        }
    } catch (WakeupException wue) {
        // Deliberately not rethrown: a wakeup ends the bootstrap early and the
        // partial result is still returned.
        LOGGER.warn("Topic [{}]: bootstrap aborted (woken up)", topicName);
    } catch (Exception ex) {
        throw new BootstrapException("Topic [%s]: bootstrap failed".formatted(topicName), ex);
    } finally {
        setBootstrapDone();
    }
    LOGGER.info(
            "Topic [{}]: bootstrap done in {}, {} records fetched",
            topicName,
            DurationFormatUtils.formatDurationHMS(System.currentTimeMillis() - bootstrapStartTs),
            bootstrapRecordCount);
    return recordCollector.result();
}