def monitorTree()

in util-zk/src/main/scala/com/twitter/zk/ZNode.scala [207:265]


  /**
   * Monitor this node and its descendants for changes to their children.
   *
   * Every time the children of this node are fetched with a watch, a ZNode.TreeUpdate describing
   * the children added and removed since the previous fetch is sent on an internal broker.
   * monitorTree() is then invoked on each newly added child and the updates it offers are piped
   * onto the same broker, so a single Offer carries updates for the whole subtree.
   *
   * @return the receive Offer of the broker that carries the updates
   */
  def monitorTree(): Offer[ZNode.TreeUpdate] = {
    val broker = new Broker[ZNode.TreeUpdate]

    /** Pipe events from a subtree's monitor to this broker. */
    def pipeSubTreeUpdates(next: Offer[ZNode.TreeUpdate]): Unit = {
      // Forward one update, then re-arm for the next one once the broker has accepted it.
      next.sync().flatMap(broker ! _).onSuccess { _ => pipeSubTreeUpdates(next) }
    }

    /**
     * Monitor a watch on this node.
     *
     * @param watch the pending children fetch together with its watch
     * @param knownChildren the children seen by the previous fetch; used to compute the
     *                      added/removed sets of the next update
     */
    def monitorWatch(
      watch: Future[ZNode.Watch[ZNode.Children]],
      knownChildren: Set[ZNode]
    ): Unit = {
      log.debug("monitoring %s with %d known children", path, knownChildren.size)
      watch onFailure { e =>
        // An error occurred and there's not really anything we can do about it.
        log.error(e, "%s: watch could not be established".format(path))
      } onSuccess {
        // When a node is fetched with a watch, send a ZNode.TreeUpdate on the broker, and start
        // monitoring
        case ZNode.Watch(Return(zparent), eventUpdate) => {
          val children = zparent.children.toSet
          val treeUpdate = ZNode.TreeUpdate(
            zparent,
            added = children -- knownChildren,
            removed = knownChildren -- children
          )
          log.debug("updating %s with %d children", path, treeUpdate.added.size)
          // Only start monitoring the new children and the next event once the update has been
          // received from the broker.
          broker.send(treeUpdate).sync.onSuccess { _ =>
            log.debug("updated %s with %d children", path, treeUpdate.added.size)
            treeUpdate.added foreach { z => pipeSubTreeUpdates(z.monitorTree()) }
            eventUpdate onSuccess { event =>
              log.debug("event received on %s: %s", path, event)
            } onSuccess {
              case MonitorableEvent() => monitorWatch(zparent.getChildren.watch(), children)
              case event => log.debug("Unmonitorable event: %s: %s", path, event)
            }
          }
        }
        case ZNode.Watch(Throw(ZNode.Error(_path)), eventUpdate) => {
          // Tell the broker about the children we lost; otherwise, if there were no children,
          // this deletion should be reflected in a watch on the parent node, if one exists.
          //
          // The result is bound to a val before attaching the handler: written inline as
          // `if (..) {..} else { Future.Done } onSuccess {..}`, the handler binds to the else
          // branch only, so a node that had known children would never resume monitoring.
          val lostChildrenSent =
            if (knownChildren.nonEmpty) {
              broker.send(ZNode.TreeUpdate(this, removed = knownChildren)).sync
            } else {
              Future.Done
            }
          lostChildrenSent.onSuccess { _ =>
            eventUpdate onSuccess {
              // NOTE(review): this re-watches the parent's children rather than this node's own;
              // presumably intentional given the comment above -- confirm.
              case MonitorableEvent() => monitorWatch(parent.getChildren.watch(), Set.empty[ZNode])
              case event => log.debug("Unmonitorable event: %s: %s", path, event)
            }
          }
        }
      }
    }
    // Initially, we don't know about any children for the node.
    monitorWatch(getChildren.watch(), Set.empty[ZNode])
    broker.recv
  }