private bool OnPauseEnumerate()

in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [279:315]


    private bool OnPauseEnumerate(IEnumerable<TopicPartition> items)
    {
        List<TopicPartition> result = new();

        foreach (TopicPartition tp in items)
        {
            if (this.Consumer.Assignment.Contains(tp) && !this._paused.Contains(tp))
            {
                result.Add(tp);
            }
        }

        if (result.Count > 0)
        {
            try
            {
                this.Consumer.Pause(result);
            }
            catch (Exception e)
            {
                e.DoNotRetryBatch();
                throw;
            }

            foreach (TopicPartition r in result)
            {
                this._paused.Add(r);
                this._offsets[r] = ExternalOffset.Paused;
            }

            this.Logger.PartitionsPaused(this.Monitor.Name, result);

            return this.CleanupBuffer(x => result.Any(v => v == x.TopicPartition), "partition paused");
        }

        return false;
    }