in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextSubscriptionHandler.cs [98:140]
protected override void ProcessBatch(IDictionary<ConsumeResult<TKey, TValue>, string?> items,
CancellationToken cancellationToken)
{
if (items == null)
{
throw new ArgumentNullException(nameof(items));
}
var batch = new List<ConsumeResult<TKey, TValue>>(this.ChunkSize);
using (var a = new Activity("LoadChunks"))
{
a.Start();
for (int index = 0; ; index += this.ChunkSize)
{
batch.Clear();
batch.AddRange(items.Where(x => x.Value == null).Skip(index).Take(this.ChunkSize).Select(x => x.Key));
if (batch.Count > 0)
{
this.LoadEntitiesChunk(batch);
}
else
{
break;
}
}
a.Stop();
}
base.ProcessBatch(items, cancellationToken);
using (var a = new Activity("SaveChanges"))
{
a.Start();
this.SaveChanges();
a.Stop();
}
}