private void OnPartitionsAssigned()

in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [375:424]


    private void OnPartitionsAssigned(IConsumer<TKey, TValue> c, List<TopicPartitionOffset> list)
    {
        if (list.Count > 0)
        {
            if (this.ExternalState != null)
            {
                var tp = list.Select(x => x.TopicPartition).ToList();

                list.Clear();

#pragma warning disable CA1031 // can't throw exceptions in handler callback because it triggers incorrect state in librdkafka and some times leads to app crash. 
                try
                {
                    IEnumerable<TopicPartitionOffset> state = this.ExternalState.Invoke(tp);

                    foreach (TopicPartitionOffset tpo in state)
                    {
                        this._offsets[tpo.TopicPartition] = tpo.Offset;

                        list.Add(tpo.Offset == ExternalOffset.Paused
                            ? new TopicPartitionOffset(tpo.TopicPartition, Offset.End)
                            : tpo);
                    }
                }
                catch (Exception exception)
                {
                    // Save it and throw later to trigger pipeline retry.
                    exception.DoNotRetryBatch();
                    this._exception = exception;

                    if (exception is not OperationCanceledException)
                    {
                        this.Logger.PartitionsAssignError(exception, this.Monitor.Name, c.MemberId, tp);
                    }
                    else
                    {
                        this.Logger.PartitionsAssignCancelled(this.Monitor.Name, c.MemberId, tp);
                    }

                    // set Offset.End special value to prevent reading from topic partitions until pipeline restart.
                    list.AddRange(tp.Select(x => new TopicPartitionOffset(x, Offset.End)));

                    return;
                }
#pragma warning restore CA1031
            }

            this.Logger.PartitionsAssigned(this.Monitor.Name, c.MemberId, list);
        }
    }