in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [29:82]
/// <summary>
/// Builds the consumer configuration for a subscription, registers its group id with the
/// monitor, creates the Kafka consumer and performs a short warmup poll.
/// </summary>
/// <param name="kafkaFactory">Factory used to create the consumer config and the consumer itself.</param>
/// <param name="monitor">Subscription monitor; supplies the name placeholder and validates the group id.</param>
/// <param name="options">Subscription options (batch size, logical consumer and cluster names).</param>
/// <param name="keyDeserializer">Optional key deserializer; applied to the consumer builder only when not null.</param>
/// <param name="valueDeserializer">Optional value deserializer; applied to the consumer builder only when not null.</param>
/// <param name="logger">Logger stored for later use by this wrapper.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="kafkaFactory"/>, <paramref name="monitor"/>, <paramref name="options"/>
/// or <paramref name="logger"/> is null.
/// </exception>
/// <exception cref="InvalidOperationException">
/// The monitor refused to register the configured group id. The exception is marked with
/// <c>DoNotRetryBatch()</c> before being thrown.
/// </exception>
public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,
    SubscriptionMonitor monitor,
    SubscriptionOptions options,
    IDeserializer<TKey>? keyDeserializer,
    IDeserializer<TValue>? valueDeserializer,
    ILogger logger)
{
    if (kafkaFactory == null)
    {
        throw new ArgumentNullException(nameof(kafkaFactory));
    }

    this.Monitor = monitor ?? throw new ArgumentNullException(nameof(monitor));
    this.Options = options ?? throw new ArgumentNullException(nameof(options));
    this.Logger = logger ?? throw new ArgumentNullException(nameof(logger));

    // Presize the buffer with the configured batch size.
    this._buffer = new List<ConsumeResult<TKey, TValue>>(options.BatchSize);

    // Work on a clone so the adjustments below do not touch the instance returned by the factory.
    ConsumerConfig config = kafkaFactory.CreateConsumerConfig(options.Consumer);
    config = config.Clone(this.Monitor.NamePlaceholder);

    this.ConfigureConsumerConfig(config);

    // Capture the effective values only after ConfigureConsumerConfig had a chance to change them.
    this._autoOffsetReset = config.AutoOffsetReset;
    this._consumeTimeoutMs = config.GetCancellationDelayMaxMs();
    this.ConsumerGroup = config.GroupId;

    // Validate the group id before creating the consumer, so a rejected id costs no native resources.
    if (!monitor.TryRegisterGroupId(config, this.Options, out string? msg))
    {
        var exception = new InvalidOperationException($"Unable to use '{config.GroupId}' group.id in '{monitor.Name}' subscription. {msg}");
        exception.DoNotRetryBatch();
        throw exception;
    }

    this.Consumer = kafkaFactory.CreateConsumer<TKey, TValue>(config, options.Cluster, b =>
    {
        if (keyDeserializer != null)
        {
            b.SetKeyDeserializer(keyDeserializer);
        }

        if (valueDeserializer != null)
        {
            b.SetValueDeserializer(valueDeserializer);
        }

        this.ConfigureConsumerBuilder(b);
    });

    try
    {
        // Warmup consumer to avoid potential issues with OAuth handler.
        // Consumer just created, so not assigned to any partition.
        this.Consumer.Consume(100);
    }
    catch
    {
        // The constructor is about to fail, so the caller never receives this instance and
        // cannot dispose it. Release the freshly created consumer here to avoid leaking it.
        this.Consumer.Dispose();
        throw;
    }
}