in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [530:568]
private Set<SubjectAndVersion> applyConfigUpdateAndGetAffected(ConfigKey key, ConfigValue value) {
ConfigValue oldValue;
if (value != null) {
oldValue = configCache.put(key.getSubject(), value);
} else {
oldValue = configCache.remove(key.getSubject());
}
if (Objects.equals(oldValue, value)) {
return Collections.emptySet();
}
Set<SubjectAndVersion> affected = new HashSet<>();
if (key.getSubject() == null) { // global
for (String subject : schemaCache.keySet()) {
if (!configCache.containsKey(subject)) {
NavigableMap<Integer, SchemaValue> subjectSchemas = getSubjectSchemasOrElseEmpty(subject);
affected.addAll(
subjectSchemas.keySet().stream().
map(version -> new SubjectAndVersion(subject, version)).
collect(Collectors.toSet()));
}
}
} else {
NavigableMap<Integer, SchemaValue> subjectSchemas =
getSubjectSchemasOrElseEmpty(key.getSubject());
affected.addAll(
subjectSchemas.keySet().stream().
map(version -> new SubjectAndVersion(key.getSubject(), version)).
collect(Collectors.toSet()));
}
// subject UPDATE semaphore
if (value != null && !Objects.equals(oldValue, value)) {
subjectSemaphores.signalDoneFor(key.getSubject(), SubjectOperation.UPDATE);
}
return affected;
}