private TBatchResult ExecuteBatchWrapper()

in src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs [219:279]


    private TBatchResult ExecuteBatchWrapper(
        ISyncPolicy<TBatchResult> batchPolicy,
        TTopic topicWrapper,
        CancellationToken stoppingToken)
    {
        return batchPolicy.Execute(ct =>
        {
            using ActivityWrapper transaction = new(this._diagnosticListener, this.Monitor.FullName);

            IServiceScope serviceScope = this._serviceScopeFactory.CreateScope();

            var exceptions = new List<Exception>(2);

            try
            {
                IServiceProvider sp = serviceScope.ServiceProvider;

                this.ExecuteBatch(topicWrapper, sp, transaction, ct);

                this.AdaptiveBatchSize = null;

                transaction.SetResult(this.Monitor.Result.Value);

                return this.Monitor.Result.Value;
            }
            catch (Exception exception)
            {
                transaction.SetResult(exception);

                exceptions.Add(exception);

                throw;
            }
            finally
            {
                try
                {
                    serviceScope.Dispose();
                }
                catch (Exception e)
                {
#pragma warning disable CA2219 // Required to not lost information about neither batch exception nor exception from serviceScope.Dispose
                    if (exceptions.Count > 0)
                    {
                        exceptions.Add(e);
                        var aggregateException = new AggregateException(exceptions);

                        if (!exceptions[0].RetryBatchAllowed())
                        {
                            aggregateException.DoNotRetryBatch();
                        }

                        throw aggregateException;
                    }

                    throw;
#pragma warning restore CA2219
                }
            }
        }, stoppingToken);
    }