src/Epam.Kafka.PubSub/Publication/HealthChecks/PublicationHealthCheck.cs (61 lines of code) (raw):

// Copyright © 2024 EPAM Systems

using Epam.Kafka.PubSub.Common;
using Epam.Kafka.PubSub.Common.HealthChecks;
using Epam.Kafka.PubSub.Common.Pipeline;
using Epam.Kafka.PubSub.Publication.Options;
using Epam.Kafka.PubSub.Publication.Pipeline;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;

namespace Epam.Kafka.PubSub.Publication.HealthChecks;

/// <summary>
/// Health check for a single publication. Derives a <see cref="HealthStatus"/> from the batch
/// state and last batch result exposed by a <see cref="PublicationMonitor"/>.
/// </summary>
internal class PublicationHealthCheck : PubSubHealthCheck
{
    private readonly PublicationMonitor _monitor;

    /// <summary>
    /// Creates the health check for the publication registered under <paramref name="name"/>,
    /// resolving its options from <paramref name="optionsMonitor"/> and its monitor from
    /// <paramref name="context"/>.
    /// </summary>
    /// <param name="optionsMonitor">Source of named <see cref="PublicationOptions"/>.</param>
    /// <param name="context">Context whose <c>Publications</c> collection is searched by <paramref name="name"/>.</param>
    /// <param name="name">Key of the publication.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException">No publication with the given key exists in the context.</exception>
    public PublicationHealthCheck(
        IOptionsMonitor<PublicationOptions> optionsMonitor,
        PubSubContext context,
        string name)
        : this(ResolveOptions(optionsMonitor, name), ResolveMonitor(context, name))
    {
    }

    /// <summary>
    /// Creates the health check from already resolved options and monitor.
    /// </summary>
    /// <param name="options">Publication options, forwarded to the base class.</param>
    /// <param name="monitor">Monitor whose state is evaluated by this check.</param>
    /// <exception cref="ArgumentNullException"><paramref name="monitor"/> is <c>null</c>.</exception>
    public PublicationHealthCheck(PublicationOptions options, PublicationMonitor monitor)
        : base(options)
    {
        if (monitor is null)
        {
            throw new ArgumentNullException(nameof(monitor));
        }

        this._monitor = monitor;
    }

    /// <summary>Returns the publication monitor this check was created with.</summary>
    protected override PipelineMonitor GetPipelineMonitor() => this._monitor;

    /// <summary>
    /// Maps the monitor's current batch state, and the time elapsed since that state's timestamp,
    /// to a health status.
    /// </summary>
    /// <returns>
    /// <see cref="HealthStatus.Unhealthy"/> when the batch state has outlived its threshold;
    /// the value produced by the inherited <c>QueuedHealthStatus</c> helper when the batch is queued
    /// and that helper reports a status; <see cref="HealthStatus.Degraded"/> when the last result
    /// is an error or a partial processing; otherwise <see cref="HealthStatus.Healthy"/>.
    /// </returns>
    protected override HealthStatus GetBatchStatus()
    {
        TimeSpan elapsed = this.UtcNow - this._monitor.Batch.TimestampUtc;

        // Not started / finished: compared against the batch health check threshold.
        if ((this._monitor.Batch.Value is BatchStatus.None or BatchStatus.Finished)
            && elapsed > this.Options.HealthChecksThresholdBatch)
        {
            return HealthStatus.Unhealthy;
        }

        // Actively working states: compared against the handler timeout.
        if ((this._monitor.Batch.Value is BatchStatus.Running or BatchStatus.Reading or BatchStatus.Commiting)
            && elapsed > this.Options.HandlerTimeout)
        {
            return HealthStatus.Unhealthy;
        }

        // Queued: the inherited helper decides whether a definitive status applies.
        if (this._monitor.Batch.Value is BatchStatus.Queued
            && this.QueuedHealthStatus(elapsed, out HealthStatus queuedStatus))
        {
            return queuedStatus;
        }

        // No threshold exceeded: the outcome of the last batch decides between degraded and healthy.
        return this._monitor.Result.Value is PublicationBatchResult.Error or PublicationBatchResult.ProcessedPartial
            ? HealthStatus.Degraded
            : HealthStatus.Healthy;
    }

    // Validates optionsMonitor first, then name, and returns the named options instance.
    private static PublicationOptions ResolveOptions(IOptionsMonitor<PublicationOptions> optionsMonitor, string name)
    {
        if (optionsMonitor is null)
        {
            throw new ArgumentNullException(nameof(optionsMonitor));
        }

        if (name is null)
        {
            throw new ArgumentNullException(nameof(name));
        }

        return optionsMonitor.Get(name);
    }

    // Validates context and returns the monitor registered under the given publication key.
    private static PublicationMonitor ResolveMonitor(PubSubContext context, string name)
    {
        if (context is null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        if (!context.Publications.TryGetValue(name, out PublicationMonitor? monitor))
        {
            throw new ArgumentOutOfRangeException(nameof(name), $"Publication with key {name} not presented in the context");
        }

        return monitor;
    }
}