public SubscriptionTopicWrapper()

in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [29:82]


    /// <summary>
    /// Initializes the wrapper: builds the consumer config from <paramref name="options"/>, registers its
    /// <c>group.id</c> with <paramref name="monitor"/>, creates the consumer and performs a short warmup consume.
    /// </summary>
    /// <param name="kafkaFactory">Factory used to create both the consumer config and the consumer.</param>
    /// <param name="monitor">Subscription monitor; supplies the name placeholder for the config clone and validates the group id.</param>
    /// <param name="options">Subscription options; <c>Consumer</c>, <c>Cluster</c> and <c>BatchSize</c> are read here.</param>
    /// <param name="keyDeserializer">Optional key deserializer; applied to the consumer builder only when not null.</param>
    /// <param name="valueDeserializer">Optional value deserializer; applied to the consumer builder only when not null.</param>
    /// <param name="logger">Logger stored on the wrapper.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="kafkaFactory"/>, <paramref name="monitor"/>, <paramref name="options"/> or
    /// <paramref name="logger"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The monitor refused to register the configured <c>group.id</c>; the exception is marked with
    /// <c>DoNotRetryBatch()</c> before being thrown.
    /// </exception>
    public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,
        SubscriptionMonitor monitor,
        SubscriptionOptions options,
        IDeserializer<TKey>? keyDeserializer,
        IDeserializer<TValue>? valueDeserializer,
        ILogger logger)
    {
        if (kafkaFactory == null)
        {
            throw new ArgumentNullException(nameof(kafkaFactory));
        }

        this.Monitor = monitor ?? throw new ArgumentNullException(nameof(monitor));
        this.Options = options ?? throw new ArgumentNullException(nameof(options));
        this.Logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Pre-size the buffer to the configured batch size.
        this._buffer = new List<ConsumeResult<TKey, TValue>>(options.BatchSize);

        ConsumerConfig config = kafkaFactory.CreateConsumerConfig(options.Consumer);

        // Work on a clone so that later adjustments are applied to this wrapper's own config instance.
        config = config.Clone(this.Monitor.NamePlaceholder);

        this.ConfigureConsumerConfig(config);

        // Capture the effective values only after ConfigureConsumerConfig had a chance to adjust them.
        this._autoOffsetReset = config.AutoOffsetReset;
        this._consumeTimeoutMs = config.GetCancellationDelayMaxMs();
        this.ConsumerGroup = config.GroupId;

        // Validate the group id before creating the consumer, so nothing has to be released on failure.
        if (!monitor.TryRegisterGroupId(config, this.Options, out string? msg))
        {
            var exception = new InvalidOperationException($"Unable to use '{config.GroupId}' group.id in '{monitor.Name}' subscription. {msg}");
            exception.DoNotRetryBatch();

            throw exception;
        }

        this.Consumer = kafkaFactory.CreateConsumer<TKey, TValue>(config, options.Cluster, b =>
        {
            if (keyDeserializer != null)
            {
                b.SetKeyDeserializer(keyDeserializer);
            }

            if (valueDeserializer != null)
            {
                b.SetValueDeserializer(valueDeserializer);
            }

            this.ConfigureConsumerBuilder(b);
        });

        try
        {
            // Warmup consumer to avoid potential issues with OAuth handler.
            // Consumer just created, so not assigned to any partition.
            this.Consumer.Consume(100);
        }
        catch
        {
            // When a constructor throws, the instance is never handed to the caller, so nobody else
            // can dispose the consumer created above. Release it here, then rethrow unchanged.
            this.Consumer.Dispose();

            throw;
        }
    }