in src/Epam.Kafka.PubSub/Common/Pipeline/PipelineMonitor.cs [99:140]
internal bool TryRegisterGroupId(ConsumerConfig config, SubscriptionOptions options, out string? msg)
{
if (config == null) throw new ArgumentNullException(nameof(config));
msg = null;
bool result = true;
if (options.IsTopicNameWithPartition(out Type? storageType))
{
ConcurrentDictionary<Tuple<string, TopicPartition, Type>, PipelineMonitor> ids = this.Context.PartitionHandlers;
foreach (TopicPartition tp in options.GetTopicPartitions())
{
Tuple<string, TopicPartition, Type> key = new(config.GroupId, tp, storageType!);
result = ids.TryAdd(key, this) || ids.TryUpdate(key, this, this);
if (!result)
{
msg = $" Already used for '{tp}' topic partition with external state storage of type '{storageType}' in '{ids[key].Name}' subscription.";
}
}
}
else
{
ConcurrentDictionary<Tuple<string, string>, Type> ids = this.Context.TopicHandlers;
foreach (string t in options.GetTopicNames())
{
Tuple<string, string> key = new (config.GroupId, t);
result = ids.TryAdd(key, options.HandlerType!) || ids.TryUpdate(key, options.HandlerType!, options.HandlerType!);
if (!result)
{
msg = $"Already used for '{t}' topic with handler of type '{ids[key].Name}'.";
}
}
}
return result;
}