private void ExecutePipeline()

in src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs [134:194]


    private void ExecutePipeline(CancellationToken stoppingToken)
    {
        TTopic? topicWrapper = default;

        ISyncPolicy<TBatchResult> batchPolicy = this.GetBatchRetryPolicy(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            this.Monitor.Pipeline.Update(PipelineStatus.Running);

            try
            {
                topicWrapper ??= this.CreateTopicWrapper();

                TBatchResult batchResult = this.ExecuteBatchWrapper(batchPolicy, topicWrapper, stoppingToken);

                this.Monitor.HandleResult(batchResult);

                this.BatchFinishedTimeout(batchResult, stoppingToken);
            }
            catch (Exception exception)
            {
                this.Monitor.Batch.Update(BatchStatus.None);

                topicWrapper?.Dispose();
                topicWrapper = default;

                if (stoppingToken.IsCancellationRequested)
                {
                    this.Monitor.Pipeline.Update(PipelineStatus.Cancelled);

                    return;
                }

                this.Monitor.Result.Update((TBatchResult)(object)BatchResult.Error);

                if (this.Monitor.PipelineRetryIteration >= this.Options.PipelineRetryCount ||
                    !exception.RetryPipelineAllowed())
                {
                    this.Monitor.Pipeline.Update(PipelineStatus.Failed);

                    this.Logger.PipelineFailed(exception, this.Monitor.Name);

                    throw;
                }

                this.Monitor.PipelineRetryIteration++;

                this.Monitor.Pipeline.Update(PipelineStatus.RetryTimeout);

                this.Logger.PipelineRetry(exception, this.Monitor.Name, this.Monitor.PipelineRetryIteration,
                    this.Options.PipelineRetryCount, this.Options.PipelineRetryTimeout);

                Task.Delay(this.Options.PipelineRetryTimeout, stoppingToken).Wait(stoppingToken);
            }
        }

        topicWrapper?.Dispose();
        this.Monitor.Pipeline.Update(PipelineStatus.Cancelled);
        this.Monitor.Batch.Update(BatchStatus.None);
    }