public void updateConfiguration()

in core/src/main/java/org/apache/ignite/activestore/impl/config/ReplicaProducersManagerUpdater.java [75:102]


    /**
     * Rebuilds the producers map so that it matches the given replica configurations.
     * <p>
     * Existing producers whose replica id is still present in {@code replicasConfigs} are reused as is.
     * Existing producers whose replica id is absent are passed to {@code closeProducers} after the new map
     * has been assigned to {@code producers}. A new {@code RemoteKafkaProducer} is created for every configured
     * replica that has no producer yet, except the replica whose id equals {@code clusterId.get()}
     * (presumably the local cluster, which does not need a remote producer - confirm).
     *
     * @param replicasConfigs Replica configurations keyed by replica id. May be empty, in which case all
     *      existing producers are closed.
     */
    public void updateConfiguration(Map<UUID, ReplicaConfig> replicasConfigs) {
        // The sizes below are capacity hints only. The hint for the map must be clamped at zero:
        // size() - 1 is negative for an empty configuration and a negative initial capacity makes
        // the HashMap constructor throw IllegalArgumentException.
        Map<UUID, RemoteKafkaProducer> updatedProducers = new HashMap<>(Math.max(0, replicasConfigs.size() - 1));
        // At most every currently known producer can turn out to be outdated.
        List<RemoteKafkaProducer> outdatedProducers = new ArrayList<>(producers.size());

        // Split current producers into the ones that are kept and the ones that must be closed.
        for (Map.Entry<UUID, RemoteKafkaProducer> entry : producers.entrySet()) {
            if (replicasConfigs.containsKey(entry.getKey())) {
                updatedProducers.put(entry.getKey(), entry.getValue());
            }
            else {
                outdatedProducers.add(entry.getValue());
            }
        }
        // Create producers for replicas that appeared in the configuration.
        for (Map.Entry<UUID, ReplicaConfig> entry : replicasConfigs.entrySet()) {
            ReplicaConfig config = entry.getValue();
            UUID replicaId = entry.getKey();

            if (!replicaId.equals(clusterId.get()) && !updatedProducers.containsKey(replicaId)) {
                updatedProducers.put(replicaId, new RemoteKafkaProducer(
                    config,
                    serializer,
                    kafkaFactory,
                    new UnsubscriberOnFailWrapperImpl(replicaProducersUpdaterProvider.get(), replicaId)
                ));
            }
        }
        // Publish the new map first, then release producers that are no longer referenced by it.
        producers = updatedProducers;
        closeProducers(outdatedProducers);
    }