in src/Epam.Kafka/Internals/KafkaFactory.cs [265:300]
public ISchemaRegistryClient GetOrCreateSchemaRegistryClient(string? cluster = null)
{
this.CheckIfDisposed();
KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);
if (!this._registries.TryGetValue(clusterOptions, out CachedSchemaRegistryClient? result))
{
ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
lock (this._syncObj)
{
if (!this._registries.TryGetValue(clusterOptions, out result))
{
try
{
result = new CachedSchemaRegistryClient(clusterOptions.SchemaRegistryConfig,
clusterOptions.AuthenticationHeaderValueProvider);
this._registries.Add(clusterOptions, result);
logger.RegistryClientCreateOk(PrepareConfigForLogs(clusterOptions.SchemaRegistryConfig),
clusterOptions.AuthenticationHeaderValueProvider?.GetType());
}
catch (Exception exception)
{
logger.RegistryClientCreateError(exception, PrepareConfigForLogs(clusterOptions.SchemaRegistryConfig),
clusterOptions.AuthenticationHeaderValueProvider?.GetType());
throw;
}
}
}
}
return result;
}