public IReadOnlyCollection GetOrCreate()

in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs [34:90]


    /// <summary>
    /// Resolves offsets for the requested topic partitions of a consumer group. Entities already
    /// tracked by the context are consulted first (via <c>GetLocalState</c>); partitions it reports
    /// as unresolved are loaded from the database, and a new <see cref="KafkaTopicState"/> is added
    /// to the context for every partition that still has no matching entity.
    /// </summary>
    /// <param name="topics">Topic partitions to resolve offsets for.</param>
    /// <param name="consumerGroup">Consumer group name; <c>null</c> is treated as an empty string.</param>
    /// <param name="cancellationToken">
    /// Checked once before any work starts and again before each per-topic database query.
    /// </param>
    /// <returns>
    /// The collection produced by <c>GetLocalState</c>, extended with one entry per unresolved
    /// partition: <c>ExternalOffset.Paused</c> when the stored state is paused, the stored offset
    /// otherwise, or <c>Offset.Unset</c> when no state exists yet.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="topics"/> is <c>null</c>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public IReadOnlyCollection<TopicPartitionOffset> GetOrCreate(IReadOnlyCollection<TopicPartition> topics,
        string? consumerGroup, CancellationToken cancellationToken)
    {
        if (topics is null)
        {
            throw new ArgumentNullException(nameof(topics));
        }

        consumerGroup ??= string.Empty;

        cancellationToken.ThrowIfCancellationRequested();

        DbSet<KafkaTopicState> states = this._context.KafkaTopicStates;
        ICollection<KafkaTopicState> tracked = states.Local;

        List<TopicPartitionOffset> offsets =
            GetLocalState(tracked, topics, consumerGroup, out List<TopicPartition> missing);

        // One query per topic: pull the unresolved partitions into the change tracker so that
        // they become visible through the local view used below.
        foreach (IGrouping<string, TopicPartition> byTopic in missing.GroupBy(tp => tp.Topic))
        {
            string topic = byTopic.Key;
            int[] partitionIds = byTopic.Select(tp => tp.Partition.Value).ToArray();

            cancellationToken.ThrowIfCancellationRequested();

            IQueryable<KafkaTopicState> query = states.AsTracking()
                .Where(s => s.ConsumerGroup == consumerGroup && s.Topic == topic && partitionIds.Contains(s.Partition));

            query.Load();
        }

        foreach (TopicPartition requested in missing)
        {
            KafkaTopicState? state = tracked.SingleOrDefault(s =>
                s.Topic == requested.Topic && s.Partition == requested.Partition && s.ConsumerGroup == consumerGroup);

            if (state is null)
            {
                // Nothing stored for this partition yet: report it as unset and start tracking a
                // new entity. This method does not save the context itself.
                offsets.Add(new TopicPartitionOffset(requested, Offset.Unset));
                states.Add(new KafkaTopicState
                {
                    Topic = requested.Topic,
                    Partition = requested.Partition,
                    ConsumerGroup = consumerGroup,
                    Offset = Offset.Unset,
                    Timestamp = DateTimeOffset.UtcNow
                });

                continue;
            }

            offsets.Add(new TopicPartitionOffset(requested, state.Pause ? ExternalOffset.Paused : state.Offset));
        }

        return offsets;
    }