in src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs [21:74]
protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
CancellationToken cancellationToken)
{
if (topic == null)
throw new ArgumentNullException(nameof(topic));
IReadOnlyCollection<TopicPartition> topicPartitions = topic.Options.GetTopicPartitions();
IReadOnlyCollection<TopicPartitionOffset> state =
topic.GetAndResetState(this._offsetsStorage, topicPartitions, cancellationToken);
var pause = new List<TopicPartition>();
var reset = new List<TopicPartitionOffset>();
var assign = new List<TopicPartitionOffset>();
var assignNonPaused = new List<TopicPartitionOffset>();
foreach (TopicPartitionOffset item in state)
{
// existing assignment, check if offset reset
if (topic.Consumer.Assignment.Contains(item.TopicPartition))
{
if (topic.TryGetOffset(item.TopicPartition, out Offset tp) && tp != item.Offset)
{
ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
}
}
else
{
TopicPartitionOffset tpo = new(item.TopicPartition, item.Offset);
// first assign offset.end, than pause consumer
if (tpo.Offset == ExternalOffset.Paused)
{
pause.Add(item.TopicPartition);
tpo = new(item.TopicPartition, Offset.End);
}
else
{
assignNonPaused.Add(tpo);
}
assign.Add(tpo);
}
}
topic.OnAssign(assign);
topic.OnReset(reset);
topic.OnPause(pause);
topic.CommitOffsetIfNeeded(activitySpan, reset.Concat(assignNonPaused));
}