in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [451:497]
/**
 * Applies a batch of 'schema registry update' records and notifies listeners
 * about everything that was affected.
 *
 * <p>Records are dispatched by key type (CONFIG, SCHEMA, DELETE_SUBJECT, MODE)
 * to the matching {@code apply...AndGetAffected} handler while the write lock
 * is held. Records with any other key type are logged at warn level and
 * skipped. A failure while handling one record is logged and does not stop
 * the remaining records from being processed.
 *
 * <p>Listeners are fired only after the write lock has been released. An
 * empty batch returns immediately without locking or firing listeners.
 *
 * @param update the key/value records to apply
 */
public void onCacheUpdated(Map<Key, Value> update) {
    if (update.isEmpty()) {
        return;
    }

    Set<SubjectAndVersion> touched = new HashSet<>();

    lock.writeLock().lock();
    try {
        for (Map.Entry<Key, Value> record : update.entrySet()) {
            Key recordKey = record.getKey();
            Value recordValue = record.getValue();

            // Each record is handled in isolation so that one bad record
            // cannot prevent the rest of the batch from being applied.
            try {
                KeyType type = recordKey.getKeytype();

                if (type == KeyType.CONFIG) {
                    touched.addAll(
                            applyConfigUpdateAndGetAffected(
                                    (ConfigKey) recordKey, (ConfigValue) recordValue));
                } else if (type == KeyType.SCHEMA) {
                    touched.addAll(
                            applySchemaUpdateAndGetAffected(
                                    (SchemaKey) recordKey, (SchemaValue) recordValue));
                } else if (type == KeyType.DELETE_SUBJECT) {
                    touched.addAll(
                            applySubjectDeleteAndGetAffected(
                                    (DeleteSubjectKey) recordKey,
                                    (DeleteSubjectValue) recordValue));
                } else if (type == KeyType.MODE) {
                    touched.addAll(
                            applyModeUpdateAndGetAffected(
                                    (ModeKey) recordKey, (ModeValue) recordValue));
                } else {
                    LOGGER.warn(
                            "Ignoring unsupported 'schema registry update' record. Key = {}, value = {}",
                            recordKey, recordValue);
                }
            } catch (Exception ex) {
                String message = String.format(
                        "Failed to handle 'schema registry update' record. Key = %s, value = %s",
                        recordKey, recordValue);
                LOGGER.error(message, ex);
            }
        }
    } finally {
        lock.writeLock().unlock();
    }

    // Notify outside of the critical section: the lock is already released here.
    fireListenersFor(touched);
}