public static void CommitOffsetIfNeeded()

in src/Epam.Kafka.PubSub/Subscription/State/ExternalStateExtensions.cs [82:130]


    public static void CommitOffsetIfNeeded<TKey, TValue>(
        this SubscriptionTopicWrapper<TKey, TValue> topic,
        ActivityWrapper activitySpan,
        IEnumerable<TopicPartitionOffset> offsets)
    {
        if (topic.Options.ExternalStateCommitToKafka)
        {
            List<TopicPartitionOffset> toCommit = new();

            try
            {
                foreach (TopicPartitionOffset item in offsets)
                {
                    if (item.Offset.Value >= 0)
                    {
                        toCommit.Add(item);
                    }
                    else if (item.Offset == Offset.Beginning)
                    {
                        WatermarkOffsets w = topic.Consumer.GetWatermarkOffsets(item.TopicPartition);

                        if (w.Low == Offset.Unset)
                        {
                            w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
                        }

                        if (w.Low.Value >= 0)
                        {
                            toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.Low));
                        }
                    }
                    else if (item.Offset == Offset.End)
                    {
                        WatermarkOffsets w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
                        toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.High));
                    }
                }

                topic.CommitOffsets(activitySpan, toCommit);
            }
            catch (KafkaException exception)
            {
                topic.Logger.KafkaCommitFailed(exception, topic.Monitor.Name, toCommit);

                // ignore exception because external provider is a single point of truth for offsets.
                // failed commit will trigger offset reset on next batch iteration
            }
        }
    }