sample/ProduceAndConsume/Program.cs (97 lines of code) (raw):
// Copyright © 2024 EPAM Systems
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Epam.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ProduceAndConsume;
internal class Program
{
static void Main(string[] args)
{
IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args);
hostBuilder.ConfigureServices(services =>
{
KafkaBuilder kafkaBuilder = services.AddKafka();
// use mock cluster for demo purposes only, NOT for production!
kafkaBuilder.WithTestMockCluster("Sandbox");
// optionally append or override cluster settings configured in appsettings.json
kafkaBuilder.WithClusterConfig("Sandbox").Configure(x =>
{
x.ClientConfig.AllowAutoCreateTopics = true;
x.SchemaRegistryConfig.Url = "http://localhost:8080";
});
// optionally append or override consumer settings configured in appsettings.json
kafkaBuilder.WithConsumerConfig("c1").Configure(x =>
{
x.ConsumerConfig.GroupId = "test1";
});
// optionally append or override producer settings configured in appsettings.json
kafkaBuilder.WithProducerConfig("p1").Configure(x =>
{
x.ProducerConfig.EnableDeliveryReports = true;
});
// optionally configure default names for producer, consumer, and cluster configs
// so that you don't need to provide their names to IKafkaFactory methods
kafkaBuilder.WithDefaults(x =>
{
x.Producer = "p1";
x.Consumer = "c1";
x.Cluster = "Sandbox";
});
services.AddHostedService<ProduceSample>();
services.AddHostedService<ConsumeSample>();
}).Build().Run();
}
}
internal class ProduceSample : BackgroundService
{
private readonly IKafkaFactory _kafkaFactory;
private readonly ILogger<ProduceSample> _logger;
public ProduceSample(IKafkaFactory kafkaFactory, ILogger<ProduceSample> logger)
{
this._kafkaFactory = kafkaFactory ?? throw new ArgumentNullException(nameof(kafkaFactory));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// create previously configured producer config by explicit name
var cfg = this._kafkaFactory.CreateProducerConfig("p1");
// or create previously configured producer config using previously configured default name
// var cfg = this._kafkaFactory.CreateProducerConfig();
// optionally refine config in runtime
cfg.Debug = "all";
// or create producer config without IKafkaFactory
//ProducerConfig cfg = new ProducerConfig { EnableDeliveryReports = true};
// Create producer instance using producer config and previously configured client config with explicit name
using var producer = this._kafkaFactory.CreateProducer<string, string>(cfg, "Sandbox", builder =>
{
// optionally setup producer builder
});
while (!stoppingToken.IsCancellationRequested)
{
var deliveryResult = await producer.ProduceAsync("test1",
new Message<string, string> { Key = "Test", Value = "Test" },
stoppingToken);
this._logger.LogInformation("{Status} {TopicPartitionOffset}", deliveryResult.Status, deliveryResult.TopicPartitionOffset);
await Task.Delay(3000, stoppingToken);
}
}
}
internal class ConsumeSample : BackgroundService
{
private readonly IKafkaFactory _kafkaFactory;
private readonly ILogger<ConsumeSample> _logger;
public ConsumeSample(IKafkaFactory kafkaFactory, ILogger<ConsumeSample> logger)
{
this._kafkaFactory = kafkaFactory ?? throw new ArgumentNullException(nameof(kafkaFactory));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Delay(5000, stoppingToken);
// optionally create previously configured consumer config by explicit name
ConsumerConfig cfg = this._kafkaFactory.CreateConsumerConfig("c1");
// or create previously configured consumer config using previously configured default name
//ConsumerConfig cfg = this._kafkaFactory.CreateConsumerConfig();
// optionally refine config in runtime
cfg.EnableAutoCommit = true;
cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
// or create consumer config without IKafkaFactory
// ConsumerConfig cfg = = new ConsumerConfig { GroupId = "test2" };
// get schema registry client for previously configured client config with explicit name
ISchemaRegistryClient sr = this._kafkaFactory.GetOrCreateSchemaRegistryClient("Sandbox");
// Create consumer instance using consumer config and previously configured client config with explicit name
using var consumer = this._kafkaFactory.CreateConsumer<string, string>(cfg, "Sandbox", builder =>
{
// optionally setup consumer builder
});
consumer.Subscribe("test1");
while (!stoppingToken.IsCancellationRequested)
{
ConsumeResult<string, string>? result = consumer.Consume(stoppingToken);
if (result != null)
{
this._logger.LogInformation("Consumed {TopicPartitionOffset}", result.TopicPartitionOffset);
}
}
}
}