in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [375:424]
/// <summary>
/// Partitions-assigned callback. When <c>ExternalState</c> is set, replaces the offsets in
/// <paramref name="list"/> with the offsets it returns for the assigned partitions and records
/// them in <c>_offsets</c>. Never throws from the <c>ExternalState</c> lookup: a failure is stored
/// in <c>_exception</c> and every assigned partition is set to <see cref="Offset.End"/> instead.
/// </summary>
/// <param name="c">Consumer whose <c>MemberId</c> is written to the log entries.</param>
/// <param name="list">
/// Assigned partitions with their offsets. Modified in place; nothing is done when it is empty.
/// </param>
private void OnPartitionsAssigned(IConsumer<TKey, TValue> c, List<TopicPartitionOffset> list)
{
    if (list.Count == 0)
    {
        return;
    }

    if (this.ExternalState != null)
    {
        // Snapshot the partitions first: the list is reused as the output and is rebuilt below.
        var tp = list.Select(x => x.TopicPartition).ToList();
        list.Clear();
#pragma warning disable CA1031 // can't throw exceptions in handler callback because it triggers incorrect state in librdkafka and some times leads to app crash.
        try
        {
            IEnumerable<TopicPartitionOffset> state = this.ExternalState.Invoke(tp);
            foreach (TopicPartitionOffset tpo in state)
            {
                // Remember the offset exactly as the external state reported it (including Paused).
                this._offsets[tpo.TopicPartition] = tpo.Offset;

                // Paused partitions are handed over with Offset.End in place of the Paused marker.
                list.Add(tpo.Offset == ExternalOffset.Paused
                    ? new TopicPartitionOffset(tpo.TopicPartition, Offset.End)
                    : tpo);
            }
        }
        catch (Exception exception)
        {
            // Save it and throw later to trigger pipeline retry.
            exception.DoNotRetryBatch();
            this._exception = exception;
            if (exception is not OperationCanceledException)
            {
                this.Logger.PartitionsAssignError(exception, this.Monitor.Name, c.MemberId, tp);
            }
            else
            {
                this.Logger.PartitionsAssignCancelled(this.Monitor.Name, c.MemberId, tp);
            }

            // The failure may have happened while enumerating the state, after some entries were
            // already added. Drop them so each partition appears exactly once in the result.
            list.Clear();

            // set Offset.End special value to prevent reading from topic partitions until pipeline restart.
            list.AddRange(tp.Select(x => new TopicPartitionOffset(x, Offset.End)));
            return;
        }
#pragma warning restore CA1031
    }

    this.Logger.PartitionsAssigned(this.Monitor.Name, c.MemberId, list);
}