protected override void ExecuteBatch()

in src/Epam.Kafka.PubSub/Subscription/SubscriptionBackgroundService.cs [99:168]


    protected override void ExecuteBatch(
        SubscriptionTopicWrapper<TKey, TValue> topic,
        IServiceProvider sp,
        ActivityWrapper activitySpan,
        CancellationToken cancellationToken)
    {
        BatchState state = sp.ResolveRequiredService<BatchState>(this.Options.StateType);

        this.Monitor.Batch.Update(BatchStatus.Reading);

        bool unassignedBeforeRead = state.GetBatch(
            topic, activitySpan, out IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch, cancellationToken);

        cancellationToken.ThrowIfCancellationRequested();

        if (this.AdaptiveBatchSize.HasValue)
        {
            batch = batch.Take(this.AdaptiveBatchSize.Value).ToList();
        }
        else
        {
            this.AdaptiveBatchSize = batch.Count;
        }

        if (batch.Count > 0)
        {
            batch.GetOffsetsRange(
                out IDictionary<TopicPartition, Offset>? from,
                out IDictionary<TopicPartition, Offset>? to);

            this.Logger.SubBatchBegin(this.Monitor.Name, batch.Count,
                from.Select(x => new TopicPartitionOffset(x.Key, x.Value)),
                to.Select(x => new TopicPartitionOffset(x.Key, x.Value)));

            this.CreateAndExecuteHandler(sp, topic, batch, activitySpan, cancellationToken);

            this.Monitor.Batch.Update(BatchStatus.Commiting);

            state.CommitResults(topic, activitySpan,
                // offset of processed message + 1
                to.PrepareOffsetsToCommit(), cancellationToken);

            this.Monitor.Result.Update(SubscriptionBatchResult.Processed);
        }
        else
        {
            // get new instance of assignment list
            List<TopicPartition> assignments = topic.Consumer.Assignment;

            if (assignments.Count > 0 &&
                assignments.All(x => topic.TryGetOffset(x, out Offset offset) && offset == ExternalOffset.Paused))
            {
                this.Monitor.Result.Update(SubscriptionBatchResult.Paused);

                this.Logger.ConsumerPaused(this.Monitor.Name, assignments);
            }
            else if (assignments.Count == 0 || unassignedBeforeRead)
            {
                this.Monitor.Result.Update(SubscriptionBatchResult.NotAssigned);

                this.Logger.ConsumerNotAssigned(this.Monitor.Name);
            }
            else
            {
                this.Logger.BatchEmpty(this.Monitor.Name);

                this.Monitor.Result.Update(SubscriptionBatchResult.Empty);
            }
        }
    }