in sample/SubscribeEfCore/Program.cs [23:68]
/// <summary>
/// Sample entry point. Builds a generic host that registers an in-memory EF Core
/// <c>SampleDbContext</c> for offsets storage, configures Kafka against the "Sandbox" test mock
/// cluster, adds the <c>SubSample</c> subscription using external offsets, seeds the
/// "sample.name" topic with three messages, and then runs the host.
/// </summary>
/// <param name="args">Command line arguments forwarded to <c>Host.CreateDefaultBuilder</c>.</param>
static void Main(string[] args)
{
    IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args);

    hostBuilder.ConfigureServices((context, services) =>
    {
        // register db context for offsets storage
        services.AddDbContext<SampleDbContext>(x => x.UseInMemoryDatabase("Sample"));

        // and configure default external offsets storage implementation
        services.TryAddKafkaDbContextState<SampleDbContext>();

        KafkaBuilder builder = services.AddKafka();

        // use mock cluster for demo purposes only, NOT for production!
        builder.WithTestMockCluster("Sandbox");

        // configure custom placeholders that can be used in config.
        // nameof(...) yields "EnvironmentName", so the placeholder key is "<EnvironmentName>".
        builder.WithConfigPlaceholders($"<{nameof(context.HostingEnvironment.EnvironmentName)}>", context.HostingEnvironment.EnvironmentName);

        // subscription summary health check with "live" tag
        builder.WithSubscriptionSummaryHealthCheck(new[] { "live" });

        // add subscription; its offsets are kept in the external storage configured below
        builder.AddSubscription<long, SampleKafkaEntity?, SubSample>(nameof(SubSample))
            .WithHealthChecks()
            .WithValueDeserializer(sr => new SampleKafkaEntitySerializer())
            // store offsets in external storage (registered above) instead of Kafka internal storage.
            // To keep using consumer group re-balancing use .WithSubscribeAndExternalOffsets() instead.
            .WithAssignAndExternalOffsets()
            // seed topic before processing
            .WaitFor(sp =>
            {
                SampleKafkaEntitySerializer vs = new();
                var e1 = new SampleKafkaEntity { Id = 1, Name = "N1" };
                var e2 = new SampleKafkaEntity { Id = 1, Name = "N2" };

                // Three messages for the same key: the entity as "N1", the same key as "N2",
                // and finally a message with a null value.
                sp.GetRequiredKeyedService<TestMockCluster>("Sandbox").SeedTopic("sample.name",
                    new Message<byte[], byte[]?> { Key = Serializers.Int64.Serialize(e1.Id, SerializationContext.Empty), Value = vs.Serialize(e1, SerializationContext.Empty) },
                    // value must come from e2 (previously e1 was serialized here, so "N2" was never published)
                    new Message<byte[], byte[]?> { Key = Serializers.Int64.Serialize(e2.Id, SerializationContext.Empty), Value = vs.Serialize(e2, SerializationContext.Empty) },
                    new Message<byte[], byte[]?> { Key = Serializers.Int64.Serialize(e1.Id, SerializationContext.Empty), Value = null });

                return Task.CompletedTask;
            });
    }).Build().Run();
}