in src/Epam.Kafka/Internals/KafkaFactory.cs [174:241]
/// <summary>
/// Creates a producer from <paramref name="config"/> merged with the options of the selected cluster.
/// </summary>
/// <typeparam name="TKey">Message key type.</typeparam>
/// <typeparam name="TValue">Message value type.</typeparam>
/// <param name="config">Producer settings; combined with the cluster options via <c>MergeResultConfig</c>.</param>
/// <param name="cluster">Cluster name handed to <c>GetAndValidateClusterOptions</c>; may be <c>null</c>.</param>
/// <param name="configure">Optional callback invoked on the builder before the factory attaches its own handlers.</param>
/// <returns>The newly created producer.</returns>
/// <exception cref="InvalidOperationException">
/// Setting the cluster OAuth handler on the builder failed and the cluster options have <c>OauthHandlerThrow</c> enabled.
/// </exception>
public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig config, string? cluster = null,
    Action<ProducerBuilder<TKey, TValue>>? configure = null)
{
    this.CheckIfDisposed();

    KafkaClusterOptions options = this.GetAndValidateClusterOptions(cluster);
    Dictionary<string, string> merged = MergeResultConfig(options, config);
    config = new ProducerConfig(merged);

    // The logger category travels in a non-standard config key: read it first, then drop the key
    // because it is not a standard key and causes errors.
    string loggerCategory = config.GetDotnetLoggerCategory();
    merged.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);

    ProducerBuilder<TKey, TValue> builder = new ProducerBuilder<TKey, TValue>(config);

    // Caller customization runs first, so handlers registered by the caller take precedence
    // over the ones this factory tries to attach below.
    configure?.Invoke(builder);

    bool logHandlerApplied = TryAttachLogHandler();
    bool oauthHandlerApplied = TryAttachOauthHandler();

    ILogger factoryLogger = this._loggerFactory.CreateLogger(LoggerCategoryName);

    try
    {
        // Both construction and the success log entry stay inside the try block, so a failure
        // in either one is reported through the error log entry before being rethrown.
        ObservableProducer<TKey, TValue> created = new ObservableProducer<TKey, TValue>(builder);

        factoryLogger.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue),
            oauthHandlerApplied, logHandlerApplied);

        return created;
    }
    catch (Exception error)
    {
        factoryLogger.ProducerCreateError(error, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue),
            oauthHandlerApplied, logHandlerApplied);

        throw;
    }

    // Attaches the factory log handler; returns false when the builder rejects it.
    bool TryAttachLogHandler()
    {
        try
        {
            builder.SetLogHandler((_, message) =>
                this._loggerFactory.CreateLogger(loggerCategory).KafkaLogHandler(message));

            return true;
        }
        catch (InvalidOperationException)
        {
            // handler already set
            return false;
        }
    }

    // Attaches the cluster OAuth handler when one is configured and the SASL mechanism is OAuthBearer;
    // returns false when nothing was attached.
    bool TryAttachOauthHandler()
    {
        if (options is not { OauthHandler: { } oauthHandler, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
        {
            return false;
        }

        try
        {
            builder.SetOAuthBearerTokenRefreshHandler(oauthHandler);

            return true;
        }
        catch (InvalidOperationException)
        {
            // handler already set
            if (options.OauthHandlerThrow)
            {
                throw;
            }

            return false;
        }
    }
}