public void onCacheUpdated()

in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [451:497]


    public void onCacheUpdated(Map<Key, Value> update) {
        // Applies a batch of 'schema registry update' records to this store and then
        // notifies listeners about everything the batch touched.
        //
        // - An empty batch is a no-op: no lock is taken and no listeners are fired.
        // - All records are applied while holding the write lock.
        // - A record that fails to apply is logged and skipped; the remaining records
        //   of the batch are still processed.
        // - Listeners are fired only after the write lock has been released.
        if (update.isEmpty()) {
            return;
        }

        // Accumulates the subject/version pairs reported by the individual handlers.
        Set<SubjectAndVersion> changed = new HashSet<>();

        lock.writeLock().lock();
        try {
            for (Map.Entry<Key, Value> record : update.entrySet()) {
                Key key = record.getKey();
                Value value = record.getValue();

                try {
                    // Read inside the try so that a failure here is logged like any
                    // other per-record failure instead of aborting the batch.
                    KeyType type = key.getKeytype();

                    if (type == KeyType.CONFIG) {
                        changed.addAll(
                                applyConfigUpdateAndGetAffected((ConfigKey) key, (ConfigValue) value));
                    } else if (type == KeyType.SCHEMA) {
                        changed.addAll(
                                applySchemaUpdateAndGetAffected((SchemaKey) key, (SchemaValue) value));
                    } else if (type == KeyType.DELETE_SUBJECT) {
                        changed.addAll(
                                applySubjectDeleteAndGetAffected(
                                        (DeleteSubjectKey) key, (DeleteSubjectValue) value));
                    } else if (type == KeyType.MODE) {
                        changed.addAll(
                                applyModeUpdateAndGetAffected((ModeKey) key, (ModeValue) value));
                    } else {
                        // Any other key type is reported and skipped.
                        LOGGER.warn(
                                "Ignoring unsupported 'schema registry update' record. Key = {}, value = {}",
                                key, value);
                    }
                } catch (Exception failure) {
                    // Best-effort: one bad record must not prevent the rest from being applied.
                    String message = String.format(
                            "Failed to handle 'schema registry update' record. Key = %s, value = %s",
                            key, value);
                    LOGGER.error(message, failure);
                }
            }
        } finally {
            lock.writeLock().unlock();
        }

        // Invoked outside the locked section, with whatever was collected (possibly nothing).
        fireListenersFor(changed);
    }