in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [1012:1047]
private def emptyHashJoinable[K, V](hj: HashJoinable[K, V]): Boolean =
HashJoinable.toReduceStep(hj).mapped == EmptyTypedPipe
def applyWhere[T](on: Dag[TypedPipe]) = {
case CrossPipe(EmptyTypedPipe, _) => EmptyTypedPipe
case CrossPipe(_, EmptyTypedPipe) => EmptyTypedPipe
case CrossValue(EmptyTypedPipe, _) => EmptyTypedPipe
case CrossValue(_, ComputedValue(EmptyTypedPipe)) => EmptyTypedPipe
case CrossValue(_, EmptyValue) => EmptyTypedPipe
case DebugPipe(EmptyTypedPipe) => EmptyTypedPipe
case FilterKeys(EmptyTypedPipe, _) => EmptyTypedPipe
case Filter(EmptyTypedPipe, _) => EmptyTypedPipe
case FlatMapValues(EmptyTypedPipe, _) => EmptyTypedPipe
case FlatMapped(EmptyTypedPipe, _) => EmptyTypedPipe
case ForceToDisk(EmptyTypedPipe) => EmptyTypedPipe
case HashCoGroup(EmptyTypedPipe, _, _) => EmptyTypedPipe
case HashCoGroup(_, right, hjf)
if emptyHashJoinable(right) && Joiner.isInnerHashJoinLike(hjf) == Some(true) =>
EmptyTypedPipe
case MapValues(EmptyTypedPipe, _) => EmptyTypedPipe
case Mapped(EmptyTypedPipe, _) => EmptyTypedPipe
case MergedTypedPipe(EmptyTypedPipe, a) => a
case MergedTypedPipe(a, EmptyTypedPipe) => a
case ReduceStepPipe(rs: ReduceStep[_, _, _]) if rs.mapped == EmptyTypedPipe => EmptyTypedPipe
case SumByLocalKeys(EmptyTypedPipe, _) => EmptyTypedPipe
case TrappedPipe(EmptyTypedPipe, _) => EmptyTypedPipe
case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe
case WithOnComplete(EmptyTypedPipe, _) =>
EmptyTypedPipe // there is nothing to do, so we never have workers complete
case WithDescriptionTypedPipe(EmptyTypedPipe, _) =>
EmptyTypedPipe // descriptions apply to tasks, but empty has no tasks
// This rule is tempting, but dangerous since if used in combination
// with AddExplicitForks it would create an infinite loop
// case Fork(EmptyTypedPipe) => EmptyTypedPipe
}