public void run()

in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [144:206]


    public void run() {
        Thread.currentThread().setName(threadName);

        try {
            while (running.get()) {
                try {
                    changeSubscriptionIfNeeded(() -> new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            LOGGER.debug("Group [{}]: partitions revoked = {}", groupId, partitions);

                            if (handlerTaskContainsAnyPartitions(partitions)) {
                                forceCompleteHandlerTask();
                            }

                            consumer.pause(partitions);
                        }
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            LOGGER.debug("Group [{}]: partitions assigned = {}", groupId, partitions);

                            consumer.pause(partitions);
                        }
                    });

                    if (consumer.subscription().isEmpty()) {
                        continue;
                    }

                    if (handlerTaskExists()) {
                        consumer.pause(consumer.assignment());
                    } else {
                        consumer.resume(consumer.assignment());
                    }

                    ConsumerRecords<K, V> records = consumer.poll(POLL_TIMEOUT);
                    if (!records.isEmpty()) {
                        submitNewHandlerTask(records);
                    }
                } catch (AuthorizationException ae) {
                    if (authExceptionRetryInterval == null) {
                        LOGGER.error(String.format("Group [%s]: consumer failed due to Authorization Exception. " +
                                "No authExceptionRetryInterval, will not retry.", groupId), ae);
                        throw ae;
                    } else {
                        LOGGER.error(String.format("Group [%s]: consumer failed due to Authorization Exception. " +
                                "Will retry in %s ms", groupId, authExceptionRetryInterval.toMillis()), ae);
                        sleepFor(authExceptionRetryInterval);
                    }
                } finally {
                    completeHandlerTaskIfDone();
                }
            }
        } catch (WakeupException wue) {
            LOGGER.warn("Group [{}]: consumer aborted (woken up)", groupId);
        } catch (Exception ex) {
            LOGGER.error(String.format("Group [%s]: consumer failed", groupId), ex);
            throw ex;
        } finally {
            destroyHandlerTaskExecutor();
            shutdownLatch.countDown();
        }
    }