private Set applySchemaUpdateAndGetAffected()

in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [612:647]


    private Set<SubjectAndVersion> applySchemaUpdateAndGetAffected(SchemaKey key, SchemaValue value) {
        NavigableMap<Integer, SchemaValue> subjectSchemas = getSubjectSchemasOrElseCreate(key.getSubject());

        SchemaValue oldValue = null;
        if (value != null) {
            oldValue = subjectSchemas.put(key.getVersion(), value);
        } else {
            oldValue = subjectSchemas.remove(key.getVersion());
        }

        if (Objects.equals(oldValue, value)) {
            return Collections.emptySet();
        }

        populateSchemasDeleteIfNeededAndGetAffeted(key.getSubject());

        SubjectAndVersion subjectAndVersion =
                new SubjectAndVersion(key.getSubject(), key.getVersion());

        Set<SubjectAndVersion> affected = new HashSet<>();
        affected.add(subjectAndVersion);
        if (subjectSchemas.size() > 1 && subjectSchemas.lastKey().equals(key.getVersion())) {
            affected.add(
                    new SubjectAndVersion(
                            key.getSubject(),
                            subjectSchemas.lowerKey(key.getVersion())));
        }

        if (oldValue == null) { // schema REGISTER semaphore
            schemaSemaphores.signalDoneFor(subjectAndVersion, SchemaOperation.REGISTER);
        } else if (value != null && value.isDeleted() && !oldValue.isDeleted()) { // schema DELETE semaphore
            schemaSemaphores.signalDoneFor(subjectAndVersion, SchemaOperation.DELETE);
        }

        return affected;
    }