in scalding-dagon/src/main/scala/com/twitter/scalding/dagon/Dag.scala [187:307]
def depthOfId[A](i: Id[A]): Option[Int] =
depth.get(i)
def depthOf[A](n: N[A]): Option[Int] =
find(n).flatMap(depthOfId(_))
private lazy val depth: Map[Id[_], Int] = {
sealed trait Rest {
def dependsOn(id: Id[_]): Boolean
}
sealed case class Same(asId: Id[_]) extends Rest {
def dependsOn(id: Id[_]) = id == asId
}
sealed case class MaxInc(a: Id[_], b: Id[_]) extends Rest {
def dependsOn(id: Id[_]) = (id == a) || (id == b)
}
sealed case class Inc(of: Id[_]) extends Rest {
def dependsOn(id: Id[_]) = id == of
}
sealed case class Variadic(ids: List[Id[_]]) extends Rest {
def dependsOn(id: Id[_]) = ids.contains(id)
}
@annotation.tailrec
def lookup(
state: Map[Id[_], Int],
todo: List[(Id[_], Rest)],
nextRound: List[(Id[_], Rest)]
): Map[Id[_], Int] =
todo match {
case Nil =>
nextRound match {
case Nil => state
case repeat =>
val sortRepeat = repeat.sortWith { case ((i0, r0), (i1, r1)) =>
r1.dependsOn(i0) || (!r0.dependsOn(i1))
}
lookup(state, sortRepeat, Nil)
}
case (h @ (id, Same(a))) :: rest =>
state.get(a) match {
case Some(depth) =>
val state1 = state.updated(id, depth)
lookup(state1, rest, nextRound)
case None =>
lookup(state, rest, h :: nextRound)
}
case (h @ (id, Inc(a))) :: rest =>
state.get(a) match {
case Some(depth) =>
val state1 = state.updated(id, depth + 1)
lookup(state1, rest, nextRound)
case None =>
lookup(state, rest, h :: nextRound)
}
case (h @ (id, MaxInc(a, b))) :: rest =>
(state.get(a), state.get(b)) match {
case (Some(da), Some(db)) =>
val depth = math.max(da, db) + 1
val state1 = state.updated(id, depth)
lookup(state1, rest, nextRound)
case _ =>
lookup(state, rest, h :: nextRound)
}
case (id, Variadic(Nil)) :: rest =>
val depth = 0
val state1 = state.updated(id, depth)
lookup(state1, rest, nextRound)
case (item @ (id, Variadic(h :: t))) :: rest =>
// max can't throw here because ids is non-empty
def maxId(head: Id[_], tail: List[Id[_]], acc: Int): Option[Int] =
state.get(head) match {
case None => None
case Some(d) =>
val newAcc = Math.max(acc, d)
tail match {
case Nil => Some(newAcc)
case h :: t => maxId(h, t, newAcc)
}
}
maxId(h, t, 0) match {
case Some(depth) =>
val state1 = state.updated(id, depth + 1)
lookup(state1, rest, nextRound)
case None =>
lookup(state, rest, item :: nextRound)
}
}
@annotation.tailrec
def loop(
stack: List[Id[_]],
seen: Set[Id[_]],
state: Map[Id[_], Int],
todo: List[(Id[_], Rest)]
): Map[Id[_], Int] =
stack match {
case Nil =>
lookup(state, todo, Nil)
case h :: tail if seen(h) => loop(tail, seen, state, todo)
case h :: tail =>
val seen1 = seen + h
idToExp.get(h) match {
case None =>
loop(tail, seen1, state, todo)
case Some(Expr.Const(_)) =>
loop(tail, seen1, state.updated(h, 0), todo)
case Some(Expr.Var(id)) =>
loop(id :: tail, seen1, state, (h, Same(id)) :: todo)
case Some(Expr.Unary(id, _)) =>
loop(id :: tail, seen1, state, (h, Inc(id)) :: todo)
case Some(Expr.Binary(id0, id1, _)) =>
loop(id0 :: id1 :: tail, seen1, state, (h, MaxInc(id0, id1)) :: todo)
case Some(Expr.Variadic(ids, _)) =>
loop(ids reverse_::: tail, seen1, state, (h, Variadic(ids)) :: todo)
}
}
loop(roots.toList, Set.empty, Map.empty, Nil)
}