in core/src/main/java/org/apache/ignite/activestore/impl/config/ReplicaProducersManagerUpdater.java [75:102]
/**
 * Rebuilds the set of remote Kafka producers so that it matches the given replica configuration.
 * <p>
 * Existing producers whose replica id is still present in {@code replicasConfigs} are reused,
 * producers whose replica id is no longer present are closed, and a new producer is created for
 * every configured replica that has no producer yet, except for the replica whose id equals the
 * value returned by {@code clusterId.get()}.
 *
 * @param replicasConfigs replica configurations keyed by replica id; may be empty, in which case
 *     all existing producers are closed.
 */
public void updateConfiguration(Map<UUID, ReplicaConfig> replicasConfigs) {
    // Capacity hint only: at most one configured entry is skipped below. Clamp it so that an empty
    // configuration does not yield -1, which HashMap rejects with IllegalArgumentException.
    int expectedProducers = Math.max(0, replicasConfigs.size() - 1);
    Map<UUID, RemoteKafkaProducer> updatedProducers = new HashMap<>(expectedProducers);
    // Bounded by the number of current producers, not by the new configuration, so no size hint.
    List<RemoteKafkaProducer> outdatedProducers = new ArrayList<>();

    // Split the current producers into those that are kept and those that must be closed.
    for (Map.Entry<UUID, RemoteKafkaProducer> entry : producers.entrySet()) {
        if (replicasConfigs.containsKey(entry.getKey())) {
            updatedProducers.put(entry.getKey(), entry.getValue());
        }
        else {
            outdatedProducers.add(entry.getValue());
        }
    }

    // Create producers for configured replicas that do not have one yet.
    for (Map.Entry<UUID, ReplicaConfig> entry : replicasConfigs.entrySet()) {
        ReplicaConfig config = entry.getValue();
        UUID replicaId = entry.getKey();

        if (!replicaId.equals(clusterId.get()) && !updatedProducers.containsKey(replicaId)) {
            updatedProducers.put(replicaId, new RemoteKafkaProducer(
                config,
                serializer,
                kafkaFactory,
                new UnsubscriberOnFailWrapperImpl(replicaProducersUpdaterProvider.get(), replicaId)
            ));
        }
    }

    // Publish the new map before closing, so the field never refers to an already closed producer.
    producers = updatedProducers;
    closeProducers(outdatedProducers);
}