in src/Epam.Kafka.PubSub/Subscription/State/CombinedState.cs [20:55]
/// <summary>
/// Hooks the offsets storage into <paramref name="topic"/>, delegates the assignment to the base
/// implementation, and then compares the offsets returned by the storage with the offsets the topic
/// wrapper currently reports, pausing or resetting the partitions that differ.
/// </summary>
/// <param name="topic">Topic wrapper whose consumer is being assigned.</param>
/// <param name="activitySpan">Activity span forwarded to the base call and to the offset commit.</param>
/// <param name="cancellationToken">Token forwarded to the base call and to the offsets storage calls.</param>
protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
    ActivityWrapper activitySpan,
    CancellationToken cancellationToken)
{
    // The callback is installed before delegating to the base implementation.
    // NOTE(review): presumably the base assignment may invoke it - confirm before reordering.
    topic.ExternalState = items => topic.GetAndResetState(this._offsetsStorage, items, cancellationToken);

    base.AssignConsumer(topic, activitySpan, cancellationToken);

    // Nothing to reconcile when the consumer has no assigned partitions.
    if (topic.Consumer.Assignment.Count <= 0)
    {
        return;
    }

    IReadOnlyCollection<TopicPartitionOffset> stored = this._offsetsStorage.GetOrCreate(
        topic.Consumer.Assignment,
        topic.ConsumerGroup,
        cancellationToken);

    List<TopicPartitionOffset> toReset = new List<TopicPartitionOffset>();
    List<TopicPartition> toPause = new List<TopicPartition>();

    foreach (TopicPartitionOffset entry in stored)
    {
        bool tracked = topic.TryGetOffset(entry.TopicPartition, out Offset current);

        // Only partitions the wrapper already tracks AND whose stored offset differs are handed
        // over; an unchanged offset is skipped (original note: don't reset paused offset).
        if (tracked && current != entry.Offset)
        {
            ExternalStateExtensions.PauseOrReset(topic, entry, toPause, toReset);
        }
    }

    // Same order as before: reset, then pause, then commit using the reset list.
    topic.OnReset(toReset);
    topic.OnPause(toPause);
    topic.CommitOffsetIfNeeded(activitySpan, toReset);
}