public PublicationTopicWrapper()

in src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapper.cs [19:80]


    public PublicationTopicWrapper(
        IKafkaFactory kafkaFactory,
        PipelineMonitor monitor,
        ProducerConfig config,
        IPublicationTopicWrapperOptions options,
        ILogger logger,
        ISerializer<TKey>? keySerializer,
        ISerializer<TValue>? valueSerializer)
    {
        this.Monitor = monitor ?? throw new ArgumentNullException(nameof(monitor));
        this.Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.Options = options ?? throw new ArgumentNullException(nameof(options));

        this.RequireTransaction = config.TransactionalId != null;

        if (this.RequireTransaction)
        {
            config.TransactionTimeoutMs ??= 60_000;

            this.MinRemaining = TimeSpan.FromMilliseconds(config.TransactionTimeoutMs.Value * 2);

            if (config.SocketTimeoutMs.HasValue &&
                config.TransactionTimeoutMs.Value - config.SocketTimeoutMs.Value < 100)
            {
                config.SocketTimeoutMs = config.TransactionTimeoutMs - 1000;
            }

            if (!monitor.TryRegisterTransactionId(config, out string? existing))
            {
                var exception = new InvalidOperationException(
                    $"Unable to use '{config.TransactionalId}' transactional.id in '{monitor.Name}' publication because it already used by '{existing}'.");
                exception.DoNotRetryBatch();

                throw exception;
            }
        }
        else
        {
            config.MessageTimeoutMs ??= 60_000;
            this.MinRemaining = TimeSpan.FromMilliseconds(config.MessageTimeoutMs.Value * 2);
        }

        ConfigureReports(config);

        this.Producer = kafkaFactory.CreateProducer<TKey, TValue>(config, this.Options.GetCluster(), b =>
        {
            options.GetPartitioner().Apply(b);

            if (keySerializer != null)
            {
                b.SetKeySerializer(keySerializer);
            }

            if (valueSerializer != null)
            {
                b.SetValueSerializer(valueSerializer);
            }
        });

        // warmup to avoid potential issues with OAuth handler
        this.Producer.Poll(TimeSpan.Zero);
    }