in src/Epam.Kafka.PubSub/Utils/OffsetsExtensions.cs [9:39]
/// <summary>
/// Scans <paramref name="results"/> and records, for every topic partition encountered,
/// the smallest and the largest offset found among the results of that partition.
/// </summary>
/// <typeparam name="TKey">Key type of the consume results.</typeparam>
/// <typeparam name="TValue">Value type of the consume results.</typeparam>
/// <param name="results">The consume results to inspect.</param>
/// <param name="from">Receives a new dictionary holding the lowest offset seen per topic partition.</param>
/// <param name="to">Receives a new dictionary holding the highest offset seen per topic partition.</param>
/// <exception cref="ArgumentNullException"><paramref name="results"/> is null.</exception>
public static void GetOffsetsRange<TKey, TValue>(this IEnumerable<ConsumeResult<TKey, TValue>> results,
    out IDictionary<TopicPartition, Offset> from, out IDictionary<TopicPartition, Offset> to)
{
    if (results == null)
    {
        throw new ArgumentNullException(nameof(results));
    }

    // Both outputs are assigned before enumeration starts, so they are set even for an empty sequence.
    from = new Dictionary<TopicPartition, Offset>();
    to = new Dictionary<TopicPartition, Offset>();

    foreach (ConsumeResult<TKey, TValue> record in results)
    {
        TopicPartition partition = record.TopicPartition;
        Offset offset = record.Offset;

        // Upper bound: the first offset seen for a partition seeds the entry, later ones only raise it.
        if (to.TryGetValue(partition, out Offset knownMax))
        {
            if (offset.Value > knownMax.Value)
            {
                to[partition] = offset;
            }
        }
        else
        {
            to.Add(partition, offset);
        }

        // Lower bound: the first offset seen for a partition seeds the entry, later ones only lower it.
        if (from.TryGetValue(partition, out Offset knownMin))
        {
            if (offset.Value < knownMin.Value)
            {
                from[partition] = offset;
            }
        }
        else
        {
            from.Add(partition, offset);
        }
    }
}