public IProducer CreateProducer()

in src/Epam.Kafka/Internals/KafkaFactory.cs [174:241]


    /// <summary>
    /// Creates a producer for the requested cluster from <paramref name="config"/> combined with that cluster's options.
    /// </summary>
    /// <typeparam name="TKey">Type of the message key.</typeparam>
    /// <typeparam name="TValue">Type of the message value.</typeparam>
    /// <param name="config">Producer settings; passed together with the cluster options to <c>MergeResultConfig</c>.</param>
    /// <param name="cluster">Cluster name passed to <c>GetAndValidateClusterOptions</c>; may be <c>null</c>.</param>
    /// <param name="configure">Optional callback invoked on the builder before the default log and OAuth handlers are applied.</param>
    /// <returns>The newly created producer.</returns>
    /// <exception cref="InvalidOperationException">
    /// Rethrown when the OAuth handler cannot be set on the builder and the cluster options have <c>OauthHandlerThrow</c> enabled.
    /// </exception>
    public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig config, string? cluster = null,
        Action<ProducerBuilder<TKey, TValue>>? configure = null)
    {
        this.CheckIfDisposed();

        KafkaClusterOptions options = this.GetAndValidateClusterOptions(cluster);

        Dictionary<string, string> merged = MergeResultConfig(options, config);
        ProducerConfig effectiveConfig = new(merged);

        // The logger category is not a standard key and causes errors if left in place:
        // read it first, then remove it from the dictionary the config was created from.
        string logCategory = effectiveConfig.GetDotnetLoggerCategory();
        merged.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);

        ProducerBuilder<TKey, TValue> builder = new(effectiveConfig);

        // Caller customization runs before the defaults, so the defaults below are skipped
        // for any handler the builder reports as already set.
        if (configure is not null)
        {
            configure(builder);
        }

        bool logSet = TryAttachLogHandler();
        bool oauthSet = TryAttachOauthHandler();

        ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
        Type keyType = typeof(TKey);
        Type valueType = typeof(TValue);

        try
        {
            ObservableProducer<TKey, TValue> created = new(builder);

            logger.ProducerCreateOk(PrepareConfigForLogs(effectiveConfig), keyType, valueType, oauthSet, logSet);

            return created;
        }
        catch (Exception exc)
        {
            // Log the failure with the same details as the success path, then let the exception propagate unchanged.
            logger.ProducerCreateError(exc, PrepareConfigForLogs(effectiveConfig), keyType, valueType, oauthSet, logSet);

            throw;
        }

        // Installs the default log handler; returns false when the builder rejects it.
        bool TryAttachLogHandler()
        {
            try
            {
                builder.SetLogHandler((_, message) => this._loggerFactory.CreateLogger(logCategory).KafkaLogHandler(message));
                return true;
            }
            catch (InvalidOperationException)
            {
                // handler already set
                return false;
            }
        }

        // Installs the cluster's OAuth token refresh handler when one is configured and the SASL mechanism is OAuthBearer.
        bool TryAttachOauthHandler()
        {
            if (options is not { OauthHandler: { } refreshHandler, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
            {
                return false;
            }

            try
            {
                builder.SetOAuthBearerTokenRefreshHandler(refreshHandler);
                return true;
            }
            catch (InvalidOperationException)
            {
                // handler already set
                if (options.OauthHandlerThrow)
                {
                    throw;
                }

                return false;
            }
        }
    }