public IConsumer CreateConsumer()

in src/Epam.Kafka/Internals/KafkaFactory.cs [105:172]


    /// <summary>
    /// Builds a consumer from the supplied <paramref name="config"/> combined with the options of the
    /// requested cluster, wiring the log handler and (when applicable) the OAuth token refresh handler.
    /// </summary>
    /// <typeparam name="TKey">Message key type of the consumer.</typeparam>
    /// <typeparam name="TValue">Message value type of the consumer.</typeparam>
    /// <param name="config">Consumer configuration that is combined with the cluster options.</param>
    /// <param name="cluster">Cluster name passed to the cluster options lookup; may be <c>null</c>.</param>
    /// <param name="configure">
    /// Optional callback invoked on the builder before the handlers below are applied, so handlers
    /// installed by the callback take precedence.
    /// </param>
    /// <returns>The created consumer.</returns>
    /// <exception cref="InvalidOperationException">
    /// Rethrown when the OAuth handler was already set on the builder and the cluster options have
    /// <c>OauthHandlerThrow</c> enabled.
    /// </exception>
    /// <remarks>
    /// Any exception raised while constructing the consumer is logged and rethrown unchanged.
    /// </remarks>
    public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig config, string? cluster = null,
        Action<ConsumerBuilder<TKey, TValue>>? configure = null)
    {
        this.CheckIfDisposed();

        KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);

        Dictionary<string, string> resultConfig = MergeResultConfig(clusterOptions, config);

        config = new ConsumerConfig(resultConfig);

        // Init logger category from config and remove key because it is not standard key and cause errors.
        // NOTE(review): the key is removed from resultConfig after config was built from it; this relies on
        // ConsumerConfig keeping a reference to the dictionary rather than copying it - confirm before reordering.
        string logHandler = config.GetDotnetLoggerCategory();
        resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);

        var builder = new ConsumerBuilder<TKey, TValue>(config);

        // Caller customization runs first; handlers it installs are kept (see the catch blocks below).
        configure?.Invoke(builder);

        bool oauthSet = false;
        bool logSet = false;

        // Resolve the logger once instead of asking the factory on every log message.
        // Kept outside the try block so that a factory failure is not mistaken for "handler already set".
        ILogger kafkaLogger = this._loggerFactory.CreateLogger(logHandler);

        try
        {
            builder.SetLogHandler((_, m) => kafkaLogger.KafkaLogHandler(m));
            logSet = true;
        }
        catch (InvalidOperationException)
        {
            // handler already set
        }

        // The OAuth refresh handler is only relevant when the cluster provides one and uses OAUTHBEARER.
        if (clusterOptions is { OauthHandler: { }, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
        {
            try
            {
                builder.SetOAuthBearerTokenRefreshHandler(clusterOptions.OauthHandler.Invoke);
                oauthSet = true;
            }
            catch (InvalidOperationException)
            {
                // handler already set
                if (clusterOptions.OauthHandlerThrow)
                {
                    throw;
                }
            }
        }

        ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);

        ObservableConsumer<TKey, TValue> consumer;

        try
        {
            consumer = new ObservableConsumer<TKey, TValue>(builder);

            logger.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
        }
        catch (Exception exc)
        {
            // Record the failure together with the effective config, then let the caller see the original exception.
            logger.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);

            throw;
        }

        return consumer;
    }