private List AutoResetOffsets()

in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [153:191]


    private List<TopicPartitionOffset> AutoResetOffsets(IReadOnlyCollection<TopicPartitionOffset> offsets, out List<TopicPartitionOffset> toReset)
    {
        toReset = new List<TopicPartitionOffset>(offsets.Count);
        List<TopicPartitionOffset> result = new(offsets.Count);

        foreach (TopicPartitionOffset tpo in offsets)
        {
            if (tpo.Offset == Offset.Unset)
            {
                TopicPartition topicPartition = tpo.TopicPartition;

                WatermarkOffsets q = this.Consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));

                switch (this._autoOffsetReset)
                {
                    case AutoOffsetReset.Earliest:
                        var low = new TopicPartitionOffset(topicPartition, q.Low);
                        toReset.Add(low);
                        result.Add(low);
                        break;
                    case AutoOffsetReset.Latest:
                        var high = new TopicPartitionOffset(topicPartition, q.High);
                        toReset.Add(high);
                        result.Add(high);
                        break;
                    case AutoOffsetReset.Error:
                        throw new KafkaException(ErrorCode.Local_NoOffset);

                    default: result.Add(tpo); break;
                }
            }
            else
            {
                result.Add(tpo);
            }
        }

        return result;
    }