in src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs [175:212]
protected override void ExecuteBatch(
    IPublicationTopicWrapper<TKey, TValue> topic,
    IServiceProvider sp,
    ActivityWrapper activitySpan,
    CancellationToken cancellationToken)
{
    // Runs one publication batch through the handler policy. If the batch fails,
    // the topic is asked to abort its transaction (when needed) before the
    // failure is propagated to the caller.

    // Resolve the handler of the configured type from the supplied service provider.
    IPublicationHandler<TKey, TValue> publicationHandler =
        sp.ResolveRequiredService<IPublicationHandler<TKey, TValue>>(this.Options.HandlerType!);

    // Policy that wraps the actual handler invocation.
    ISyncPolicy policy = this.Monitor.Context.GetHandlerPolicy(this.Options);

    // With a concurrency group configured the batch is reported as queued first.
    // NOTE(review): presumably the policy may make the batch wait for its turn - confirm.
    bool hasConcurrencyGroup = this.Options.HandlerConcurrencyGroup.HasValue;

    if (hasConcurrencyGroup)
    {
        this.Monitor.Batch.Update(BatchStatus.Queued);
    }

    try
    {
        policy.Execute(
            token => this.ExecuteBatchInternal(publicationHandler, topic, activitySpan, token),
            cancellationToken);
    }
    catch (Exception batchFailure)
    {
        // Set only when the abort attempt itself fails.
        AggregateException? combinedFailure = null;

        try
        {
            topic.AbortTransactionIfNeeded(activitySpan);
        }
        catch (Exception abortFailure)
        {
            // Keep both failures and flag the result so the batch is not retried.
            combinedFailure = new AggregateException(batchFailure, abortFailure);
            combinedFailure.DoNotRetryBatch();
        }

        if (combinedFailure != null)
        {
            throw combinedFailure;
        }

        // Abort succeeded (or was not needed): rethrow the original failure unchanged.
        throw;
    }
}