in src/main/java/com/epam/eco/commons/kafka/consumer/bootstrap/BootstrapConsumer.java [70:109]
private BootstrapConsumer(
String topicName,
Map<String, Object> consumerConfig,
OffsetInitializer offsetInitializer,
OffsetThresholdProvider offsetThresholdProvider,
long bootstrapTimeoutInMs,
RecordCollector<K, V, R> recordCollector,
int instanceCount,
int instanceIndex,
boolean failOnTimeout
) {
Validate.notBlank(topicName, "Topic name is blank");
Validate.notNull(offsetInitializer, "Offset Initializer is null");
Validate.notNull(offsetThresholdProvider, "Offset threshold provider is null");
Validate.isTrue(bootstrapTimeoutInMs > 0, "Bootstrap timeout is invalid");
Validate.notNull(recordCollector, "Record Collector is null");
Validate.isTrue(instanceCount > 0, "Instance count is invalid");
Validate.isTrue(
instanceIndex >= 0 && instanceIndex < instanceCount,
"Instance index is invalid");
this.topicName = topicName;
this.consumerConfig = ConsumerConfigBuilder.
with(consumerConfig).
minRequiredConfigs().
enableAutoCommitDisabled().
autoOffsetResetEarliest().
build();
this.offsetInitializer = offsetInitializer;
this.offsetThresholdProvider = offsetThresholdProvider;
this.bootstrapTimeoutInMs = bootstrapTimeoutInMs;
this.recordCollector = recordCollector;
this.instanceCount = instanceCount;
this.instanceIndex = instanceIndex;
this.failOnTimeout = failOnTimeout;
consumer = new KafkaConsumer<>(this.consumerConfig);
LOGGER.info("Initialized");
}