in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [499:528]
/**
 * Notifies every registered listener about the current state of the given
 * subject/version pairs.
 *
 * <p>Each pair is looked up through {@code getSubjectSchemasOrElseFail}: when a
 * schema value exists for the version it is converted with {@code toSchemaEntity}
 * and reported as updated, otherwise the pair itself is reported as deleted.
 * NOTE(review): judging by its name, the lookup presumably fails for an unknown
 * subject, in which case no listener is notified at all - confirm.
 *
 * <p>For each listener, {@code onSchemasDeleted} is invoked first and
 * {@code onSchemasUpdated} second. Both callbacks are invoked even when the
 * corresponding list is empty. An exception thrown by one callback is logged and
 * does not prevent the remaining callbacks or listeners from being invoked.
 *
 * @param subjectAndVersions pairs whose schemas should be announced; the method
 *        returns immediately when this (or the listener collection) is empty
 */
private void fireListenersFor(Set<SubjectAndVersion> subjectAndVersions) {
    // Nothing to do without listeners...
    if (CollectionUtils.isEmpty(listeners)) {
        return;
    }
    // ...or without anything to announce.
    if (CollectionUtils.isEmpty(subjectAndVersions)) {
        return;
    }

    List<SchemaEntity> updatedSchemas = new ArrayList<>();
    List<SubjectAndVersion> deletedVersions = new ArrayList<>();

    // Classify every pair by whether its version is still present for the subject.
    for (SubjectAndVersion subjectAndVersion : subjectAndVersions) {
        SchemaValue currentValue =
                getSubjectSchemasOrElseFail(subjectAndVersion.getSubject())
                        .get(subjectAndVersion.getVersion());
        if (currentValue == null) {
            deletedVersions.add(subjectAndVersion);
        } else {
            updatedSchemas.add(toSchemaEntity(currentValue));
        }
    }

    // Deletions are delivered before updates; failures are isolated per callback.
    listeners.forEach(target -> {
        try {
            target.onSchemasDeleted(deletedVersions);
        } catch (Exception e) {
            LOGGER.error("Failed to handle 'schemas deleted' event", e);
        }

        try {
            target.onSchemasUpdated(updatedSchemas);
        } catch (Exception e) {
            LOGGER.error("Failed to handle 'schemas updated' event", e);
        }
    });
}