in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [418:455]
/**
 * Rule body: given the graph `on`, returns a function that tries to rewrite
 * a single node by swapping one of its direct inputs for the replacement
 * produced by `maybeFork` (or, for hash joins, reduce steps and cogroups, by
 * the dedicated `forkHashJoinable` / `forkReduceStep` / `forkCoGroup`
 * helpers). The node is rebuilt around the replacement input.
 *
 * The function yields `None` when the node is already a barrier, when the
 * helper returns `None`, or when no case applies.
 *
 * NOTE(review): the exact node inserted (Fork vs ForceToDisk) is decided by
 * the helpers defined elsewhere in this file. Confirm there.
 */
def apply[T](on: Dag[TypedPipe]) = {
  // Nodes that are themselves barriers are never rewritten.
  case ForceToDisk(_) => None
  case Fork(_) => None

  // Single-input nodes: rebuild the same node over the (possibly) forked input.
  case CounterPipe(upstream) if needsFork(on, upstream) =>
    maybeFork(on, upstream).map(forked => CounterPipe(forked))
  case DebugPipe(upstream) =>
    maybeFork(on, upstream).map(forked => DebugPipe(forked))
  case FilterKeys(upstream, keyPred) =>
    maybeFork(on, upstream).map(forked => FilterKeys(forked, keyPred))
  case filterNode @ Filter(_, _) =>
    // Bind the element type explicitly so `pred` is typed against the
    // forked input rather than an inferred existential.
    def forkFilter[A](node: Filter[A]): Option[TypedPipe[A]] =
      node match {
        case Filter(upstream, pred) =>
          maybeFork(on, upstream).map(forked => Filter(forked, pred))
      }
    forkFilter(filterNode)
  case FlatMapValues(upstream, valueFn) =>
    maybeFork(on, upstream).map(forked => FlatMapValues(forked, valueFn))
  case FlatMapped(upstream, flatFn) =>
    maybeFork(on, upstream).map(forked => FlatMapped(forked, flatFn))
  case MapValues(upstream, valueFn) =>
    maybeFork(on, upstream).map(forked => MapValues(forked, valueFn))
  case Mapped(upstream, mapFn) =>
    maybeFork(on, upstream).map(forked => Mapped(forked, mapFn))
  case SumByLocalKeys(upstream, semigroup) =>
    maybeFork(on, upstream).map(forked => SumByLocalKeys(forked, semigroup))
  case trapNode @ TrappedPipe(_, _) =>
    // Same explicit type binding as for Filter, so `sink` keeps the input's type.
    def forkTrapped[A](node: TrappedPipe[A]): Option[TypedPipe[A]] =
      node match {
        case TrappedPipe(upstream, sink) =>
          maybeFork(on, upstream).map(forked => TrappedPipe(forked, sink))
      }
    forkTrapped(trapNode)
  case WithOnComplete(upstream, onComplete) =>
    maybeFork(on, upstream).map(forked => WithOnComplete(forked, onComplete))
  case WithDescriptionTypedPipe(upstream, descriptions) =>
    maybeFork(on, upstream).map(forked => WithDescriptionTypedPipe(forked, descriptions))

  // Two-input nodes: the first input is checked before the second, and only
  // one input is rewritten per application.
  case CrossPipe(left, right) if needsFork(on, left) =>
    maybeFork(on, left).map(forkedLeft => CrossPipe(forkedLeft, right))
  case CrossPipe(left, right) if needsFork(on, right) =>
    maybeFork(on, right).map(forkedRight => CrossPipe(left, forkedRight))
  case CrossValue(pipe, value) if needsFork(on, pipe) =>
    maybeFork(on, pipe).map(forkedPipe => CrossValue(forkedPipe, value))
  case CrossValue(pipe, ComputedValue(valuePipe)) if needsFork(on, valuePipe) =>
    maybeFork(on, valuePipe).map(forkedValue => CrossValue(pipe, ComputedValue(forkedValue)))
  case MergedTypedPipe(left, right) if needsFork(on, left) =>
    maybeFork(on, left).map(forkedLeft => MergedTypedPipe(forkedLeft, right))
  case MergedTypedPipe(left, right) if needsFork(on, right) =>
    maybeFork(on, right).map(forkedRight => MergedTypedPipe(left, forkedRight))

  // Join / grouping nodes delegate to their dedicated fork helpers.
  case HashCoGroup(left, right, joiner) if needsFork(on, left) =>
    maybeFork(on, left).map(forkedLeft => HashCoGroup(forkedLeft, right, joiner))
  case HashCoGroup(left, right, joiner) =>
    forkHashJoinable(on, right).map(forkedRight => HashCoGroup(left, forkedRight, joiner))
  case ReduceStepPipe(step) =>
    forkReduceStep(on, step).map(forkedStep => ReduceStepPipe(forkedStep))
  case CoGroupedPipe(cogrouped) =>
    forkCoGroup(on, cogrouped).map(forkedGroup => CoGroupedPipe(forkedGroup))

  // Anything else (e.g. nodes with no pipe inputs) is left untouched.
  case _ => None
}