public IReadOnlyCollection CommitOrReset()

in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs [92:162]


    public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
        IReadOnlyCollection<TopicPartitionOffset> offsets,
        string? consumerGroup,
        CancellationToken cancellationToken)
    {
        if (offsets == null)
        {
            throw new ArgumentNullException(nameof(offsets));
        }

        consumerGroup ??= string.Empty;

        DbSet<KafkaTopicState> dbSet = this._context.KafkaTopicStates;
        ICollection<KafkaTopicState> locals = dbSet.Local;

        foreach (TopicPartitionOffset item in offsets)
        {
            KafkaTopicState local = locals.Single(x =>
                x.Topic == item.Topic && x.Partition == item.Partition && x.ConsumerGroup == consumerGroup);

            if (local.Pause && item.Offset == ExternalOffset.Paused)
            {
                // don't update paused topics with standard value.
            }
            else
            {
                local.Offset = item.Offset;
            }
        }

        try
        {
            this._context.SaveChanges(true);
        }
        catch (DbUpdateException exception)
        {
            cancellationToken.ThrowIfCancellationRequested();

#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
            foreach (var entry in exception.Entries)
            {
                if (entry.Entity is KafkaTopicState)
                {
                    var proposedValues = entry.CurrentValues;
                    var databaseValues = entry.GetDatabaseValues();

                    if (databaseValues != null)
                    {
                        foreach (var property in
#if EF6
                                 proposedValues.PropertyNames
#else
                                 proposedValues.Properties
#endif
                                )
                        {
                            proposedValues[property] = databaseValues[property];
                        }

                        // Refresh original values to bypass next concurrency check
                        entry.OriginalValues.SetValues(databaseValues);
                    }
                }
            }
#pragma warning restore IDE0008

            this._context.SaveChanges(true);
        }

        return GetLocalState(locals, offsets.Select(x => x.TopicPartition).ToList(), consumerGroup, out _);
    }