protected override void AssignConsumer()

in src/Epam.Kafka.PubSub/Subscription/State/CombinedState.cs [20:55]


    protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
        ActivityWrapper activitySpan,
        CancellationToken cancellationToken)
    {
        topic.ExternalState = list => topic.GetAndResetState(this._offsetsStorage, list, cancellationToken);

        base.AssignConsumer(topic, activitySpan, cancellationToken);

        if (topic.Consumer.Assignment.Count > 0)
        {
            var reset = new List<TopicPartitionOffset>();
            var pause = new List<TopicPartition>();

            IReadOnlyCollection<TopicPartitionOffset> state = this._offsetsStorage.GetOrCreate(
                topic.Consumer.Assignment, topic.ConsumerGroup,
                cancellationToken);

            foreach (TopicPartitionOffset item in state)
            {
                if (topic.TryGetOffset(item.TopicPartition, out Offset previous))
                {
                    // don't reset paused offset
                    if (previous != item.Offset)
                    {
                        ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
                    }
                }
            }

            topic.OnReset(reset);

            topic.OnPause(pause);

            topic.CommitOffsetIfNeeded(activitySpan, reset);
        }
    }