in src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs [92:162]
public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
    IReadOnlyCollection<TopicPartitionOffset> offsets,
    string? consumerGroup,
    CancellationToken cancellationToken)
{
    // Applies every requested offset to its matching locally tracked state and saves the context.
    // If the save throws DbUpdateException, the KafkaTopicState entries reported by the exception are
    // overwritten with their database values and the save is attempted one more time.
    // The returned collection is whatever GetLocalState produces for the requested topic partitions.
    if (offsets == null)
    {
        throw new ArgumentNullException(nameof(offsets));
    }

    // A missing consumer group is matched as the empty string.
    consumerGroup ??= string.Empty;

    DbSet<KafkaTopicState> states = this._context.KafkaTopicStates;
    ICollection<KafkaTopicState> tracked = states.Local;

    foreach (TopicPartitionOffset requested in offsets)
    {
        // Single() throws unless exactly one tracked state matches topic, partition and consumer group.
        KafkaTopicState state = tracked.Single(s =>
            s.Topic == requested.Topic && s.Partition == requested.Partition && s.ConsumerGroup == consumerGroup);

        // don't update paused topics with standard value.
        bool keepStoredOffset = state.Pause && requested.Offset == ExternalOffset.Paused;

        if (!keepStoredOffset)
        {
            state.Offset = requested.Offset;
        }
    }

    try
    {
        this._context.SaveChanges(true);
    }
    catch (DbUpdateException saveFailure)
    {
        // The token is only consulted here, before the reset-and-retry step.
        cancellationToken.ThrowIfCancellationRequested();

#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
        foreach (var failedEntry in saveFailure.Entries.Where(e => e.Entity is KafkaTopicState))
        {
            var currentValues = failedEntry.CurrentValues;
            var storedValues = failedEntry.GetDatabaseValues();

            // No database values were returned for this entry: leave it as it is.
            if (storedValues == null)
            {
                continue;
            }

            // Overwrite each current value with the value read from the database.
            foreach (var propertyKey in
#if EF6
                     currentValues.PropertyNames
#else
                     currentValues.Properties
#endif
                    )
            {
                currentValues[propertyKey] = storedValues[propertyKey];
            }

            // Refresh original values to bypass next concurrency check
            failedEntry.OriginalValues.SetValues(storedValues);
        }
#pragma warning restore IDE0008

        // Second and last attempt; a failure here propagates to the caller.
        this._context.SaveChanges(true);
    }

    return GetLocalState(tracked, offsets.Select(o => o.TopicPartition).ToList(), consumerGroup, out _);
}