in src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs [134:194]
private void ExecutePipeline(CancellationToken stoppingToken)
{
TTopic? topicWrapper = default;
ISyncPolicy<TBatchResult> batchPolicy = this.GetBatchRetryPolicy(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
this.Monitor.Pipeline.Update(PipelineStatus.Running);
try
{
topicWrapper ??= this.CreateTopicWrapper();
TBatchResult batchResult = this.ExecuteBatchWrapper(batchPolicy, topicWrapper, stoppingToken);
this.Monitor.HandleResult(batchResult);
this.BatchFinishedTimeout(batchResult, stoppingToken);
}
catch (Exception exception)
{
this.Monitor.Batch.Update(BatchStatus.None);
topicWrapper?.Dispose();
topicWrapper = default;
if (stoppingToken.IsCancellationRequested)
{
this.Monitor.Pipeline.Update(PipelineStatus.Cancelled);
return;
}
this.Monitor.Result.Update((TBatchResult)(object)BatchResult.Error);
if (this.Monitor.PipelineRetryIteration >= this.Options.PipelineRetryCount ||
!exception.RetryPipelineAllowed())
{
this.Monitor.Pipeline.Update(PipelineStatus.Failed);
this.Logger.PipelineFailed(exception, this.Monitor.Name);
throw;
}
this.Monitor.PipelineRetryIteration++;
this.Monitor.Pipeline.Update(PipelineStatus.RetryTimeout);
this.Logger.PipelineRetry(exception, this.Monitor.Name, this.Monitor.PipelineRetryIteration,
this.Options.PipelineRetryCount, this.Options.PipelineRetryTimeout);
Task.Delay(this.Options.PipelineRetryTimeout, stoppingToken).Wait(stoppingToken);
}
}
topicWrapper?.Dispose();
this.Monitor.Pipeline.Update(PipelineStatus.Cancelled);
this.Monitor.Batch.Update(BatchStatus.None);
}