in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [144:206]
public void run() {
Thread.currentThread().setName(threadName);
try {
while (running.get()) {
try {
changeSubscriptionIfNeeded(() -> new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOGGER.debug("Group [{}]: partitions revoked = {}", groupId, partitions);
if (handlerTaskContainsAnyPartitions(partitions)) {
forceCompleteHandlerTask();
}
consumer.pause(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOGGER.debug("Group [{}]: partitions assigned = {}", groupId, partitions);
consumer.pause(partitions);
}
});
if (consumer.subscription().isEmpty()) {
continue;
}
if (handlerTaskExists()) {
consumer.pause(consumer.assignment());
} else {
consumer.resume(consumer.assignment());
}
ConsumerRecords<K, V> records = consumer.poll(POLL_TIMEOUT);
if (!records.isEmpty()) {
submitNewHandlerTask(records);
}
} catch (AuthorizationException ae) {
if (authExceptionRetryInterval == null) {
LOGGER.error(String.format("Group [%s]: consumer failed due to Authorization Exception. " +
"No authExceptionRetryInterval, will not retry.", groupId), ae);
throw ae;
} else {
LOGGER.error(String.format("Group [%s]: consumer failed due to Authorization Exception. " +
"Will retry in %s ms", groupId, authExceptionRetryInterval.toMillis()), ae);
sleepFor(authExceptionRetryInterval);
}
} finally {
completeHandlerTaskIfDone();
}
}
} catch (WakeupException wue) {
LOGGER.warn("Group [{}]: consumer aborted (woken up)", groupId);
} catch (Exception ex) {
LOGGER.error(String.format("Group [%s]: consumer failed", groupId), ex);
throw ex;
} finally {
destroyHandlerTaskExecutor();
shutdownLatch.countDown();
}
}