protected override void AssignConsumer()

in src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs [21:74]


    /// <summary>
    /// Reconciles the consumer with the offsets returned by
    /// <c>topic.GetAndResetState</c> for the configured topic partitions.
    /// </summary>
    /// <remarks>
    /// Each returned entry is handled in one of two ways:
    /// <list type="bullet">
    /// <item><description>
    /// Partition already in <c>topic.Consumer.Assignment</c>: when <c>topic.TryGetOffset</c> succeeds and the
    /// offset it yields differs from the entry's offset, the entry is handed to
    /// <c>ExternalStateExtensions.PauseOrReset</c> together with the pause and reset lists.
    /// </description></item>
    /// <item><description>
    /// Partition not yet assigned: an entry whose offset equals <c>ExternalOffset.Paused</c> is queued for
    /// assignment at <c>Offset.End</c> and also queued for pausing; any other entry is queued for assignment
    /// at its own offset and additionally passed to <c>topic.CommitOffsetIfNeeded</c>.
    /// </description></item>
    /// </list>
    /// After the loop the collected lists are applied in this order: <c>OnAssign</c>, <c>OnReset</c>,
    /// <c>OnPause</c>, <c>CommitOffsetIfNeeded</c>.
    /// </remarks>
    /// <typeparam name="TKey">Key type of the topic wrapper.</typeparam>
    /// <typeparam name="TValue">Value type of the topic wrapper.</typeparam>
    /// <param name="topic">Topic wrapper whose consumer is being (re)assigned. Must not be <c>null</c>.</param>
    /// <param name="activitySpan">Activity wrapper forwarded to <c>topic.CommitOffsetIfNeeded</c>.</param>
    /// <param name="cancellationToken">Token forwarded to <c>topic.GetAndResetState</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is <c>null</c>.</exception>
    protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
        ActivityWrapper activitySpan,
        CancellationToken cancellationToken)
    {
        if (topic == null)
        {
            throw new ArgumentNullException(nameof(topic));
        }

        IReadOnlyCollection<TopicPartition> configuredPartitions = topic.Options.GetTopicPartitions();

        IReadOnlyCollection<TopicPartitionOffset> externalState =
            topic.GetAndResetState(this._offsetsStorage, configuredPartitions, cancellationToken);

        List<TopicPartition> toPause = new();
        List<TopicPartitionOffset> toReset = new();
        List<TopicPartitionOffset> toAssign = new();
        List<TopicPartitionOffset> toAssignActive = new();

        foreach (TopicPartitionOffset item in externalState)
        {
            if (topic.Consumer.Assignment.Contains(item.TopicPartition))
            {
                // Partition is already assigned: only act when the offset known to the
                // topic wrapper no longer matches the one from the external state.
                if (topic.TryGetOffset(item.TopicPartition, out Offset knownOffset))
                {
                    if (knownOffset != item.Offset)
                    {
                        ExternalStateExtensions.PauseOrReset(topic, item, toPause, toReset);
                    }
                }

                continue;
            }

            // Partition is not assigned yet.
            if (item.Offset == ExternalOffset.Paused)
            {
                // A paused partition is first assigned at Offset.End and then paused.
                toPause.Add(item.TopicPartition);
                toAssign.Add(new TopicPartitionOffset(item.TopicPartition, Offset.End));
            }
            else
            {
                // The same instance goes both to the assignment list and to the commit candidates.
                TopicPartitionOffset fresh = new(item.TopicPartition, item.Offset);

                toAssignActive.Add(fresh);
                toAssign.Add(fresh);
            }
        }

        // Order matters: assign first, then reset, then pause, and commit last.
        topic.OnAssign(toAssign);
        topic.OnReset(toReset);
        topic.OnPause(toPause);

        topic.CommitOffsetIfNeeded(activitySpan, toReset.Concat(toAssignActive));
    }