in src/Epam.Kafka.PubSub/Common/HealthChecks/PubSubHealthCheck.cs [21:67]
/// <summary>
/// Translates the current pipeline state reported by the pipeline monitor into a health check result.
/// </summary>
/// <param name="context">
/// Health check context; when the computed status is <see cref="HealthStatus.Unhealthy"/> the
/// registration's failure status (if a registration is present) is reported instead.
/// </param>
/// <param name="cancellationToken">Not used by this method; the result is produced synchronously.</param>
/// <returns>A completed task whose result carries the status and the monitor's string representation as description.</returns>
/// <exception cref="InvalidOperationException">The pipeline status is not one of the values handled below.</exception>
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new())
{
    PipelineMonitor monitor = this.GetPipelineMonitor();

    // Time elapsed since the pipeline status value was stamped.
    TimeSpan sinceStatusStamp = this.UtcNow - monitor.Pipeline.TimestampUtc;

    // Builds the error for a status value that has no mapping, listing every known enum name.
    InvalidOperationException UnknownStatus()
    {
#if NET6_0_OR_GREATER
        string[] supported = Enum.GetNames<PipelineStatus>();
#else
        string[] supported = Enum.GetNames(typeof(PipelineStatus));
#endif
        return new InvalidOperationException(
            $"Unknown pipeline status {monitor.Pipeline.Value}. Supported values: " +
            string.Join(", ", supported));
    }

    HealthStatus status = monitor.Pipeline.Value switch
    {
        // No status yet: degraded at first, unhealthy once the configured threshold is exceeded.
        PipelineStatus.None when sinceStatusStamp > this.Options.HealthChecksThresholdPipeline => HealthStatus.Unhealthy,
        PipelineStatus.None => HealthStatus.Degraded,

        PipelineStatus.Failed => HealthStatus.Unhealthy,
        PipelineStatus.RetryTimeout => HealthStatus.Unhealthy,
        PipelineStatus.Cancelled => HealthStatus.Degraded,
        PipelineStatus.Disabled => HealthStatus.Healthy,

        // Running with a non-zero retry iteration is unhealthy; otherwise the batch status decides.
        PipelineStatus.Running when monitor.PipelineRetryIteration > 0 => HealthStatus.Unhealthy,
        PipelineStatus.Running => this.GetBatchStatus(),

        _ => throw UnknownStatus()
    };

    // An unhealthy outcome is reported using the registration's failure status when one is available.
    if (status == HealthStatus.Unhealthy)
    {
        status = context.Registration?.FailureStatus ?? HealthStatus.Unhealthy;
    }

    return Task.FromResult(new HealthCheckResult(status, monitor.ToString()));
}