in src/Epam.Kafka.PubSub/Subscription/State/ExternalStateExtensions.cs [12:62]
public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue>(
this IExternalOffsetsStorage offsetsStorage,
SubscriptionTopicWrapper<TKey, TValue> topic,
IReadOnlyCollection<TopicPartitionOffset> offsets,
ActivityWrapper activitySpan,
CancellationToken cancellationToken)
{
if (topic == null)
{
throw new ArgumentNullException(nameof(topic));
}
if (offsets == null)
{
throw new ArgumentNullException(nameof(offsets));
}
var pause = new List<TopicPartition>();
var reset = new List<TopicPartitionOffset>();
var committed = new List<TopicPartitionOffset>();
IReadOnlyCollection<TopicPartitionOffset> newState;
using (ActivityWrapper wrapper = activitySpan.CreateSpan("commit_external"))
{
newState = offsetsStorage.CommitOrReset(offsets, topic.ConsumerGroup, cancellationToken);
wrapper.SetResult(newState);
}
foreach (TopicPartitionOffset item in newState)
{
TopicPartitionOffset expected = offsets.Single(x => x.TopicPartition == item.TopicPartition);
// compare actual and expected value to understand if offset was committed or reset
if (expected.Offset == item.Offset)
{
committed.Add(expected);
}
else
{
PauseOrReset(topic, item, pause, reset);
}
}
topic.OnReset(reset);
topic.OnPause(pause);
topic.CommitOffsetIfNeeded(activitySpan, newState);
return committed;
}