in src/Epam.Kafka.PubSub/Subscription/SubscriptionBackgroundService.cs [99:168]
protected override void ExecuteBatch(
SubscriptionTopicWrapper<TKey, TValue> topic,
IServiceProvider sp,
ActivityWrapper activitySpan,
CancellationToken cancellationToken)
{
BatchState state = sp.ResolveRequiredService<BatchState>(this.Options.StateType);
this.Monitor.Batch.Update(BatchStatus.Reading);
bool unassignedBeforeRead = state.GetBatch(
topic, activitySpan, out IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
if (this.AdaptiveBatchSize.HasValue)
{
batch = batch.Take(this.AdaptiveBatchSize.Value).ToList();
}
else
{
this.AdaptiveBatchSize = batch.Count;
}
if (batch.Count > 0)
{
batch.GetOffsetsRange(
out IDictionary<TopicPartition, Offset>? from,
out IDictionary<TopicPartition, Offset>? to);
this.Logger.SubBatchBegin(this.Monitor.Name, batch.Count,
from.Select(x => new TopicPartitionOffset(x.Key, x.Value)),
to.Select(x => new TopicPartitionOffset(x.Key, x.Value)));
this.CreateAndExecuteHandler(sp, topic, batch, activitySpan, cancellationToken);
this.Monitor.Batch.Update(BatchStatus.Commiting);
state.CommitResults(topic, activitySpan,
// offset of processed message + 1
to.PrepareOffsetsToCommit(), cancellationToken);
this.Monitor.Result.Update(SubscriptionBatchResult.Processed);
}
else
{
// get new instance of assignment list
List<TopicPartition> assignments = topic.Consumer.Assignment;
if (assignments.Count > 0 &&
assignments.All(x => topic.TryGetOffset(x, out Offset offset) && offset == ExternalOffset.Paused))
{
this.Monitor.Result.Update(SubscriptionBatchResult.Paused);
this.Logger.ConsumerPaused(this.Monitor.Name, assignments);
}
else if (assignments.Count == 0 || unassignedBeforeRead)
{
this.Monitor.Result.Update(SubscriptionBatchResult.NotAssigned);
this.Logger.ConsumerNotAssigned(this.Monitor.Name);
}
else
{
this.Logger.BatchEmpty(this.Monitor.Name);
this.Monitor.Result.Update(SubscriptionBatchResult.Empty);
}
}
}