protected override void ProcessBatch()

in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextSubscriptionHandler.cs [98:140]


    protected override void ProcessBatch(IDictionary<ConsumeResult<TKey, TValue>, string?> items,
        CancellationToken cancellationToken)
    {
        if (items == null)
        {
            throw new ArgumentNullException(nameof(items));
        }

        var batch = new List<ConsumeResult<TKey, TValue>>(this.ChunkSize);

        using (var a = new Activity("LoadChunks"))
        {
            a.Start();

            for (int index = 0; ; index += this.ChunkSize)
            {
                batch.Clear();
                batch.AddRange(items.Where(x => x.Value == null).Skip(index).Take(this.ChunkSize).Select(x => x.Key));

                if (batch.Count > 0)
                {
                    this.LoadEntitiesChunk(batch);
                }
                else
                {
                    break;
                }
            }

            a.Stop();
        }

        base.ProcessBatch(items, cancellationToken);

        using (var a = new Activity("SaveChanges"))
        {
            a.Start();

            this.SaveChanges();

            a.Stop();
        }
    }