in src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs [219:279]
private TBatchResult ExecuteBatchWrapper(
ISyncPolicy<TBatchResult> batchPolicy,
TTopic topicWrapper,
CancellationToken stoppingToken)
{
return batchPolicy.Execute(ct =>
{
using ActivityWrapper transaction = new(this._diagnosticListener, this.Monitor.FullName);
IServiceScope serviceScope = this._serviceScopeFactory.CreateScope();
var exceptions = new List<Exception>(2);
try
{
IServiceProvider sp = serviceScope.ServiceProvider;
this.ExecuteBatch(topicWrapper, sp, transaction, ct);
this.AdaptiveBatchSize = null;
transaction.SetResult(this.Monitor.Result.Value);
return this.Monitor.Result.Value;
}
catch (Exception exception)
{
transaction.SetResult(exception);
exceptions.Add(exception);
throw;
}
finally
{
try
{
serviceScope.Dispose();
}
catch (Exception e)
{
#pragma warning disable CA2219 // Required to not lost information about neither batch exception nor exception from serviceScope.Dispose
if (exceptions.Count > 0)
{
exceptions.Add(e);
var aggregateException = new AggregateException(exceptions);
if (!exceptions[0].RetryBatchAllowed())
{
aggregateException.DoNotRetryBatch();
}
throw aggregateException;
}
throw;
#pragma warning restore CA2219
}
}
}, stoppingToken);
}