in src/Epam.Kafka/Internals/KafkaFactory.cs [105:172]
public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig config, string? cluster = null,
Action<ConsumerBuilder<TKey, TValue>>? configure = null)
{
this.CheckIfDisposed();
KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);
Dictionary<string, string> resultConfig = MergeResultConfig(clusterOptions, config);
config = new ConsumerConfig(resultConfig);
// Init logger category from config and remove key because it is not standard key and cause errors.
string logHandler = config.GetDotnetLoggerCategory();
resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);
var builder = new ConsumerBuilder<TKey, TValue>(config);
configure?.Invoke(builder);
bool oauthSet = false;
bool logSet = false;
try
{
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
logSet = true;
}
catch (InvalidOperationException)
{
// handler already set
}
if (clusterOptions is { OauthHandler: { }, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
{
try
{
builder.SetOAuthBearerTokenRefreshHandler(clusterOptions.OauthHandler.Invoke);
oauthSet = true;
}
catch (InvalidOperationException)
{
// handler already set
if (clusterOptions.OauthHandlerThrow)
{
throw;
}
}
}
ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
ObservableConsumer<TKey, TValue> consumer;
try
{
consumer = new ObservableConsumer<TKey, TValue>(builder);
logger.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
catch (Exception exc)
{
logger.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
throw;
}
return consumer;
}