in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/BootstrapConsumerThread.java [56:73]
public void run() {
    // Thread body: repeatedly fetches from the consumer and passes each result to the
    // handler for as long as the running flag is set. Any exception ends the loop and is
    // recorded; cleanup in the finally block always executes.
    try {
        while (running.get()) {
            handler.accept(consumer.fetch());
            // Counted down after every handled fetch; once the latch has reached zero,
            // further countDown() calls are no-ops.
            bootstrapLatch.countDown();
        }
    } catch (Exception e) {
        BootstrapException wrapped = toBootstrapException(e);
        log.error(wrapped.getMessage(), wrapped);
        // Store the failure before releasing the latch so that whoever is released by
        // the countDown() below can already observe it.
        exception.set(wrapped);
        // Trigger latch to rethrow exception immediately
        bootstrapLatch.countDown();
    } finally {
        running.set(false);
        try {
            consumer.close();
        } finally {
            // Must run even when close() throws: otherwise anything awaiting the
            // shutdown latch would never be released. A failure from close() still
            // propagates out of run() after the latch is counted down.
            shutdownLatch.countDown();
        }
    }
}