public AdvancedConsumer()

in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [111:141]


    /**
     * Creates a consumer for the given topics using a caller-supplied consumer factory.
     *
     * <p>Initialization order: the handler and factory are checked with
     * {@code Validate.notNull}, the topics are passed to {@code subscribe}, the effective
     * configuration is built from {@code consumerConfig} via {@link ConsumerConfigBuilder}
     * ({@code minRequiredConfigs()} and {@code enableAutoCommitDisabled()} are applied),
     * and only then are the thread name, the handler task executor and the underlying
     * consumer created.
     *
     * <p>If the effective configuration contains
     * {@link ConsumerConfigBuilder#AUTH_EXCEPTION_RETRY_INTERVAL_MS}, its value (in
     * milliseconds) replaces the current auth-exception retry interval. The value may be
     * any {@link Number} or a {@link String} holding a decimal long.
     *
     * @param topicNames names of the topics, handed to {@code subscribe} as is
     * @param consumerConfig base consumer configuration the effective one is built from
     * @param handler callback receiving record batches; must not be {@code null}
     * @param consumerFactory creates the underlying consumer from the effective
     *        configuration; must not be {@code null}
     * @throws IllegalArgumentException if the auth-exception retry interval is present
     *         but is neither a number nor a string parseable as a long
     */
    public AdvancedConsumer(
            Collection<String> topicNames,
            Map<String, Object> consumerConfig,
            Consumer<RecordBatchIterator<K, V>> handler,
            Function<Map<String, Object>, KafkaConsumer<K, V>> consumerFactory) {
        Validate.notNull(handler, "Handler is null");
        Validate.notNull(consumerFactory, "Consumer factory is null");

        subscribe(topicNames);

        this.consumerConfig = ConsumerConfigBuilder.
                with(consumerConfig).
                minRequiredConfigs().
                enableAutoCommitDisabled().
                build();
        this.groupId = (String)this.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
        this.handler = handler;

        // Config values are plain Objects: the interval may have been supplied as a Long,
        // as another numeric type (e.g. an Integer literal) or as a String coming from a
        // properties file. A direct (Long) cast would fail for everything but the first.
        Object retryIntervalValue =
                this.consumerConfig.get(ConsumerConfigBuilder.AUTH_EXCEPTION_RETRY_INTERVAL_MS);
        if (retryIntervalValue instanceof Number) {
            this.authExceptionRetryInterval =
                    Duration.ofMillis(((Number) retryIntervalValue).longValue());
        } else if (retryIntervalValue instanceof String) {
            // Long.parseLong throws NumberFormatException (an IllegalArgumentException)
            // when the text is not a valid long.
            this.authExceptionRetryInterval =
                    Duration.ofMillis(Long.parseLong(((String) retryIntervalValue).trim()));
        } else if (retryIntervalValue != null) {
            throw new IllegalArgumentException(
                    "Unsupported value type for " +
                    ConsumerConfigBuilder.AUTH_EXCEPTION_RETRY_INTERVAL_MS + ": " +
                    retryIntervalValue.getClass().getName());
        }
        // When the key is absent the field keeps whatever value it already had.

        threadName = buildThreadName();

        handlerTaskExecutor = initHandlerTaskExecutor();

        // NOTE(review): if the factory throws, whatever initHandlerTaskExecutor() created
        // is not released here - confirm whether cleanup is needed on this path.
        consumer = consumerFactory.apply(this.consumerConfig);

        LOGGER.info("Initialized");
    }