public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue>()

in src/Epam.Kafka.PubSub/Subscription/State/ExternalStateExtensions.cs [12:62]


    /// <summary>
    /// Commits <paramref name="offsets"/> to the external offsets storage for the consumer group of
    /// <paramref name="topic"/> and reconciles the topic with the state returned by the storage.
    /// </summary>
    /// <remarks>
    /// Every entry returned by the storage is compared with the offset requested for the same partition:
    /// an equal offset is treated as committed, any other value is handed to <c>PauseOrReset</c>
    /// together with the pause and reset lists. Afterwards the collected resets and pauses are passed
    /// to the topic and <c>CommitOffsetIfNeeded</c> is invoked with the state returned by the storage.
    /// </remarks>
    /// <typeparam name="TKey">Key type of the subscription topic wrapper.</typeparam>
    /// <typeparam name="TValue">Value type of the subscription topic wrapper.</typeparam>
    /// <param name="offsetsStorage">External storage that performs the commit-or-reset operation.</param>
    /// <param name="topic">Topic wrapper that supplies the consumer group and receives reset/pause notifications.</param>
    /// <param name="offsets">Offsets requested to be committed; expected to hold exactly one entry per partition returned by the storage.</param>
    /// <param name="activitySpan">Parent activity used to create the <c>commit_external</c> span.</param>
    /// <param name="cancellationToken">Token forwarded to the storage call.</param>
    /// <returns>The subset of <paramref name="offsets"/> whose value matches the state returned by the storage.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="offsetsStorage"/>, <paramref name="topic"/> or <paramref name="offsets"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The storage returned a partition that has no entry, or more than one entry, in <paramref name="offsets"/>.
    /// </exception>
    public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue>(
        this IExternalOffsetsStorage offsetsStorage,
        SubscriptionTopicWrapper<TKey, TValue> topic,
        IReadOnlyCollection<TopicPartitionOffset> offsets,
        ActivityWrapper activitySpan,
        CancellationToken cancellationToken)
    {
        // An extension method can still receive a null receiver; fail fast with a descriptive
        // exception instead of a NullReferenceException after the tracing span has been opened.
        if (offsetsStorage == null)
        {
            throw new ArgumentNullException(nameof(offsetsStorage));
        }

        if (topic == null)
        {
            throw new ArgumentNullException(nameof(topic));
        }

        if (offsets == null)
        {
            throw new ArgumentNullException(nameof(offsets));
        }

        var pause = new List<TopicPartition>();
        var reset = new List<TopicPartitionOffset>();
        var committed = new List<TopicPartitionOffset>();
        IReadOnlyCollection<TopicPartitionOffset> newState;

        // Keep the span limited to the storage call so that only the external commit is measured.
        using (ActivityWrapper wrapper = activitySpan.CreateSpan("commit_external"))
        {
            newState = offsetsStorage.CommitOrReset(offsets, topic.ConsumerGroup, cancellationToken);
            wrapper.SetResult(newState);
        }

        foreach (TopicPartitionOffset item in newState)
        {
            // Single() deliberately throws when the storage reports a partition that was not
            // requested (or was requested more than once).
            TopicPartitionOffset expected = offsets.Single(x => x.TopicPartition == item.TopicPartition);

            // compare actual and expected value to understand if offset was committed or reset
            if (expected.Offset == item.Offset)
            {
                committed.Add(expected);
            }
            else
            {
                PauseOrReset(topic, item, pause, reset);
            }
        }

        topic.OnReset(reset);

        topic.OnPause(pause);

        topic.CommitOffsetIfNeeded(activitySpan, newState);

        return committed;
    }