sample/PublishEfCore/Program.cs (80 lines of code) (raw):

// Copyright © 2024 EPAM Systems using System.Text.Json; using Confluent.Kafka; using Epam.Kafka; using Epam.Kafka.PubSub; using Epam.Kafka.PubSub.EntityFrameworkCore.Publication.Contracts; using Epam.Kafka.PubSub.Publication; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace PublishEfCore; internal class Program { static void Main(string[] args) { IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args); hostBuilder.ConfigureServices((context, services) => { services.AddDbContext<SampleDbContext>(x => x.UseInMemoryDatabase("sample")); KafkaBuilder builder = services.AddKafka(); // use mock cluster for demo purposes only, NOT for production! builder.WithTestMockCluster("Sandbox"); // configure custom placeholders that can be used in config builder.WithConfigPlaceholders($"<{nameof(context.HostingEnvironment.EnvironmentName)}>", context.HostingEnvironment.EnvironmentName); // publication summary health check with "live" tag builder.WithPublicationSummaryHealthCheck(new[] { "live" }); // Read data from DB using Entity framework core, // convert to message, // publish message to kafka, // and finally update row state in database builder.AddPublication<long, SampleKafkaEntity, PubSample>(nameof(PubSample)) .WithHealthChecks() .WithValueSerializer(sr => new SampleKafkaEntitySerializer()) // seed database before processing .WaitFor(sp => { SampleDbContext db = sp.GetRequiredService<SampleDbContext>(); db.Entities.Add(new SampleDbEntity { Id = 1, Name = "qwe", KafkaPubState = KafkaPublicationState.Queued }); return db.SaveChangesAsync(); }); }).Build().Run(); } } public class SampleDbEntity : IKafkaPublicationEntity { public int Id { get; set; } public string Name { get; set; } public KafkaPublicationState KafkaPubState { get; set; } public DateTime KafkaPubNbf { get; set; } } public class SampleKafkaEntity { public long Id { get; set; } public string Name { get; set; } } public class SampleKafkaEntitySerializer : ISerializer<SampleKafkaEntity> { public byte[] Serialize(SampleKafkaEntity data, SerializationContext context) { return JsonSerializer.SerializeToUtf8Bytes(data); } } public class SampleDbContext : DbContext { public SampleDbContext(DbContextOptions<SampleDbContext> options) : base(options) { } public DbSet<SampleDbEntity> Entities => this.Set<SampleDbEntity>(); } public class PubSample : DbContextEntityPublicationHandler<long, SampleKafkaEntity, SampleDbEntity, SampleDbContext> { public PubSample(SampleDbContext context, ILogger<PubSample> logger) : base(context, logger) { } protected override IEnumerable<TopicMessage<long, SampleKafkaEntity>> Convert(SampleDbEntity dbEntity) { yield return new TopicMessage<long, SampleKafkaEntity> { Key = dbEntity.Id, Value = new SampleKafkaEntity { Id = dbEntity.Id, Name = dbEntity.Name } }; } protected override IOrderedQueryable<SampleDbEntity> OrderBy(IQueryable<SampleDbEntity> query) { return query.OrderBy(x => x.Id); } }