in src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs [69:131]
/// <summary>
/// Runs one publication batch: reads items through <c>ReadItems</c>, produces them via the topic wrapper,
/// passes the resulting delivery reports to the handler and, when the wrapper requires a transaction,
/// commits or aborts it depending on the outcome of <c>VerifyResult</c>.
/// </summary>
/// <param name="state">Handler that receives the delivery reports and, after a commit, the committed notification.</param>
/// <param name="topicWrapper">Wrapper used to produce the items and to commit or abort the transaction.</param>
/// <param name="activitySpan">Activity wrapper used to open the <c>src_report</c> and <c>src_commit</c> spans.</param>
/// <param name="cancellationToken">Token checked before producing and forwarded to the handler and wrapper calls.</param>
private void ExecuteBatchInternal(
    IPublicationHandler<TKey, TValue> state,
    IPublicationTopicWrapper<TKey, TValue> topicWrapper,
    ActivityWrapper activitySpan,
    CancellationToken cancellationToken)
{
    // Started before the read so the stopwatch later handed to Produce also covers the read phase.
    Stopwatch elapsed = Stopwatch.StartNew();

    IReadOnlyCollection<TopicMessage<TKey, TValue>> batch =
        this.ReadItems(state, topicWrapper, activitySpan, cancellationToken);

    // Nothing was read: record an empty batch and leave.
    if (batch.Count <= 0)
    {
        this.Logger.BatchEmpty(this.Monitor.Name);
        this.Monitor.Result.Update(PublicationBatchResult.Empty);
        return;
    }

    cancellationToken.ThrowIfCancellationRequested();

    this.Logger.PubBatchBegin(this.Monitor.Name, batch.Count);
    this.Monitor.Batch.Update(BatchStatus.Running);

    IDictionary<TopicMessage<TKey, TValue>, DeliveryReport> deliveryReports = topicWrapper.Produce(
        batch,
        activitySpan,
        elapsed,
        this.Options.HandlerTimeout,
        cancellationToken);

    this.Monitor.Batch.Update(BatchStatus.Commiting);

    // The handler sees the reports before the transaction (if any) is committed or aborted.
    using (activitySpan.CreateSpan("src_report"))
    {
        state.ReportResults(deliveryReports, topicWrapper.TransactionEnd, cancellationToken);
    }

    bool batchComplete = this.VerifyResult(deliveryReports, batch);

    if (topicWrapper.RequireTransaction)
    {
        if (!batchComplete)
        {
            topicWrapper.AbortTransactionIfNeeded(activitySpan);
        }
        else
        {
            topicWrapper.CommitTransactionIfNeeded(activitySpan);

            // The handler is notified only after the wrapper's commit call has returned.
            using (activitySpan.CreateSpan("src_commit"))
            {
                state.TransactionCommitted(cancellationToken);
            }
        }
    }

    PublicationBatchResult batchResult = batchComplete
        ? PublicationBatchResult.Processed
        : PublicationBatchResult.ProcessedPartial;

    this.Monitor.Result.Update(batchResult);

    // Deferred query: one entry per distinct "partition status error" combination with its report count.
    IEnumerable<KeyValuePair<string, int>> reportSummary = deliveryReports
        .Select(pair => pair.Value)
        .GroupBy(report => $"{report.TopicPartition} {report.Status} {report.Error}")
        .Select(group => new KeyValuePair<string, int>(group.Key, group.Count()));

    LogLevel summaryLevel = batchComplete ? LogLevel.Information : LogLevel.Warning;

    this.Logger.BatchHandlerExecuted(batch.Count, reportSummary, summaryLevel);
}