def apply[T]()

in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [418:455]


    def apply[T](on: Dag[TypedPipe]) = {
      case CounterPipe(a) if needsFork(on, a)   => maybeFork(on, a).map(CounterPipe(_))
      case CrossPipe(a, b) if needsFork(on, a)  => maybeFork(on, a).map(CrossPipe(_, b))
      case CrossPipe(a, b) if needsFork(on, b)  => maybeFork(on, b).map(CrossPipe(a, _))
      case CrossValue(a, b) if needsFork(on, a) => maybeFork(on, a).map(CrossValue(_, b))
      case CrossValue(a, ComputedValue(b)) if needsFork(on, b) =>
        maybeFork(on, b).map(fb => CrossValue(a, ComputedValue(fb)))
      case DebugPipe(p)      => maybeFork(on, p).map(DebugPipe(_))
      case FilterKeys(p, fn) => maybeFork(on, p).map(FilterKeys(_, fn))
      case f @ Filter(_, _) =>
        def go[A](f: Filter[A]): Option[TypedPipe[A]] = {
          val Filter(p, fn) = f
          maybeFork(on, p).map(Filter(_, fn))
        }
        go(f)
      case FlatMapValues(p, fn)     => maybeFork(on, p).map(FlatMapValues(_, fn))
      case FlatMapped(p, fn)        => maybeFork(on, p).map(FlatMapped(_, fn))
      case ForceToDisk(_) | Fork(_) => None // already has a barrier
      case HashCoGroup(left, right, jf) if needsFork(on, left) =>
        maybeFork(on, left).map(HashCoGroup(_, right, jf))
      case HashCoGroup(left, right, jf) => forkHashJoinable(on, right).map(HashCoGroup(left, _, jf))
      case MapValues(p, fn)             => maybeFork(on, p).map(MapValues(_, fn))
      case Mapped(p, fn)                => maybeFork(on, p).map(Mapped(_, fn))
      case MergedTypedPipe(a, b) if needsFork(on, a) => maybeFork(on, a).map(MergedTypedPipe(_, b))
      case MergedTypedPipe(a, b) if needsFork(on, b) => maybeFork(on, b).map(MergedTypedPipe(a, _))
      case ReduceStepPipe(rs)                        => forkReduceStep(on, rs).map(ReduceStepPipe(_))
      case SumByLocalKeys(p, sg)                     => maybeFork(on, p).map(SumByLocalKeys(_, sg))
      case t @ TrappedPipe(_, _) =>
        def go[A](t: TrappedPipe[A]): Option[TypedPipe[A]] = {
          val TrappedPipe(p, sink) = t
          maybeFork(on, p).map(TrappedPipe(_, sink))
        }
        go(t)
      case CoGroupedPipe(cgp)              => forkCoGroup(on, cgp).map(CoGroupedPipe(_))
      case WithOnComplete(p, fn)           => maybeFork(on, p).map(WithOnComplete(_, fn))
      case WithDescriptionTypedPipe(p, ds) => maybeFork(on, p).map(WithDescriptionTypedPipe(_, ds))
      case _                               => None
    }