sample/Subscribe/Program.cs (46 lines of code) (raw):

// Copyright © 2024 EPAM Systems using Confluent.Kafka; using Epam.Kafka; using Epam.Kafka.PubSub; using Epam.Kafka.PubSub.Subscription; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace Subscribe; internal class Program { static void Main(string[] args) { IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args); hostBuilder.ConfigureServices((context, services) => { KafkaBuilder builder = services.AddKafka(); // use mock cluster for demo purposes only, NOT for production! builder.WithTestMockCluster("Sandbox"); // configure custom placeholders that can be used in config builder.WithConfigPlaceholders($"<{nameof(context.HostingEnvironment.EnvironmentName)}>", context.HostingEnvironment.EnvironmentName); // subscription summary health check with "live" tag builder.WithSubscriptionSummaryHealthCheck(new[] { "live" }); // add subscription with default offsets storage managed by kafka broker builder.AddSubscription<Ignore, Ignore, SubSample>(nameof(SubSample)) .WithHealthChecks() // seed topic before processing .WaitFor(sp => { sp.GetRequiredKeyedService<TestMockCluster>("Sandbox").SeedTopic("sample.name", new Message<byte[], byte[]?> { Key = Guid.NewGuid().ToByteArray() }); return Task.CompletedTask; }); }).Build().Run(); } } public class SubSample : ISubscriptionHandler<Ignore, Ignore> { private readonly ILogger<SubSample> _logger; public SubSample(ILogger<SubSample> logger) { this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public void Execute(IReadOnlyCollection<ConsumeResult<Ignore, Ignore>> items, CancellationToken cancellationToken) { foreach (ConsumeResult<Ignore, Ignore> result in items) { this._logger.LogInformation("Processed {Tpo}", result.TopicPartitionOffset); } } }