in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [570:610]
private Set<SubjectAndVersion> applyModeUpdateAndGetAffected(ModeKey key, ModeValue value) {
ModeValue oldValue;
if (value != null) {
oldValue = modeCache.put(key.getSubject(), value);
} else {
oldValue = modeCache.remove(key.getSubject());
}
if (Objects.equals(oldValue, value)) {
return Collections.emptySet();
}
Set<SubjectAndVersion> affected = new HashSet<>();
if (key.getSubject() == null) { // global
for (String subject : schemaCache.keySet()) {
if (!modeCache.containsKey(subject)) {
NavigableMap<Integer, SchemaValue> subjectSchemas = getSubjectSchemasOrElseEmpty(subject);
affected.addAll(
subjectSchemas.keySet().stream().
map(version -> new SubjectAndVersion(subject, version)).
collect(Collectors.toSet()));
}
}
} else {
NavigableMap<Integer, SchemaValue> subjectSchemas =
getSubjectSchemasOrElseEmpty(key.getSubject());
affected.addAll(
subjectSchemas.keySet().stream().
map(version -> new SubjectAndVersion(key.getSubject(), version)).
collect(Collectors.toSet()));
}
// subject UPDATE semaphore
/* not needed unless we expose possibility to change mode
if (value != null && !Objects.equals(oldValue, value)) {
subjectSemaphores.signalDoneFor(key.getSubject(), SubjectOperation.UPDATE);
}
*/
return affected;
}