in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [612:647]
private Set<SubjectAndVersion> applySchemaUpdateAndGetAffected(SchemaKey key, SchemaValue value) {
NavigableMap<Integer, SchemaValue> subjectSchemas = getSubjectSchemasOrElseCreate(key.getSubject());
SchemaValue oldValue = null;
if (value != null) {
oldValue = subjectSchemas.put(key.getVersion(), value);
} else {
oldValue = subjectSchemas.remove(key.getVersion());
}
if (Objects.equals(oldValue, value)) {
return Collections.emptySet();
}
populateSchemasDeleteIfNeededAndGetAffeted(key.getSubject());
SubjectAndVersion subjectAndVersion =
new SubjectAndVersion(key.getSubject(), key.getVersion());
Set<SubjectAndVersion> affected = new HashSet<>();
affected.add(subjectAndVersion);
if (subjectSchemas.size() > 1 && subjectSchemas.lastKey().equals(key.getVersion())) {
affected.add(
new SubjectAndVersion(
key.getSubject(),
subjectSchemas.lowerKey(key.getVersion())));
}
if (oldValue == null) { // schema REGISTER semaphore
schemaSemaphores.signalDoneFor(subjectAndVersion, SchemaOperation.REGISTER);
} else if (value != null && value.isDeleted() && !oldValue.isDeleted()) { // schema DELETE semaphore
schemaSemaphores.signalDoneFor(subjectAndVersion, SchemaOperation.DELETE);
}
return affected;
}