public void OnReset()

in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [240:272]


    /// <summary>
    /// Processes a batch of topic partition offsets. Partitions that are successfully removed
    /// from the paused set are resumed on the consumer; all remaining partitions are logged as
    /// offset resets. For each non-empty group, <c>CleanupBuffer</c> is invoked with a predicate
    /// matching entries that belong to one of the group's partitions.
    /// </summary>
    /// <param name="items">Topic partition offsets to process. An empty collection is a no-op.</param>
    public void OnReset(IReadOnlyCollection<TopicPartitionOffset> items)
    {
        if (items.Count == 0)
        {
            return;
        }

        List<TopicPartitionOffset> reset = new(items.Count);
        List<TopicPartitionOffset> resume = new(items.Count);

        foreach (TopicPartitionOffset tpo in items)
        {
            // Remove reports whether the partition was present in the paused set,
            // so a single call both classifies the item and clears its paused state.
            if (this._paused.Remove(tpo.TopicPartition))
            {
                resume.Add(tpo);
            }
            else
            {
                reset.Add(tpo);
            }
        }

        if (reset.Count > 0)
        {
            this.Logger.OffsetsReset(this.Monitor.Name, reset);

            // Hash-based lookup instead of scanning the list on every predicate call.
            // NOTE(review): relies on TopicPartition.Equals/GetHashCode agreeing with its == operator - confirm.
            HashSet<TopicPartition> resetPartitions = new(reset.Select(x => x.TopicPartition));
            this.CleanupBuffer(x => resetPartitions.Contains(x.TopicPartition), "partition offset reset");
        }

        if (resume.Count > 0)
        {
            this.Consumer.Resume(resume.Select(x => x.TopicPartition));
            this.Logger.PartitionsResumed(this.Monitor.Name, resume);

            // Same hash-based membership test as for the reset group.
            HashSet<TopicPartition> resumedPartitions = new(resume.Select(x => x.TopicPartition));
            this.CleanupBuffer(x => resumedPartitions.Contains(x.TopicPartition), "partition offset resume");
        }
    }