in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [240:272]
public void OnReset(IReadOnlyCollection<TopicPartitionOffset> items)
{
    // Splits the supplied offsets into two groups and processes each group:
    //   * partitions that were present in _paused are removed from it and resumed on the consumer;
    //   * all remaining partitions are reported as offset resets.
    // For both groups CleanupBuffer is invoked with a predicate that matches the affected partitions.
    // Nothing happens for an empty collection.
    if (items.Count == 0)
    {
        return;
    }

    List<TopicPartitionOffset> reset = new(items.Count);
    List<TopicPartitionOffset> resume = new(items.Count);

    foreach (TopicPartitionOffset tpo in items)
    {
        // Remove reports whether the partition was tracked in _paused, so a single call
        // both classifies the offset and clears the paused marker.
        if (this._paused.Remove(tpo.TopicPartition))
        {
            resume.Add(tpo);
        }
        else
        {
            reset.Add(tpo);
        }
    }

    if (reset.Count > 0)
    {
        this.Logger.OffsetsReset(this.Monitor.Name, reset);

        // Hashed membership test instead of scanning the whole list for every item
        // the predicate is evaluated against.
        // NOTE(review): relies on TopicPartition.Equals/GetHashCode agreeing with operator == - confirm.
        HashSet<TopicPartition> resetPartitions = new(reset.Select(v => v.TopicPartition));
        this.CleanupBuffer(x => resetPartitions.Contains(x.TopicPartition), "partition offset reset");
    }

    if (resume.Count > 0)
    {
        // Order kept from the original: resume on the consumer, then log, then clean the buffer.
        this.Consumer.Resume(resume.Select(x => x.TopicPartition));
        this.Logger.PartitionsResumed(this.Monitor.Name, resume);

        HashSet<TopicPartition> resumedPartitions = new(resume.Select(v => v.TopicPartition));
        this.CleanupBuffer(x => resumedPartitions.Contains(x.TopicPartition), "partition offset resume");
    }
}