private void ExecuteBatchInternal()

in src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs [69:131]


    private void ExecuteBatchInternal(
        IPublicationHandler<TKey, TValue> state,
        IPublicationTopicWrapper<TKey, TValue> topicWrapper,
        ActivityWrapper activitySpan,
        CancellationToken cancellationToken)
    {
        var stopwatch = Stopwatch.StartNew();

        IReadOnlyCollection<TopicMessage<TKey, TValue>> items = this.ReadItems(state, topicWrapper, activitySpan,
            cancellationToken);

        if (items.Count > 0)
        {
            cancellationToken.ThrowIfCancellationRequested();

            this.Logger.PubBatchBegin(this.Monitor.Name, items.Count);

            this.Monitor.Batch.Update(BatchStatus.Running);

            IDictionary<TopicMessage<TKey, TValue>, DeliveryReport> reports =
                topicWrapper.Produce(items, activitySpan, stopwatch, this.Options.HandlerTimeout, cancellationToken);

            this.Monitor.Batch.Update(BatchStatus.Commiting);

            using (activitySpan.CreateSpan("src_report"))
            {
                state.ReportResults(reports, topicWrapper.TransactionEnd, cancellationToken);
            }

            bool allItemsProcessed = this.VerifyResult(reports, items);

            if (topicWrapper.RequireTransaction)
            {
                if (allItemsProcessed)
                {
                    topicWrapper.CommitTransactionIfNeeded(activitySpan);

                    using (activitySpan.CreateSpan("src_commit"))
                    {
                        state.TransactionCommitted(cancellationToken);
                    }
                }
                else
                {
                    topicWrapper.AbortTransactionIfNeeded(activitySpan);
                }
            }

            this.Monitor.Result.Update(
                allItemsProcessed ? PublicationBatchResult.Processed : PublicationBatchResult.ProcessedPartial);

            this.Logger.BatchHandlerExecuted(items.Count,
                reports.Select(x => x.Value).GroupBy(x => $"{x.TopicPartition} {x.Status} {x.Error}")
                    .Select(g => new KeyValuePair<string, int>(g.Key, g.Count())),
                allItemsProcessed ? LogLevel.Information : LogLevel.Warning);
        }
        else
        {
            this.Logger.BatchEmpty(this.Monitor.Name);

            this.Monitor.Result.Update(PublicationBatchResult.Empty);
        }
    }