in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [279:315]
/// <summary>
/// Pauses every partition from <paramref name="items"/> that is currently assigned to the
/// consumer and is not yet tracked in the paused set, then records the new state.
/// </summary>
/// <param name="items">Candidate partitions to pause.</param>
/// <returns>
/// The value returned by <c>CleanupBuffer</c> for the newly paused partitions, or
/// <c>false</c> when no partition needed to be paused.
/// </returns>
/// <remarks>
/// Any exception thrown by <c>Consumer.Pause</c> is passed to <c>DoNotRetryBatch()</c> and rethrown.
/// </remarks>
private bool OnPauseEnumerate(IEnumerable<TopicPartition> items)
{
    List<TopicPartition> result = new();

    // Assignment snapshot, fetched lazily and at most once per call. The assignment is
    // loop-invariant here, and it is not read at all when every candidate is already paused
    // or when there are no candidates.
    // NOTE(review): with Confluent.Kafka each access to Assignment is believed to query the
    // native client and build a new list - confirm against the consumer implementation in use.
    List<TopicPartition>? assigned = null;

    foreach (TopicPartition tp in items)
    {
        // Cheap local check first: already paused partitions need no assignment lookup.
        if (this._paused.Contains(tp))
        {
            continue;
        }

        assigned ??= this.Consumer.Assignment;

        if (assigned.Contains(tp))
        {
            result.Add(tp);
        }
    }

    if (result.Count == 0)
    {
        return false;
    }

    try
    {
        this.Consumer.Pause(result);
    }
    catch (Exception e)
    {
        // Flag the exception before it propagates; the original stack trace is kept by `throw;`.
        e.DoNotRetryBatch();
        throw;
    }

    // State is updated only after the consumer accepted the pause request.
    foreach (TopicPartition r in result)
    {
        this._paused.Add(r);
        this._offsets[r] = ExternalOffset.Paused;
    }

    this.Logger.PartitionsPaused(this.Monitor.Name, result);

    // Remove buffered items that belong to the partitions that were just paused.
    return this.CleanupBuffer(x => result.Any(v => v == x.TopicPartition), "partition paused");
}