in src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/DbContextPublicationHandler.cs [84:119]
protected sealed override void Callback(IReadOnlyDictionary<TEntity, IReadOnlyCollection<DeliveryReport>> reports,
DateTimeOffset? transactionEnd, CancellationToken cancellationToken)
{
if (reports == null)
{
throw new ArgumentNullException(nameof(reports));
}
foreach (KeyValuePair<TEntity, IReadOnlyCollection<DeliveryReport>> report in reports)
{
this.Callback(report.Key, report.Value, transactionEnd);
}
try
{
this.Context.SaveChanges(true);
}
catch (DbUpdateConcurrencyException exception)
{
if (this.OnConcurrencyException == KafkaPublicationConcurrency.Throw
|| (this.OnConcurrencyException == KafkaPublicationConcurrency.ThrowIfTransaction &&
transactionEnd.HasValue))
{
throw;
}
foreach (TEntry? entry in exception.Entries)
{
entry.State = EntityState.Detached;
this.Logger.PublicationEntityDetached(exception, "Report", this.FindPrimaryKeyForLogs(entry), typeof(TEntity));
}
this.Context.SaveChanges(true);
}
}