sample/SubscribeEfCoreOffsets/Program.cs (63 lines of code) (raw):

// Copyright © 2024 EPAM Systems using Confluent.Kafka; using Epam.Kafka; using Epam.Kafka.PubSub; using Epam.Kafka.PubSub.EntityFrameworkCore; using Epam.Kafka.PubSub.EntityFrameworkCore.Subscription.State; using Epam.Kafka.PubSub.Subscription; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace SubscribeEfCoreOffsets; internal class Program { static void Main(string[] args) { IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args); hostBuilder.ConfigureServices((context, services) => { // register db context for offsets storage services.AddDbContext<OffsetsDbContext>(x => x.UseInMemoryDatabase("Sample")); // and configure default external offsets storage implementation services.TryAddKafkaDbContextState<OffsetsDbContext>(); KafkaBuilder builder = services.AddKafka(); // use mock cluster for demo purposes only, NOT for production! builder.WithTestMockCluster("Sandbox"); // configure custom placeholders that can be used in config builder.WithConfigPlaceholders($"<{nameof(context.HostingEnvironment.EnvironmentName)}>", context.HostingEnvironment.EnvironmentName); // subscription summary health check with "live" tag builder.WithSubscriptionSummaryHealthCheck(new[] { "live" }); // add subscription with default offsets storage managed by kafka broker builder.AddSubscription<Ignore, Ignore, SubSample>(nameof(SubSample)) .WithHealthChecks() // store offsets in external storage (registered above) instead of Kafka internal storage, however keep using consumer group re-balancing // use .WithAssignAndExternalOffsets() to assign constant topic partitions instead of group re-balancing .WithSubscribeAndExternalOffsets() // seed topic before processing .WaitFor(sp => { sp.GetRequiredKeyedService<TestMockCluster>("Sandbox").SeedTopic("sample.name", new Message<byte[], byte[]?> { Key = Guid.NewGuid().ToByteArray() }); return Task.CompletedTask; }); }).Build().Run(); } } public class SubSample : ISubscriptionHandler<Ignore, Ignore> { private readonly ILogger<SubSample> _logger; public SubSample(ILogger<SubSample> logger) { this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public void Execute(IReadOnlyCollection<ConsumeResult<Ignore, Ignore>> items, CancellationToken cancellationToken) { foreach (ConsumeResult<Ignore, Ignore> result in items) { this._logger.LogInformation("Processed {Tpo}", result.TopicPartitionOffset); } } } public class OffsetsDbContext : DbContext, IKafkaStateDbContext { public OffsetsDbContext(DbContextOptions<OffsetsDbContext> options) : base(options) { } public DbSet<KafkaTopicState> KafkaTopicStates => this.Set<KafkaTopicState>(); protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.AddKafkaState(); } }