in core/src/main/java/com/epam/eco/kafkamanager/core/consumer/repo/zk/ZkConsumerGroupCache.java [166:207]
/**
 * Dispatches a tree-cache event to the handler matching its node path and
 * then notifies the cache listener about the outcome.
 *
 * <p>Connection state change events are only logged (at WARN level) and do
 * not reach the cache listener. For every other event the listener is fired
 * exactly once; both arguments stay {@code null} when the event type is not
 * one of NODE_ADDED / NODE_UPDATED / NODE_REMOVED or when the node path is
 * not recognized by any of the path predicates.
 *
 * @param event the tree-cache event to process
 */
private void handleTreeEvent(TreeCacheEvent event) {
    Type eventType = event.getType();
    if (CuratorUtils.isConnectionStateChangeEvent(eventType)) {
        LOGGER.warn("ZK connection state changed: {}", eventType);
        return;
    }

    // Additions and updates share the same handlers, so they are tracked together.
    boolean addedOrUpdated = eventType == Type.NODE_ADDED || eventType == Type.NODE_UPDATED;
    boolean removed = eventType == Type.NODE_REMOVED;

    ConsumerGroup updatedGroup = null;
    String nameOfRemovedGroup = null;

    if (addedOrUpdated || removed) {
        // Within this branch "not addedOrUpdated" is equivalent to "removed".
        String path = event.getData().getPath();

        if (isConsumerPath(path)) {
            // Only the removal of a consumer node reports a removed group name;
            // all other cases report an updated group.
            if (addedOrUpdated) {
                updatedGroup = handleGroupUpdated(event.getData());
            } else {
                nameOfRemovedGroup = handleGroupRemoved(event.getData());
            }
        } else if (isOwnerPath(path)) {
            updatedGroup = addedOrUpdated
                    ? handleTopicUpdated(event.getData())
                    : handleTopicRemoved(event.getData());
        } else if (isOffsetPath(path)) {
            updatedGroup = addedOrUpdated
                    ? handleOffsetUpdated(event.getData())
                    : handleOffsetRemoved(event.getData());
        } else if (isIdPath(path)) {
            updatedGroup = addedOrUpdated
                    ? handleMemberUpdated(event.getData())
                    : handleMemberRemoved(event.getData());
        }
    }

    fireCacheListener(updatedGroup, nameOfRemovedGroup);
}