in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs [164:190]
private static List<TopicPartitionOffset> GetLocalState(
ICollection<KafkaTopicState> locals,
IReadOnlyCollection<TopicPartition> topics,
string? consumerGroup,
out List<TopicPartition> toRequest)
{
var result = new List<TopicPartitionOffset>(topics.Count);
toRequest = new List<TopicPartition>(topics.Count);
foreach (TopicPartition item in topics)
{
KafkaTopicState? local = locals.SingleOrDefault(x =>
x.Topic == item.Topic && x.Partition == item.Partition && x.ConsumerGroup == consumerGroup);
if (local != null)
{
result.Add(new TopicPartitionOffset(item, local.Pause ? ExternalOffset.Paused : local.Offset));
}
else
{
toRequest.Add(item);
}
}
return result;
}