in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [111:141]
public AdvancedConsumer(
Collection<String> topicNames,
Map<String, Object> consumerConfig,
Consumer<RecordBatchIterator<K, V>> handler,
Function<Map<String, Object>, KafkaConsumer<K, V>> consumerFactory) {
Validate.notNull(handler, "Handler is null");
Validate.notNull(consumerFactory, "Consumer factory is null");
subscribe(topicNames);
this.consumerConfig = ConsumerConfigBuilder.
with(consumerConfig).
minRequiredConfigs().
enableAutoCommitDisabled().
build();
this.groupId = (String)this.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
this.handler = handler;
var authExceptionRetryIntervalMs = (Long) this.consumerConfig.get(ConsumerConfigBuilder.AUTH_EXCEPTION_RETRY_INTERVAL_MS);
if (authExceptionRetryIntervalMs != null) {
this.authExceptionRetryInterval = Duration.ofMillis(authExceptionRetryIntervalMs);
}
threadName = buildThreadName();
handlerTaskExecutor = initHandlerTaskExecutor();
consumer = consumerFactory.apply(this.consumerConfig);
LOGGER.info("Initialized");
}