in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs [34:90]
public IReadOnlyCollection<TopicPartitionOffset> GetOrCreate(IReadOnlyCollection<TopicPartition> topics,
    string? consumerGroup, CancellationToken cancellationToken)
{
    if (topics == null)
    {
        throw new ArgumentNullException(nameof(topics));
    }

    consumerGroup ??= string.Empty;

    cancellationToken.ThrowIfCancellationRequested();
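
    // The DbSet's Local view exposes entities already tracked by this DbContext,
    // so previously loaded partition state can be reused without another query.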
    DbSet<KafkaTopicState> dbSet = this._context.KafkaTopicStates;
    ICollection<KafkaTopicState> locals = dbSet.Local;
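
    // Resolve offsets for partitions already tracked locally; partitions that
    // still need a database lookup are collected in 'toRequest'.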
    List<TopicPartitionOffset> result =
        GetLocalState(locals, topics, consumerGroup, out List<TopicPartition> toRequest);
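
    // Load any missing state from the database, one query per topic.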
    if (toRequest.Count > 0)
    {
        foreach (IGrouping<string, TopicPartition> g in toRequest.GroupBy(x => x.Topic))
        {
            int[] partitions = g.Select(tp => tp.Partition.Value).ToArray();

            cancellationToken.ThrowIfCancellationRequested();

            // AsTracking ensures the loaded entities are attached to the change tracker
            // (and therefore visible via dbSet.Local), so later updates hit the same rows.
            dbSet.AsTracking()
                .Where(x => x.ConsumerGroup == consumerGroup && x.Topic == g.Key && partitions.Contains(x.Partition))
                .Load();
        }
    }
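
    // Second pass: entities loaded above are now in the local cache; partitions that
    // are still missing have no row in the database and get a new tracked entity.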
    foreach (TopicPartition item in toRequest)
    {
        KafkaTopicState? local = locals.SingleOrDefault(x =>
            x.Topic == item.Topic && x.Partition == item.Partition && x.ConsumerGroup == consumerGroup);

        if (local != null)
        {
            // A paused partition is reported with the special Paused offset instead of its stored value.
            result.Add(new TopicPartitionOffset(item, local.Pause ? ExternalOffset.Paused : local.Offset));
        }
        else
        {
            // No state in the database yet: report Offset.Unset and add a new tracked entity
            // so it can be persisted by a later SaveChanges.
            result.Add(new TopicPartitionOffset(item, Offset.Unset));

            dbSet.Add(new KafkaTopicState
            {
                Topic = item.Topic,
                Partition = item.Partition,
                ConsumerGroup = consumerGroup,
                Offset = Offset.Unset,
                Timestamp = DateTimeOffset.UtcNow
            });
        }
    }

    return result;
}
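
A minimal caller-side sketch (assumption: 'storage' is an already constructed DbContextOffsetsStorage instance; the topic name and consumer group are placeholders, only the GetOrCreate signature above comes from the source):

// Usage sketch, not part of the source file.
IReadOnlyCollection<TopicPartition> assigned = new[]
{
    new TopicPartition("my-topic", new Partition(0)),
    new TopicPartition("my-topic", new Partition(1))
};

IReadOnlyCollection<TopicPartitionOffset> offsets =
    storage.GetOrCreate(assigned, "my-consumer-group", CancellationToken.None);

foreach (TopicPartitionOffset tpo in offsets)
{
    // Offset.Unset means no committed state exists yet for that partition.
    Console.WriteLine($"{tpo.TopicPartition}: {tpo.Offset}");
}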