internal bool TryRegisterGroupId()

in src/Epam.Kafka.PubSub/Common/Pipeline/PipelineMonitor.cs [99:140]


    /// <summary>
    /// Tries to register the <c>group.id</c> of <paramref name="config"/> for this monitor in the shared
    /// registries exposed by <c>this.Context</c>, once per topic partition or per topic name returned by
    /// <paramref name="options"/>.
    /// </summary>
    /// <remarks>
    /// When <c>options.IsTopicNameWithPartition</c> returns <c>true</c>, entries are keyed by
    /// (group id, topic partition, storage type) in <c>Context.PartitionHandlers</c> and hold this monitor.
    /// Otherwise entries are keyed by (group id, topic name) in <c>Context.TopicHandlers</c> and hold
    /// <c>options.HandlerType</c>. An entry that already holds an equal value counts as success, so
    /// repeated registration by the same owner is allowed.
    /// Every key is attempted even after a conflict; entries added before a conflict are not rolled back.
    /// </remarks>
    /// <param name="config">Consumer configuration that supplies the group id.</param>
    /// <param name="options">Subscription options that supply topics/partitions and the handler type.</param>
    /// <param name="msg">
    /// <c>null</c> on success; otherwise a description of the last conflict that was detected.
    /// </param>
    /// <returns><c>true</c> only if every key was registered without conflict; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="config"/> or <paramref name="options"/> is <c>null</c>.
    /// </exception>
    internal bool TryRegisterGroupId(ConsumerConfig config, SubscriptionOptions options, out string? msg)
    {
        if (config == null) throw new ArgumentNullException(nameof(config));
        if (options == null) throw new ArgumentNullException(nameof(options));

        msg = null;
        bool result = true;

        if (options.IsTopicNameWithPartition(out Type? storageType))
        {
            ConcurrentDictionary<Tuple<string, TopicPartition, Type>, PipelineMonitor> ids = this.Context.PartitionHandlers;

            foreach (TopicPartition tp in options.GetTopicPartitions())
            {
                Tuple<string, TopicPartition, Type> key = new(config.GroupId, tp, storageType!);

                // Atomic: either stores this monitor or returns the current owner, so the
                // conflict message never needs a second (racy) dictionary lookup.
                PipelineMonitor owner = ids.GetOrAdd(key, this);

                if (!object.Equals(owner, this))
                {
                    // Failure is sticky: a later successful key must not reset the result to true.
                    result = false;
                    msg = $" Already used for '{tp}' topic partition with external state storage of type '{storageType}' in '{owner.Name}' subscription.";
                }
            }
        }
        else
        {
            ConcurrentDictionary<Tuple<string, string>, Type> ids = this.Context.TopicHandlers;

            foreach (string t in options.GetTopicNames())
            {
                Tuple<string, string> key = new(config.GroupId, t);

                // Same atomic get-or-add approach as the partition branch above.
                Type registered = ids.GetOrAdd(key, options.HandlerType!);

                if (!object.Equals(registered, options.HandlerType))
                {
                    // Failure is sticky: a later successful key must not reset the result to true.
                    result = false;
                    msg = $"Already used for '{t}' topic with handler of type '{registered.Name}'.";
                }
            }
        }

        return result;
    }