in scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala [813:929]
/**
 * Plans a [[ReduceStep]] as a Cascading pipe whose tuples convert to `(K, V2)`.
 *
 * The upstream `(K, V1)` pipe is planned first with `rec`. The two identity reduces that carry
 * no reducer count are passed through without any grouping; only their step descriptions are
 * attached. Every other variant becomes a Cascading groupBy on the key. That groupBy may add a
 * secondary sort on the value, and may run a `TypedBufferOp` over each group.
 *
 * @param rs  the reduce step to plan
 * @param rec planner applied to the upstream `rs.mapped` pipe
 * @return the planned pipe producing `(K, V2)` tuples
 */
private def planReduceStep[K, V1, V2](
  rs: ReduceStep[K, V1, V2],
  rec: FunctionK[TypedPipe, CascadingPipe]
): CascadingPipe[(K, V2)] = {
  // A planned pipe whose element type is some subtype of (K, V). It is passed to
  // EqTypes.subst to retype V1 as V2 for the pass-through identity reduces.
  type CP[V] = CascadingPipe[_ <: (K, V)]

  val upstream = rec(rs.mapped)

  /*
   * Groups `upstream` by key inside a fresh FlowDef and lets `configure` finish the GroupBuilder.
   * If `valueSort` is defined, values are secondarily sorted on the "value" field. With an
   * OrderedSerialization they are first boxed, so a binary comparator can be installed on that
   * field. Callers must unbox the values themselves.
   * `maybeBox` supplies the key setter and grouping fields. It presumably boxes the key when
   * `rs.keyOrdering` is an OrderedSerialization (TODO confirm); the output converter is built
   * from the same key ordering.
   */
  def groupWith(valueSort: Option[Ordering[V1]] = None)(
    configure: GroupBuilder => GroupBuilder
  ): CP[V2] = {
    val fd = new FlowDef
    val groupedPipe = maybeBox[K, V1](rs.keyOrdering, fd) { (setter, keyFields) =>
      val (sortFields, valueSetter) = valueSort match {
        case Some(ordSer: OrderedSerialization[V1 @unchecked]) =>
          // Secondary sort through an OrderedSerialization: obtain a boxed serializer for V1,
          // install it as the comparator of the "value" field, and box each value on write.
          val (box, boxedOrdSer) = getBoxFnAndOrder[V1](ordSer, fd)
          val sortedValue = new Fields("value")
          sortedValue.setComparator("value", new CascadingBinaryComparator(boxedOrdSer))
          val boxingSetter =
            setter.asInstanceOf[TupleSetter[(K, Boxed[V1])]].contraMap { (kv: (K, V1)) =>
              (kv._1, box(kv._2))
            }
          (Some(sortedValue), boxingSetter)
        case Some(plainOrd) =>
          (Some(Field.singleOrdered("value")(plainOrd)), setter)
        case None =>
          (None, setter)
      }
      val source = upstream.toPipe(kvFields, fd, TupleSetter.asSubSetter(valueSetter))
      RichPipe(source).groupBy(keyFields) { builder =>
        val sortedBuilder = sortFields match {
          case Some(fields) => builder.sortBy(fields)
          case None         => builder
        }
        configure(sortedBuilder)
      }
    }
    CascadingPipe(groupedPipe, kvFields, fd, tuple2Conv[K, V2](rs.keyOrdering))
  }

  rs match {
    case ir @ IdentityReduce(_, _, None, descriptions, _) =>
      // No reducer count requested: no grouping, just label the upstream pipe.
      ir.evidence.subst[CP](
        upstream.copy(pipe = RichPipe.setPipeDescriptions(upstream.pipe, descriptions))
      )
    case uir @ UnsortedIdentityReduce(_, _, None, descriptions, _) =>
      // No reducer count requested: no grouping, just label the upstream pipe.
      uir.evidence.subst[CP](
        upstream.copy(pipe = RichPipe.setPipeDescriptions(upstream.pipe, descriptions))
      )
    case IdentityReduce(_, _, Some(reds), descriptions, _) =>
      groupWith()(_.reducers(reds).setDescriptions(descriptions))
    case UnsortedIdentityReduce(_, _, Some(reds), descriptions, _) =>
      // An unsorted identity reduce with an explicit reducer count is sometimes used purely
      // to force a partition.
      groupWith()(_.reducers(reds).setDescriptions(descriptions))
    case ivsr @ IdentityValueSortedReduce(_, _, _, _, _, _) =>
      groupWith(Some(ivsr.valueSort)) { builder =>
        // Values sorted through an OrderedSerialization were boxed by groupWith; unbox them.
        val unboxed = ivsr.valueSort match {
          case _: OrderedSerialization[_] =>
            builder.mapStream[Boxed[V1], V1](valueField -> valueField) {
              (boxes: Iterator[Boxed[V1]]) => boxes.map(_.get)
            }
          case _ =>
            builder
        }
        // NOTE(review): -1 presumably leaves the reducer count unset; confirm in GroupBuilder.
        unboxed
          .reducers(ivsr.reducers.getOrElse(-1))
          .setDescriptions(ivsr.descriptions)
      }
    case vsr @ ValueSortedReduce(_, _, _, _, _, _) =>
      val valueOrdering = Some(vsr.valueSort)
      groupWith(valueOrdering) { builder =>
        builder
          .every { prev =>
            // Values may have been boxed for an OrderedSerialization sort. valueConverter is
            // given the same ordering so the user's reduce function sees plain V1 values.
            new cascading.pipe.Every(
              prev,
              valueField,
              new TypedBufferOp[K, V1, V2](
                keyConverter(vsr.keyOrdering),
                valueConverter(valueOrdering),
                vsr.reduceFn,
                valueField
              ),
              Fields.REPLACE
            )
          }
          .reducers(vsr.reducers.getOrElse(-1))
          .setDescriptions(vsr.descriptions)
      }
    case imr @ IteratorMappedReduce(_, _, _, _, _) =>
      groupWith() { builder =>
        builder
          .every { prev =>
            // No value sort here, so values are read back directly as single V1 fields.
            new cascading.pipe.Every(
              prev,
              valueField,
              new TypedBufferOp(
                keyConverter(imr.keyOrdering),
                TupleConverter.singleConverter[V1],
                imr.reduceFn,
                valueField
              ),
              Fields.REPLACE
            )
          }
          .reducers(imr.reducers.getOrElse(-1))
          .setDescriptions(imr.descriptions)
      }
  }
}