in util-zk/src/main/scala/com/twitter/zk/ZNode.scala [207:265]
def monitorTree(): Offer[ZNode.TreeUpdate] = {
val broker = new Broker[ZNode.TreeUpdate]
/** Pipe events from a subtree's monitor to this broker. */
def pipeSubTreeUpdates(next: Offer[ZNode.TreeUpdate]): Unit = {
next.sync().flatMap(broker ! _).onSuccess { _ => pipeSubTreeUpdates(next) }
}
/** Monitor a watch on this node. */
def monitorWatch(
watch: Future[ZNode.Watch[ZNode.Children]],
knownChildren: Set[ZNode]
): Unit = {
log.debug("monitoring %s with %d known children", path, knownChildren.size)
watch onFailure { e =>
// An error occurred and there's not really anything we can do about it.
log.error(e, "%s: watch could not be established".format(path))
} onSuccess {
// When a node is fetched with a watch, send a ZNode.TreeUpdate on the broker, and start
// monitoring
case ZNode.Watch(Return(zparent), eventUpdate) => {
val children = zparent.children.toSet
val treeUpdate = ZNode.TreeUpdate(
zparent,
added = children -- knownChildren,
removed = knownChildren -- children
)
log.debug("updating %s with %d children", path, treeUpdate.added.size)
broker.send(treeUpdate).sync.onSuccess { _ =>
log.debug("updated %s with %d children", path, treeUpdate.added.size)
treeUpdate.added foreach { z => pipeSubTreeUpdates(z.monitorTree()) }
eventUpdate onSuccess { event =>
log.debug("event received on %s: %s", path, event)
} onSuccess {
case MonitorableEvent() => monitorWatch(zparent.getChildren.watch(), children)
case event => log.debug("Unmonitorable event: %s: %s", path, event)
}
}
}
case ZNode.Watch(Throw(ZNode.Error(_path)), eventUpdate) => {
// Tell the broker about the children we lost; otherwise, if there were no children,
// this deletion should be reflected in a watch on the parent node, if one exists.
if (knownChildren.size > 0) {
broker.send(ZNode.TreeUpdate(this, removed = knownChildren)).sync
} else {
Future.Done
} onSuccess { _ =>
eventUpdate onSuccess {
case MonitorableEvent() => monitorWatch(parent.getChildren.watch(), Set.empty[ZNode])
case event => log.debug("Unmonitorable event: %s: %s", path, event)
}
}
}
}
}
// Initially, we don't know about any children for the node.
monitorWatch(getChildren.watch(), Set.empty[ZNode])
broker.recv
}