in core/src/main/java/com/epam/eco/kafkamanager/core/permission/repo/zk/ZkPermissionRepo.java [149:193]
public void create(
        ResourceType resourceType,
        String resourceName,
        PatternType patternType,
        KafkaPrincipal principal,
        AclPermissionType permissionType,
        AclOperation operation,
        String host) {
    // Builds one ACL binding (resource pattern + access control entry) from the arguments,
    // submits it through adminOperations while holding the aclCache lock, and then waits on
    // the UPDATE semaphore registered for the resource pattern.
    //
    // Every argument is mandatory: the Validate checks below reject a null resourceType,
    // patternType, principal, permissionType or operation, and a blank resourceName or host,
    // before anything is submitted.
    Validate.notNull(resourceType, "Resource type is null");
    Validate.notBlank(resourceName, "Resource name is blank");
    Validate.notNull(patternType, "Pattern type is null");
    Validate.notNull(principal, "Principal is null");
    Validate.notNull(permissionType, "Permission type is null");
    Validate.notNull(operation, "Operation is null");
    Validate.notBlank(host, "Host is blank");

    ResourcePattern resource = new ResourcePattern(
            resourceType,
            resourceName,
            patternType);
    AccessControlEntry entry = new AccessControlEntry(
            principal.toString(),
            host,
            operation,
            permissionType);
    AclBinding binding = new AclBinding(resource, entry);

    ResourceSemaphores.ResourceSemaphore<ResourcePattern, PermissionOperation> semaphore = null;
    try {
        // The semaphore is created before the ACL is submitted, and both happen inside the
        // same aclCache lock.
        // NOTE(review): presumably this guarantees the cache update caused by createAcl cannot
        // be processed before the semaphore exists - confirm against aclCache/semaphores.
        semaphore = aclCache.callInLock(() -> {
            ResourceSemaphores.ResourceSemaphore<ResourcePattern, PermissionOperation> updateSemaphore =
                    semaphores.createSemaphore(
                            resource,
                            PermissionOperation.UPDATE);

            boolean aclSubmitted = false;
            try {
                adminOperations.createAcl(binding);
                aclSubmitted = true;
            } finally {
                // If createAcl fails, the semaphore never reaches the caller through the
                // return value, so the outer finally cannot remove it; remove it here instead
                // of leaving it behind.
                if (!aclSubmitted) {
                    semaphores.removeSemaphore(updateSemaphore);
                }
            }
            return updateSemaphore;
        });

        semaphore.awaitUnchecked();
    } finally {
        // semaphore is still null when callInLock threw; skipping the call keeps the original
        // failure from being replaced by an error raised while removing a null semaphore.
        if (semaphore != null) {
            semaphores.removeSemaphore(semaphore);
        }
    }
}