private void TryCompact()

in src/Epam.Kafka.PubSub/Subscription/SubscriptionHandler.cs [172:201]


    private void TryCompact(Dictionary<ConsumeResult<TKey, TValue>, string?> results)
    {
        if (this.AllowCompaction)
        {
            var compaction = new Dictionary<TKey, Dictionary<TopicPartition, ConsumeResult<TKey, TValue>>>();

            foreach (KeyValuePair<ConsumeResult<TKey, TValue>, string?> item in results.Where(x => x.Value == null))
            {
                if (!compaction.TryGetValue(item.Key.Message.Key,
                        out Dictionary<TopicPartition, ConsumeResult<TKey, TValue>>? offsets))
                {
                    offsets = new();
                    compaction[item.Key.Message.Key] = offsets;
                }

                if (!offsets.TryGetValue(item.Key.TopicPartition, out ConsumeResult<TKey, TValue>? existing))
                {
                    offsets[item.Key.TopicPartition] = item.Key;
                }
                else if (existing.Offset < item.Key.Offset)
                {
                    results[existing] = CompactedResult;
                }
                else
                {
                    results[item.Key] = CompactedResult;
                }
            }
        }
    }