private Set applyConfigUpdateAndGetAffected()

in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [530:568]


    /**
     * Applies a config record to {@code configCache} and collects the
     * subject/version pairs associated with the change.
     *
     * <p>A non-null {@code value} is stored under the key's subject; a
     * {@code null} value removes the entry for that subject. If the previously
     * cached value equals {@code value}, nothing is collected and no semaphore
     * is signalled.
     *
     * <p>A {@code null} subject is treated as the global config: every subject
     * in {@code schemaCache} that has no entry of its own in
     * {@code configCache} contributes all of its versions. Otherwise only the
     * versions of the key's subject are collected.
     *
     * @param key   config key; its subject may be {@code null} (global)
     * @param value new config value, or {@code null} to remove the entry
     * @return the collected subject/version pairs; empty when the cached
     *         value did not change
     */
    private Set<SubjectAndVersion> applyConfigUpdateAndGetAffected(ConfigKey key, ConfigValue value) {
        final String targetSubject = key.getSubject();
        final boolean removal = value == null;

        final ConfigValue previous = removal
                ? configCache.remove(targetSubject)
                : configCache.put(targetSubject, value);

        // Nothing changed in the cache, so there is nothing to report.
        if (Objects.equals(previous, value)) {
            return Collections.emptySet();
        }

        final Set<SubjectAndVersion> result = new HashSet<>();
        if (targetSubject != null) {
            // subject-level config: all versions of that subject
            for (Integer version : getSubjectSchemasOrElseEmpty(targetSubject).keySet()) {
                result.add(new SubjectAndVersion(targetSubject, version));
            }
        } else {
            // global config: only subjects without an entry of their own
            for (String candidate : schemaCache.keySet()) {
                if (configCache.containsKey(candidate)) {
                    continue;
                }
                for (Integer version : getSubjectSchemasOrElseEmpty(candidate).keySet()) {
                    result.add(new SubjectAndVersion(candidate, version));
                }
            }
        }

        // subject UPDATE semaphore; the value is known to differ from the
        // previous one here because of the early return above
        if (!removal) {
            subjectSemaphores.signalDoneFor(targetSubject, SubjectOperation.UPDATE);
        }

        return result;
    }