in src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapper.cs [19:80]
public PublicationTopicWrapper(
IKafkaFactory kafkaFactory,
PipelineMonitor monitor,
ProducerConfig config,
IPublicationTopicWrapperOptions options,
ILogger logger,
ISerializer<TKey>? keySerializer,
ISerializer<TValue>? valueSerializer)
{
this.Monitor = monitor ?? throw new ArgumentNullException(nameof(monitor));
this.Logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.Options = options ?? throw new ArgumentNullException(nameof(options));
this.RequireTransaction = config.TransactionalId != null;
if (this.RequireTransaction)
{
config.TransactionTimeoutMs ??= 60_000;
this.MinRemaining = TimeSpan.FromMilliseconds(config.TransactionTimeoutMs.Value * 2);
if (config.SocketTimeoutMs.HasValue &&
config.TransactionTimeoutMs.Value - config.SocketTimeoutMs.Value < 100)
{
config.SocketTimeoutMs = config.TransactionTimeoutMs - 1000;
}
if (!monitor.TryRegisterTransactionId(config, out string? existing))
{
var exception = new InvalidOperationException(
$"Unable to use '{config.TransactionalId}' transactional.id in '{monitor.Name}' publication because it already used by '{existing}'.");
exception.DoNotRetryBatch();
throw exception;
}
}
else
{
config.MessageTimeoutMs ??= 60_000;
this.MinRemaining = TimeSpan.FromMilliseconds(config.MessageTimeoutMs.Value * 2);
}
ConfigureReports(config);
this.Producer = kafkaFactory.CreateProducer<TKey, TValue>(config, this.Options.GetCluster(), b =>
{
options.GetPartitioner().Apply(b);
if (keySerializer != null)
{
b.SetKeySerializer(keySerializer);
}
if (valueSerializer != null)
{
b.SetValueSerializer(valueSerializer);
}
});
// warmup to avoid potential issues with OAuth handler
this.Producer.Poll(TimeSpan.Zero);
}