def depthOfId[A]()

in scalding-dagon/src/main/scala/com/twitter/scalding/dagon/Dag.scala [187:307]


  def depthOfId[A](i: Id[A]): Option[Int] =
    depth.get(i)

  def depthOf[A](n: N[A]): Option[Int] =
    find(n).flatMap(depthOfId(_))

  private lazy val depth: Map[Id[_], Int] = {
    sealed trait Rest {
      def dependsOn(id: Id[_]): Boolean
    }
    sealed case class Same(asId: Id[_]) extends Rest {
      def dependsOn(id: Id[_]) = id == asId
    }
    sealed case class MaxInc(a: Id[_], b: Id[_]) extends Rest {
      def dependsOn(id: Id[_]) = (id == a) || (id == b)
    }
    sealed case class Inc(of: Id[_]) extends Rest {
      def dependsOn(id: Id[_]) = id == of
    }
    sealed case class Variadic(ids: List[Id[_]]) extends Rest {
      def dependsOn(id: Id[_]) = ids.contains(id)
    }

    @annotation.tailrec
    def lookup(
        state: Map[Id[_], Int],
        todo: List[(Id[_], Rest)],
        nextRound: List[(Id[_], Rest)]
    ): Map[Id[_], Int] =
      todo match {
        case Nil =>
          nextRound match {
            case Nil => state
            case repeat =>
              val sortRepeat = repeat.sortWith { case ((i0, r0), (i1, r1)) =>
                r1.dependsOn(i0) || (!r0.dependsOn(i1))
              }
              lookup(state, sortRepeat, Nil)
          }
        case (h @ (id, Same(a))) :: rest =>
          state.get(a) match {
            case Some(depth) =>
              val state1 = state.updated(id, depth)
              lookup(state1, rest, nextRound)
            case None =>
              lookup(state, rest, h :: nextRound)
          }
        case (h @ (id, Inc(a))) :: rest =>
          state.get(a) match {
            case Some(depth) =>
              val state1 = state.updated(id, depth + 1)
              lookup(state1, rest, nextRound)
            case None =>
              lookup(state, rest, h :: nextRound)
          }
        case (h @ (id, MaxInc(a, b))) :: rest =>
          (state.get(a), state.get(b)) match {
            case (Some(da), Some(db)) =>
              val depth = math.max(da, db) + 1
              val state1 = state.updated(id, depth)
              lookup(state1, rest, nextRound)
            case _ =>
              lookup(state, rest, h :: nextRound)
          }
        case (id, Variadic(Nil)) :: rest =>
          val depth = 0
          val state1 = state.updated(id, depth)
          lookup(state1, rest, nextRound)
        case (item @ (id, Variadic(h :: t))) :: rest =>
          // max can't throw here because ids is non-empty
          def maxId(head: Id[_], tail: List[Id[_]], acc: Int): Option[Int] =
            state.get(head) match {
              case None => None
              case Some(d) =>
                val newAcc = Math.max(acc, d)
                tail match {
                  case Nil    => Some(newAcc)
                  case h :: t => maxId(h, t, newAcc)
                }
            }

          maxId(h, t, 0) match {
            case Some(depth) =>
              val state1 = state.updated(id, depth + 1)
              lookup(state1, rest, nextRound)
            case None =>
              lookup(state, rest, item :: nextRound)
          }
      }

    @annotation.tailrec
    def loop(
        stack: List[Id[_]],
        seen: Set[Id[_]],
        state: Map[Id[_], Int],
        todo: List[(Id[_], Rest)]
    ): Map[Id[_], Int] =
      stack match {
        case Nil =>
          lookup(state, todo, Nil)
        case h :: tail if seen(h) => loop(tail, seen, state, todo)
        case h :: tail =>
          val seen1 = seen + h
          idToExp.get(h) match {
            case None =>
              loop(tail, seen1, state, todo)
            case Some(Expr.Const(_)) =>
              loop(tail, seen1, state.updated(h, 0), todo)
            case Some(Expr.Var(id)) =>
              loop(id :: tail, seen1, state, (h, Same(id)) :: todo)
            case Some(Expr.Unary(id, _)) =>
              loop(id :: tail, seen1, state, (h, Inc(id)) :: todo)
            case Some(Expr.Binary(id0, id1, _)) =>
              loop(id0 :: id1 :: tail, seen1, state, (h, MaxInc(id0, id1)) :: todo)
            case Some(Expr.Variadic(ids, _)) =>
              loop(ids reverse_::: tail, seen1, state, (h, Variadic(ids)) :: todo)
          }
      }

    loop(roots.toList, Set.empty, Map.empty, Nil)
  }