in src/Epam.Kafka.PubSub/Subscription/SubscriptionHandler.cs [172:201]
/// <summary>
/// When <c>AllowCompaction</c> is set, marks superseded messages in <paramref name="results"/>
/// as compacted: for every message key and topic partition only the entry with the highest
/// offset keeps its <c>null</c> (not yet handled) value, all older entries get <c>CompactedResult</c>.
/// </summary>
/// <param name="results">
/// Consume results mapped to their handling outcome. Only entries whose value is <c>null</c> are
/// considered; the dictionary is updated in place.
/// </param>
private void TryCompact(Dictionary<ConsumeResult<TKey, TValue>, string?> results)
{
    if (!this.AllowCompaction)
    {
        return;
    }

    // Snapshot the candidates before touching the dictionary: the loop below overwrites values in
    // 'results', and doing that while a live enumerator over the same dictionary is active throws
    // InvalidOperationException on some runtimes (e.g. .NET Framework).
    List<ConsumeResult<TKey, TValue>> candidates = results
        .Where(x => x.Value == null)
        .Select(x => x.Key)
        .ToList();

    // Newest candidate seen so far, per message key and then per topic partition.
    var latestByKey = new Dictionary<TKey, Dictionary<TopicPartition, ConsumeResult<TKey, TValue>>>();

    foreach (ConsumeResult<TKey, TValue> current in candidates)
    {
        TKey messageKey = current.Message.Key;

        // A null key cannot be used as a dictionary key (TryGetValue would throw) and gives
        // nothing to compact on, so such messages are left untouched.
        if (messageKey is null)
        {
            continue;
        }

        if (!latestByKey.TryGetValue(messageKey,
                out Dictionary<TopicPartition, ConsumeResult<TKey, TValue>>? latestByPartition))
        {
            latestByPartition = new();
            latestByKey[messageKey] = latestByPartition;
        }

        if (!latestByPartition.TryGetValue(current.TopicPartition, out ConsumeResult<TKey, TValue>? latest))
        {
            // First message with this key in this partition.
            latestByPartition[current.TopicPartition] = current;
        }
        else if (latest.Offset < current.Offset)
        {
            // Current message supersedes the tracked one. The tracked entry must be replaced,
            // otherwise later messages would be compared against an already compacted message
            // and intermediate offsets would never be compacted.
            results[latest] = CompactedResult;
            latestByPartition[current.TopicPartition] = current;
        }
        else
        {
            // Current message is not newer than the tracked one, so it is the one superseded.
            results[current] = CompactedResult;
        }
    }
}