private void ReadToBuffer()

in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [477:549]


    /// <summary>
    /// Consumes records into the internal buffer until the configured batch size is reached,
    /// cancellation is requested, the consumer returns nothing within the consume timeout,
    /// or a partition EOF marker is received.
    /// </summary>
    /// <param name="span">Activity wrapper forwarded to <c>HandleNewPartitions</c>.</param>
    /// <param name="cancellationToken">Checked before every consume attempt; stops reading once cancellation is requested.</param>
    /// <exception cref="ConsumeException">
    /// Rethrown for every error code other than key/value deserialization, and also for
    /// deserialization errors when nothing has been buffered yet.
    /// </exception>
    private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationToken)
    {
        // Capture the assignment state before any consume call is made.
        this.UnassignedBeforeRead = this.Consumer.Assignment.Count == 0;

        this._newPartitions.Clear();

        int maxRecords = this.Options.BatchSize;

        try
        {
            while (true)
            {
                if (cancellationToken.IsCancellationRequested || this._buffer.Count >= maxRecords)
                {
                    break;
                }

                ConsumeResult<TKey, TValue>? next = this.Consumer.Consume(this._consumeTimeoutMs);

                // Nothing arrived within the timeout, or the end of a partition was reached.
                if (next is null || next.IsPartitionEOF)
                {
                    break;
                }

                this._buffer.Add(next);
            }
        }
        catch (ConsumeException consumeError)
        {
            ErrorCode code = consumeError.Error.Code;

            if (code == ErrorCode.Local_Fatal)
            {
                consumeError.DoNotRetryPipeline();
                throw;
            }

            if (code != ErrorCode.Local_KeyDeserialization && code != ErrorCode.Local_ValueDeserialization)
            {
                consumeError.DoNotRetryBatch();
                throw;
            }

            // Only key/value deserialization failures reach this point.
            ConsumeResult<byte[], byte[]> failedRecord = consumeError.ConsumerRecord;
            var failedPosition = failedRecord.TopicPartitionOffset;

            // Rewind to the record that could not be deserialized, so the next batch starts from it.
            this.Consumer.Seek(failedPosition);

            // Nothing was buffered, so there is nothing to return: surface the error to the caller.
            if (this._buffer.Count == 0)
            {
                this.HandleNewPartitions(span);

                throw;
            }

            // Some records were buffered: log the failure and return what was read so far.
            this.Logger.ConsumeError(consumeError, this.Monitor.Name,
                consumeError.Error, failedPosition);
        }
        catch (Exception unexpected)
        {
            // An unexpected exception type, most likely raised by the partition assigned handler.
            // The consumer has to be recreated, so the batch must not be retried.
            unexpected.DoNotRetryBatch();

            throw;
        }

        this.HandleNewPartitions(span);
    }