protected sealed override void Callback()

in src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/DbContextPublicationHandler.cs [84:119]


    protected sealed override void Callback(IReadOnlyDictionary<TEntity, IReadOnlyCollection<DeliveryReport>> reports,
        DateTimeOffset? transactionEnd, CancellationToken cancellationToken)
    {
        if (reports == null)
        {
            throw new ArgumentNullException(nameof(reports));
        }

        foreach (KeyValuePair<TEntity, IReadOnlyCollection<DeliveryReport>> report in reports)
        {
            this.Callback(report.Key, report.Value, transactionEnd);
        }

        try
        {
            this.Context.SaveChanges(true);
        }
        catch (DbUpdateConcurrencyException exception)
        {
            if (this.OnConcurrencyException == KafkaPublicationConcurrency.Throw
                || (this.OnConcurrencyException == KafkaPublicationConcurrency.ThrowIfTransaction &&
                    transactionEnd.HasValue))
            {
                throw;
            }

            foreach (TEntry? entry in exception.Entries)
            {
                entry.State = EntityState.Detached;

                this.Logger.PublicationEntityDetached(exception, "Report", this.FindPrimaryKeyForLogs(entry), typeof(TEntity));
            }

            this.Context.SaveChanges(true);
        }
    }