in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [153:191]
private List<TopicPartitionOffset> AutoResetOffsets(IReadOnlyCollection<TopicPartitionOffset> offsets, out List<TopicPartitionOffset> toReset)
{
toReset = new List<TopicPartitionOffset>(offsets.Count);
List<TopicPartitionOffset> result = new(offsets.Count);
foreach (TopicPartitionOffset tpo in offsets)
{
if (tpo.Offset == Offset.Unset)
{
TopicPartition topicPartition = tpo.TopicPartition;
WatermarkOffsets q = this.Consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));
switch (this._autoOffsetReset)
{
case AutoOffsetReset.Earliest:
var low = new TopicPartitionOffset(topicPartition, q.Low);
toReset.Add(low);
result.Add(low);
break;
case AutoOffsetReset.Latest:
var high = new TopicPartitionOffset(topicPartition, q.High);
toReset.Add(high);
result.Add(high);
break;
case AutoOffsetReset.Error:
throw new KafkaException(ErrorCode.Local_NoOffset);
default: result.Add(tpo); break;
}
}
else
{
result.Add(tpo);
}
}
return result;
}