in src/Epam.Kafka.PubSub/Subscription/State/ExternalStateExtensions.cs [82:130]
/// <summary>
/// Commits the supplied offsets to Kafka when <c>topic.Options.ExternalStateCommitToKafka</c> is enabled;
/// does nothing otherwise.
/// </summary>
/// <remarks>
/// Each entry is turned into a concrete offset before committing:
/// non-negative offsets are used as is, <see cref="Offset.Beginning"/> becomes the low watermark,
/// <see cref="Offset.End"/> becomes the high watermark, and any other negative value is skipped.
/// A <see cref="KafkaException"/> raised while resolving or committing is logged and not rethrown.
/// </remarks>
/// <typeparam name="TKey">Key type of the wrapped topic.</typeparam>
/// <typeparam name="TValue">Value type of the wrapped topic.</typeparam>
/// <param name="topic">Topic wrapper providing options, consumer, logger and the commit operation.</param>
/// <param name="activitySpan">Activity wrapper forwarded to <c>topic.CommitOffsets</c>.</param>
/// <param name="offsets">Offsets to resolve and commit.</param>
public static void CommitOffsetIfNeeded<TKey, TValue>(
    this SubscriptionTopicWrapper<TKey, TValue> topic,
    ActivityWrapper activitySpan,
    IEnumerable<TopicPartitionOffset> offsets)
{
    if (!topic.Options.ExternalStateCommitToKafka)
    {
        return;
    }

    // Upper bound for each watermark query issued below.
    TimeSpan queryTimeout = TimeSpan.FromSeconds(5);

    // Declared outside the try block so the catch handler can log whatever was resolved so far.
    var resolved = new List<TopicPartitionOffset>();

    try
    {
        foreach (TopicPartitionOffset candidate in offsets)
        {
            Offset requested = candidate.Offset;

            // Concrete offset: commit unchanged.
            if (requested.Value >= 0)
            {
                resolved.Add(candidate);
                continue;
            }

            if (requested == Offset.Beginning)
            {
                // Try GetWatermarkOffsets first; fall back to an explicit query only when the low mark is unset.
                WatermarkOffsets marks = topic.Consumer.GetWatermarkOffsets(candidate.TopicPartition);

                if (marks.Low == Offset.Unset)
                {
                    marks = topic.Consumer.QueryWatermarkOffsets(candidate.TopicPartition, queryTimeout);
                }

                // Still not a concrete offset: leave this partition out of the commit.
                if (marks.Low.Value >= 0)
                {
                    resolved.Add(new TopicPartitionOffset(candidate.TopicPartition, marks.Low));
                }

                continue;
            }

            if (requested == Offset.End)
            {
                // The high watermark is always queried here, never read via GetWatermarkOffsets.
                WatermarkOffsets marks = topic.Consumer.QueryWatermarkOffsets(candidate.TopicPartition, queryTimeout);
                resolved.Add(new TopicPartitionOffset(candidate.TopicPartition, marks.High));
            }

            // Any other negative offset is intentionally not committed.
        }

        topic.CommitOffsets(activitySpan, resolved);
    }
    catch (KafkaException exception)
    {
        topic.Logger.KafkaCommitFailed(exception, topic.Monitor.Name, resolved);

        // Deliberately swallowed: the external provider is the single point of truth for offsets,
        // and a failed commit will trigger an offset reset on the next batch iteration.
    }
}