in store/src/main/java/com/epam/eco/schemacatalog/store/schema/kafka/KafkaSchemaRegistryStore.java [381:410]
/**
 * Registers {@code schema} under {@code subject} through the schema registry client and
 * returns the entity that {@code getSchema} yields for the resulting version.
 *
 * <p>While holding the read lock, the schema is registered and its version is looked up.
 * If {@code schemaExists} does not yet report that subject/version, a
 * {@link SchemaOperation#REGISTER} semaphore is created for it under the same lock. The
 * semaphore is awaited only after the lock has been released and is always closed afterwards.
 *
 * <p>NOTE(review): presumably the semaphore is signalled once this store observes the newly
 * registered schema - confirm against the code that drives {@code schemaSemaphores}.
 *
 * @param subject subject to register the schema under; must not be blank
 * @param schema schema to register; must not be null
 * @return the result of {@code getSchema(subject, version)} for the registered version
 */
public SchemaEntity registerSchema(String subject, ParsedSchema schema) {
    Validate.notBlank(subject, "Subject is blank");
    Validate.notNull(schema, "Schema is null");

    int registeredVersion;
    ResourceSemaphore<SubjectAndVersion, SchemaOperation> pending = null;

    lock.readLock().lock();
    try {
        schemaRegistryClient.registerUnchecked(subject, schema);
        registeredVersion = schemaRegistryClient.getVersionUnchecked(subject, schema);

        // The check and the semaphore creation happen under the same read lock.
        boolean alreadyPresent = schemaExists(subject, registeredVersion);
        if (!alreadyPresent) {
            SubjectAndVersion key = new SubjectAndVersion(subject, registeredVersion);
            pending = schemaSemaphores.createSemaphore(key, SchemaOperation.REGISTER);
        }
    } finally {
        lock.readLock().unlock();
    }

    // No semaphore was created, so there is nothing to wait for.
    if (pending == null) {
        return getSchema(subject, registeredVersion);
    }

    // Wait outside the lock; close the semaphore even if the wait fails.
    try {
        pending.awaitUnchecked();
    } finally {
        pending.close();
    }
    return getSchema(subject, registeredVersion);
}