protected override void ExecuteBatch()

in src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs [175:212]


    /// <summary>
    /// Runs one publication batch through the configured handler policy and gives the
    /// topic a chance to abort its transaction when the batch fails.
    /// </summary>
    /// <param name="topic">Topic wrapper passed to the batch logic; on failure it is asked to abort its transaction if needed.</param>
    /// <param name="sp">Service provider used to resolve the publication handler.</param>
    /// <param name="activitySpan">Activity wrapper forwarded to the batch logic and to the abort call.</param>
    /// <param name="cancellationToken">Token handed to the handler policy, which supplies a token to the batch logic.</param>
    /// <exception cref="AggregateException">
    /// Thrown when the batch fails and the follow-up transaction abort fails as well;
    /// it carries both exceptions and is flagged via <c>DoNotRetryBatch</c>.
    /// </exception>
    protected override void ExecuteBatch(
        IPublicationTopicWrapper<TKey, TValue> topic,
        IServiceProvider sp,
        ActivityWrapper activitySpan,
        CancellationToken cancellationToken)
    {
        // The handler type comes from options; resolve it from the supplied provider.
        IPublicationHandler<TKey, TValue> publicationHandler =
            sp.ResolveRequiredService<IPublicationHandler<TKey, TValue>>(this.Options.HandlerType!);

        ISyncPolicy policy = this.Monitor.Context.GetHandlerPolicy(this.Options);

        // NOTE(review): presumably the policy can make the batch wait for its concurrency
        // group, hence the Queued status before execution - confirm against GetHandlerPolicy.
        if (this.Options.HandlerConcurrencyGroup.HasValue)
        {
            this.Monitor.Batch.Update(BatchStatus.Queued);
        }

        try
        {
            policy.Execute(
                token => this.ExecuteBatchInternal(publicationHandler, topic, activitySpan, token),
                cancellationToken);
        }
        catch (Exception batchFailure)
        {
            try
            {
                topic.AbortTransactionIfNeeded(activitySpan);
            }
            catch (Exception abortFailure)
            {
                // Both the batch and the abort failed: report them together and flag
                // the combined exception so the batch is not retried.
                AggregateException combined = new AggregateException(batchFailure, abortFailure);
                combined.DoNotRetryBatch();

                throw combined;
            }

            // Abort succeeded (or was not needed): surface the original failure unchanged.
            throw;
        }
    }