in scalding-base/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala [147:253]
def handleCoGrouped[K, V](
cg: CoGroupable[K, V],
recurse: FunctionK[TypedPipe, mat.TP]
): mat.TP[(K, V)] = {
import CoGrouped._
import TypedPipe._
def pipeToCG[V1](t: TypedPipe[(K, V1)]): CoGroupable[K, V1] =
t match {
case ReduceStepPipe(cg: CoGroupable[K @unchecked, V1 @unchecked]) =>
// we are relying on the fact that we use Ordering[K]
// as a contravariant type, despite it not being defined
// that way.
cg
case CoGroupedPipe(cg) =>
// we are relying on the fact that we use Ordering[K]
// as a contravariant type, despite it not being defined
// that way.
cg.asInstanceOf[CoGroupable[K, V1]]
case kvPipe => IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
}
cg match {
case p @ Pair(_, _, _) =>
def go[A, B, C](pair: Pair[K, A, B, C]): mat.TP[(K, C)] = {
val mleft = handleCoGrouped(pair.larger, recurse)
val mright = handleCoGrouped(pair.smaller, recurse)
val both = mat.zip(mleft, mright)
mat.map(both) { case (l, r) =>
CoGroupedPipe(Pair(pipeToCG(l), pipeToCG(r), pair.fn))
}
}
widen(go(p))
case wr @ WithReducers(_, _) =>
def go[V1 <: V](wr: WithReducers[K, V1]): mat.TP[(K, V)] = {
val reds = wr.reds
mat.map(handleCoGrouped(wr.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
ReduceStepPipe(ReduceStep.withReducers(rs, reds))
case CoGroupedPipe(cg) =>
CoGroupedPipe(WithReducers(cg, reds))
case kvPipe =>
ReduceStepPipe(
IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
.withReducers(reds)
)
}
}
}
go(wr)
case wd @ WithDescription(_, _) =>
def go[V1 <: V](wd: WithDescription[K, V1]): mat.TP[(K, V)] = {
val desc = wd.description
mat.map(handleCoGrouped(wd.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
ReduceStepPipe(ReduceStep.withDescription(rs, desc))
case CoGroupedPipe(cg) =>
CoGroupedPipe(WithDescription(cg, desc))
case kvPipe =>
kvPipe.withDescription(desc)
}
}
}
go(wd)
case fk @ CoGrouped.FilterKeys(_, _) =>
def go[V1 <: V](fk: CoGrouped.FilterKeys[K, V1]): mat.TP[(K, V)] = {
val fn = fk.fn
mat.map(handleCoGrouped(fk.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
val mapped = rs.mapped
val mappedF = TypedPipe.FilterKeys(mapped, fn)
ReduceStepPipe(ReduceStep.setInput(rs, mappedF))
case CoGroupedPipe(cg) =>
CoGroupedPipe(CoGrouped.FilterKeys(cg, fn))
case kvPipe =>
TypedPipe.FilterKeys(kvPipe, fn)
}
}
}
go(fk)
case mg @ MapGroup(_, _) =>
def go[V1, V2 <: V](mg: MapGroup[K, V1, V2]): mat.TP[(K, V)] = {
val fn = mg.fn
mat.map(handleCoGrouped(mg.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
tp match {
case ReduceStepPipe(rs) =>
ReduceStepPipe(ReduceStep.mapGroup(rs)(fn))
case CoGroupedPipe(cg) =>
CoGroupedPipe(MapGroup(cg, fn))
case kvPipe =>
val rs = IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
ReduceStepPipe(ReduceStep.mapGroup(rs)(fn))
}
}
}
go(mg)
case step @ IdentityReduce(_, _, _, _, _) =>
widen(handleReduceStep(step, recurse)) // the widen trick sidesteps GADT bugs
case step @ UnsortedIdentityReduce(_, _, _, _, _) =>
widen(handleReduceStep(step, recurse))
case step @ IteratorMappedReduce(_, _, _, _, _) =>
widen(handleReduceStep(step, recurse))
}
}