in commons/src/main/java/com/epam/eco/schemacatalog/serde/kafka/BySchemaFieldsVerifier.java [109:156]
private void updateAllowedVersionRangeIfNeeded(Integer versionToExamine) {
if (subjectSchemas != null && subjectSchemas.getSchemasAsMap().containsKey(versionToExamine)) {
return;
}
subjectSchemas = schemaRegistryClient.getSubjectSchemaInfos(subject);
// sanity checks
if (subjectSchemas.getSchemas().isEmpty()) {
throw new RuntimeException(
String.format("Subject %s has no schemas", subject));
}
if (!subjectSchemas.getSchemasAsMap().containsKey(versionToExamine)) {
throw new RuntimeException(
String.format("Subject %s has no schema of version %d", subject, versionToExamine));
}
int earliestVersion = -1;
int latestVersion = -1;
for (BasicSchemaInfo schemaInfo : subjectSchemas) {
Set<String> schemaFields = extractSchemaFields(schemaInfo.getSchemaAvro());
// has required fields
if (!schemaFields.containsAll(requiredFields)) {
if (earliestVersion == -1) {
continue;
} else {
break;
}
}
earliestVersion = earliestVersion == -1 ? schemaInfo.getVersion() : earliestVersion;
// has expected fields
if (expectedFields.isEmpty() || schemaFields.containsAll(expectedFields)) {
latestVersion = schemaInfo.getVersion();
}
}
if (earliestVersion == -1 || latestVersion == -1) {
throw new RuntimeException(
String.format(
"No schemas meeting given conditions: required fields=%s, expected fields=%s",
requiredFields, expectedFields));
}
allowedVersionRange = SchemaVersionRange.with(earliestVersion, latestVersion);
}