in src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs [477:549]
// Fills the internal buffer with up to Options.BatchSize consumed records.
// Deserialization failures are tolerated as long as at least one record was
// buffered; all other errors abort the batch (or the whole pipeline for
// fatal client errors).
private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationToken)
{
    // Snapshot assignment state before polling: Consume() may trigger rebalance callbacks.
    this.UnassignedBeforeRead = this.Consumer.Assignment.Count == 0;

    this._newPartitions.Clear();

    int maxBatchSize = this.Options.BatchSize;

    try
    {
        // Keep polling until the batch is full, the poll times out (null result),
        // partition EOF is reached, or cancellation is requested.
        while (this._buffer.Count < maxBatchSize && !cancellationToken.IsCancellationRequested)
        {
            ConsumeResult<TKey, TValue>? result = this.Consumer.Consume(this._consumeTimeoutMs);

            if (result == null || result.IsPartitionEOF)
            {
                break;
            }

            this._buffer.Add(result);
        }
    }
    catch (ConsumeException consumeException)
    {
        ConsumeResult<byte[], byte[]> record = consumeException.ConsumerRecord;
        ErrorCode errorCode = consumeException.Error.Code;

        if (errorCode == ErrorCode.Local_Fatal)
        {
            // Fatal client error: nothing in this pipeline should be retried.
            consumeException.DoNotRetryPipeline();
            throw;
        }

        if (errorCode != ErrorCode.Local_KeyDeserialization
            && errorCode != ErrorCode.Local_ValueDeserialization)
        {
            // Any other consume error aborts the current batch without retry.
            consumeException.DoNotRetryBatch();
            throw;
        }

        // Deserialization failure: skip the poisoned message for this batch and
        // rewind so the next batch starts from that same offset again.
        this.Consumer.Seek(record.TopicPartitionOffset);

        if (this._buffer.Count == 0)
        {
            // Nothing usable was buffered, so surface the error to the caller.
            this.HandleNewPartitions(span);
            throw;
        }

        // Partial batch is still usable; record the failure and carry on.
        this.Logger.ConsumeError(consumeException, this.Monitor.Name,
            consumeException.Error, record.TopicPartitionOffset);
    }
    catch (Exception exception)
    {
        // Unexpected exception type, most likely raised by the partition assigned
        // handler. The consumer must be recreated, so the batch is not retried.
        exception.DoNotRetryBatch();
        throw;
    }

    this.HandleNewPartitions(span);
}