in scalding-base/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala [260:285]
/**
 * Reports whether `tp` is treated as the result of a reduce step.
 *
 * The decision is made structurally, by walking from `tp` towards its inputs:
 *
 *  - `ReduceStepPipe` and `CoGroupedPipe` answer `true`.
 *  - `EmptyTypedPipe`, `IterablePipe`, `SourcePipe`, `ForceToDisk`, `Fork` and
 *    `MergedTypedPipe` answer `false` without looking any further.
 *  - Every other node gives the answer of the pipe it wraps. For `HashCoGroup` only
 *    the left input is inspected; for `CrossPipe` / `CrossValue` the check is run on
 *    their `viaHashJoin` form.
 *
 * @param tp the pipe to classify
 * @return `true` if the walk ends on a `ReduceStepPipe` or `CoGroupedPipe`, `false` otherwise
 */
def isLogicalReduce(tp: TypedPipe[Any]): Boolean = {
  import TypedPipe._
  tp match {
    // ---- nodes that are never counted as a reduce ----
    case EmptyTypedPipe | IterablePipe(_) | SourcePipe(_) =>
      false
    case ForceToDisk(_) =>
      // not reducers for sure, could be a map-only job
      false
    case Fork(_) =>
      // TODO, not super clear
      false
    case MergedTypedPipe(_, _) =>
      false

    // ---- nodes that are always counted as a reduce ----
    case ReduceStepPipe(_) | CoGroupedPipe(_) =>
      true

    // ---- crosses are judged through their hash-join form ----
    case cross @ CrossPipe(_, _) =>
      isLogicalReduce(cross.viaHashJoin)
    case cross @ CrossValue(_, _) =>
      isLogicalReduce(cross.viaHashJoin)

    // ---- hash co-group: only the left side decides ----
    case HashCoGroup(lhs, _, _) =>
      isLogicalReduce(lhs)

    // ---- single-input wrappers: defer to the wrapped pipe ----
    case CounterPipe(upstream) =>
      isLogicalReduce(upstream)
    case DebugPipe(upstream) =>
      isLogicalReduce(upstream)
    case FilterKeys(upstream, _) =>
      isLogicalReduce(upstream)
    case Filter(upstream, _) =>
      isLogicalReduce(upstream)
    case FlatMapValues(upstream, _) =>
      isLogicalReduce(upstream)
    case FlatMapped(upstream, _) =>
      isLogicalReduce(upstream)
    case MapValues(upstream, _) =>
      isLogicalReduce(upstream)
    case Mapped(upstream, _) =>
      isLogicalReduce(upstream)
    case SumByLocalKeys(upstream, _) =>
      isLogicalReduce(upstream)
    case TrappedPipe(upstream, _) =>
      isLogicalReduce(upstream)
    case WithOnComplete(upstream, _) =>
      isLogicalReduce(upstream)
    case WithDescriptionTypedPipe(upstream, _) =>
      isLogicalReduce(upstream)
  }
}