in src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/Contracts/DbContextEntityPublicationHandler.cs [68:94]
protected override void TransactionCommitted(IReadOnlyCollection<TEntity> entities,
CancellationToken cancellationToken)
{
if (entities == null)
{
throw new ArgumentNullException(nameof(entities));
}
foreach (TEntity entity in entities.Where(x => x.KafkaPubState == KafkaPublicationState.Delivered))
{
entity.KafkaPubState = KafkaPublicationState.Committed;
}
try
{
this.Context.SaveChanges(true);
}
catch (DbUpdateConcurrencyException exception)
{
#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
this.Logger.PublicationEntityDetached(exception, "Commit", this.FindPrimaryKeyForLogs(entry), typeof(TEntity));
}
#pragma warning restore IDE0008
}
}