// sample/SubscribeEfCore/Program.cs
// Copyright © 2024 EPAM Systems
using Confluent.Kafka;
using Epam.Kafka;
using Epam.Kafka.PubSub;
using Epam.Kafka.PubSub.EntityFrameworkCore;
using Epam.Kafka.PubSub.EntityFrameworkCore.Subscription;
using Epam.Kafka.PubSub.EntityFrameworkCore.Subscription.State;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
namespace SubscribeEfCore;
internal class Program
{
static void Main(string[] args)
{
IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args);
hostBuilder.ConfigureServices((context, services) =>
{
// register db context for offsets storage
services.AddDbContext<SampleDbContext>(x => x.UseInMemoryDatabase("Sample"));
// and configure default external offsets storage implementation
services.TryAddKafkaDbContextState<SampleDbContext>();
KafkaBuilder builder = services.AddKafka();
// use mock cluster for demo purposes only, NOT for production!
builder.WithTestMockCluster("Sandbox");
// configure custom placeholders that can be used in config
builder.WithConfigPlaceholders($"<{nameof(context.HostingEnvironment.EnvironmentName)}>", context.HostingEnvironment.EnvironmentName);
// subscription summary health check with "live" tag
builder.WithSubscriptionSummaryHealthCheck(new[] { "live" });
// add subscription; the default broker-managed offsets storage is replaced with external storage below
builder.AddSubscription<long, SampleKafkaEntity?, SubSample>(nameof(SubSample))
.WithHealthChecks()
.WithValueDeserializer(sr => new SampleKafkaEntitySerializer())
// store offsets in external storage (registered above) instead of kafka internal storage.
// this uses manual partition assignment; to keep consumer group re-balancing, use .WithSubscribeAndExternalOffsets() instead.
.WithAssignAndExternalOffsets()
// seed topic before processing
.WaitFor(sp =>
{
SampleKafkaEntitySerializer vs = new();
var e1 = new SampleKafkaEntity { Id = 1, Name = "N1" };
var e2 = new SampleKafkaEntity { Id = 1, Name = "N2" };
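// seed 3 messages for the same key to demonstrate create, update, and tombstone delete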
sp.GetRequiredKeyedService<TestMockCluster>("Sandbox").SeedTopic("sample.name",
new Message<byte[], byte[]?> { Key = Serializers.Int64.Serialize(e1.Id, SerializationContext.Empty), Value = vs.Serialize(e1, SerializationContext.Empty) },
new Message<byte[], byte[]?> { Key = Serializers.Int64.Serialize(e2.Id, SerializationContext.Empty), Value = vs.Serialize(e2, SerializationContext.Empty) },
new Message<byte[], byte[]?> { Key = Serializers.Int64.Serialize(e1.Id, SerializationContext.Empty), Value = null });
return Task.CompletedTask;
});
}).Build().Run();
}
}
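// DB entity representing the projected state of a consumed kafka message; source Partition and Offset are stored for demo purposes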
public class SampleDbEntity
{
public long Id { get; init; }
public string Name { get; set; } = string.Empty;
public int Partition { get; set; }
public long Offset { get; set; }
}
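// value type of the kafka messages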
public class SampleKafkaEntity
{
public long Id { get; set; }
public string Name { get; set; } = string.Empty;
}
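// JSON serializer/deserializer for message values; tombstone (null) payloads deserialize to null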
public class SampleKafkaEntitySerializer : ISerializer<SampleKafkaEntity>, IDeserializer<SampleKafkaEntity?>
{
public byte[] Serialize(SampleKafkaEntity data, SerializationContext context)
{
return JsonSerializer.SerializeToUtf8Bytes(data);
}
public SampleKafkaEntity? Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
return isNull ? null : JsonSerializer.Deserialize<SampleKafkaEntity>(data);
}
}
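// DB context implementing IKafkaStateDbContext so that committed offsets are stored in the same database as the data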
public class SampleDbContext : DbContext, IKafkaStateDbContext
{
public SampleDbContext(DbContextOptions<SampleDbContext> options) : base(options)
{
}
public DbSet<SampleDbEntity> Entities => this.Set<SampleDbEntity>();
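// offsets storage set required by IKafkaStateDbContext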
public DbSet<KafkaTopicState> KafkaTopicStates => this.Set<KafkaTopicState>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
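// apply entity mappings required for kafka offsets storage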
modelBuilder.AddKafkaState();
}
}
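// subscription handler that materializes kafka messages into SampleDbEntity rows (create/update/delete)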
public class SubSample : DbContextEntitySubscriptionHandler<long, SampleKafkaEntity?, SampleDbContext, SampleDbEntity>
{
public SubSample(SampleDbContext context, ILogger<SubSample> logger) : base(context, logger)
{
}
protected override bool IsDeleted(ConsumeResult<long, SampleKafkaEntity?> value)
{
// tombstone record: null value means the entity was deleted
return value.Message.Value == null;
}
protected override void LoadMainChunk(IQueryable<SampleDbEntity> queryable, IReadOnlyCollection<ConsumeResult<long, SampleKafkaEntity?>> chunk)
{
long[] ids = chunk.Select(x => x.Message.Key).ToArray();
// load entities matching chunk keys into the change tracker so that FindLocal can resolve them
queryable.Where(x => ids.Contains(x.Id)).Load();
}
protected override SampleDbEntity? FindLocal(DbSet<SampleDbEntity> dbSet, ConsumeResult<long, SampleKafkaEntity?> value)
{
// try to find existing DB entity by id
return dbSet.Find(value.Message.Key);
}
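// apply message values to the tracked DB entity; the returned string describes the performed operation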
protected override string? Update(ConsumeResult<long, SampleKafkaEntity?> value, SampleDbEntity entity, bool created)
{
SampleKafkaEntity? v = value.Message.Value;
if (v != null)
{
entity.Name = v.Name;
entity.Partition = value.Partition;
entity.Offset = value.Offset;
}
return created ? "Created" : "Updated";
}
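// create a new DB entity for non-tombstone messages only; tombstones for keys without an existing entity are skipped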
protected override bool TryCreate(ConsumeResult<long, SampleKafkaEntity?> value, out SampleDbEntity? entity)
{
entity = null;
if (value.Message.Value != null)
{
entity = new SampleDbEntity { Id = value.Message.Key };
}
return entity != null;
}
protected override void SaveChanges()
{
// intentionally skip saving changes in the handler:
// offsets and data are tracked by the same DB context,
// so data will be committed together with offsets in a single DB transaction.
}
}