in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [126:256]
/**
 * Translates a CoGroupable into its LiteralPipe form by dispatching on the node's shape.
 *
 * For each wrapper node the wrapped node is translated first (recursively), and the result is
 * paired with a function that re-applies the wrapper to whichever pipe it later receives: a
 * ReduceStepPipe, a CoGroupedPipe, or any other key-value pipe. IdentityReduce,
 * UnsortedIdentityReduce and IteratorMappedReduce are handed to handleReduceStep.
 *
 * @param cg      the node to translate; its keyOrdering is reused whenever a plain key-value
 *                pipe has to be wrapped in an IdentityReduce
 * @param recurse forwarded untouched to the recursive calls and to handleReduceStep
 * @return the literal corresponding to `cg`
 */
private def handleCoGrouped[K, V](
    cg: CoGroupable[K, V],
    recurse: FunctionK[TypedPipe, LiteralPipe]
): LiteralPipe[(K, V)] = {
  import CoGrouped._

  // Recovers a CoGroupable from a pipe, along with the descriptions gathered from any
  // WithDescriptionTypedPipe layers that had to be peeled off to reach it.
  def pipeToCG[V1](t: TypedPipe[(K, V1)]): (CoGroupable[K, V1], List[(String, Boolean)]) =
    t match {
      case ReduceStepPipe(grouped: CoGroupable[K @unchecked, V1 @unchecked]) =>
        // This leans on Ordering[K] being used as though it were contravariant,
        // even though it is not declared that way.
        (grouped, Nil)
      case CoGroupedPipe(cogrouped) =>
        // Same assumption as above: Ordering[K] is treated as contravariant
        // although its definition does not say so.
        (cogrouped.asInstanceOf[CoGroupable[K, V1]], Nil)
      case WithDescriptionTypedPipe(underlying, layerDescs) =>
        val (inner, innerDescs) = pipeToCG(underlying)
        (inner, ComposeDescriptions.combine(innerDescs, layerDescs))
      case plain =>
        // Any other pipe is wrapped in an IdentityReduce keyed by the outer cg's ordering.
        (IdentityReduce[K, V1, V1](cg.keyOrdering, plain, None, Nil, implicitly), Nil)
    }

  cg match {
    case pairNode @ Pair(_, _, _) =>
      // The helper only exists to give names to the pair's type parameters.
      def fromPair[A, B, C](node: Pair[K, A, B, C]): LiteralPipe[(K, C)] = {
        val largerLit = handleCoGrouped(node.larger, recurse)
        val smallerLit = handleCoGrouped(node.smaller, recurse)
        val joinFn = node.fn
        Binary(
          largerLit,
          smallerLit,
          { (lhs: TypedPipe[(K, A)], rhs: TypedPipe[(K, B)]) =>
            val (lhsGroup, lhsDescs) = pipeToCG(lhs)
            val (rhsGroup, rhsDescs) = pipeToCG(rhs)
            val allDescs = ComposeDescriptions.combine(lhsDescs, rhsDescs)
            val rebuilt = Pair(lhsGroup, rhsGroup, joinFn)
            // Re-attach each description text to the rebuilt pair; the Boolean that
            // travels with each text is not used here.
            val described = allDescs.foldLeft(rebuilt: CoGrouped[K, C]) { (acc, entry) =>
              acc.withDescription(entry._1)
            }
            CoGroupedPipe(described)
          }
        )
      }
      widen(fromPair(pairNode))

    case reducersNode @ WithReducers(_, _) =>
      def fromReducers[V1 <: V](node: WithReducers[K, V1]): LiteralPipe[(K, V)] = {
        val count = node.reds
        val innerLit = handleCoGrouped(node.on, recurse)
        // Re-applies the reducer setting to whichever pipe shape arrives.
        val rebuild: TypedPipe[(K, V1)] => TypedPipe[(K, V)] = {
          case ReduceStepPipe(step) =>
            ReduceStepPipe(ReduceStep.withReducers(step, count))
          case CoGroupedPipe(cogrouped) =>
            CoGroupedPipe(WithReducers(cogrouped, count))
          case plain =>
            ReduceStepPipe(
              IdentityReduce[K, V1, V1](cg.keyOrdering, plain, None, Nil, implicitly)
                .withReducers(count)
            )
        }
        Unary[TypedPipe, (K, V1), (K, V)](innerLit, rebuild)
      }
      fromReducers(reducersNode)

    case describedNode @ WithDescription(_, _) =>
      def fromDescription[V1 <: V](node: WithDescription[K, V1]): LiteralPipe[(K, V)] = {
        val text = node.description
        val innerLit = handleCoGrouped(node.on, recurse)
        // Re-applies the description to whichever pipe shape arrives.
        val rebuild: TypedPipe[(K, V1)] => TypedPipe[(K, V)] = {
          case ReduceStepPipe(step) =>
            ReduceStepPipe(ReduceStep.withDescription(step, text))
          case CoGroupedPipe(cogrouped) =>
            CoGroupedPipe(WithDescription(cogrouped, text))
          case plain =>
            plain.withDescription(text)
        }
        Unary[TypedPipe, (K, V1), (K, V)](innerLit, rebuild)
      }
      fromDescription(describedNode)

    case filterNode @ FilterKeys(_, _) =>
      def fromFilterKeys[V1 <: V](node: FilterKeys[K, V1]): LiteralPipe[(K, V)] = {
        val keep = node.fn
        val innerLit = handleCoGrouped(node.on, recurse)
        // Re-applies the key filter to whichever pipe shape arrives.
        val rebuild: TypedPipe[(K, V1)] => TypedPipe[(K, V)] = {
          case ReduceStepPipe(step) =>
            // For a reduce step, the filter is applied to the step's input pipe.
            val input = step.mapped
            val filteredInput = TypedPipe.FilterKeys(input, keep)
            ReduceStepPipe(ReduceStep.setInput(step, filteredInput))
          case CoGroupedPipe(cogrouped) =>
            CoGroupedPipe(FilterKeys(cogrouped, keep))
          case plain =>
            TypedPipe.FilterKeys(plain, keep)
        }
        Unary[TypedPipe, (K, V1), (K, V)](innerLit, rebuild)
      }
      fromFilterKeys(filterNode)

    case mapGroupNode @ MapGroup(_, _) =>
      def fromMapGroup[V1, V2 <: V](node: MapGroup[K, V1, V2]): LiteralPipe[(K, V)] = {
        val groupFn = node.fn
        val innerLit = handleCoGrouped(node.on, recurse)
        // Re-applies the group mapping to whichever pipe shape arrives.
        val rebuild: TypedPipe[(K, V1)] => TypedPipe[(K, V)] = {
          case ReduceStepPipe(step) =>
            ReduceStepPipe(ReduceStep.mapGroup(step)(groupFn))
          case CoGroupedPipe(cogrouped) =>
            CoGroupedPipe(MapGroup(cogrouped, groupFn))
          case plain =>
            ReduceStepPipe(
              IdentityReduce[K, V1, V1](cg.keyOrdering, plain, None, Nil, implicitly)
                .mapGroup(groupFn)
            )
        }
        Unary[TypedPipe, (K, V1), (K, V)](innerLit, rebuild)
      }
      fromMapGroup(mapGroupNode)

    // The remaining shapes are reduce steps and are handled by handleReduceStep.
    case identity @ IdentityReduce(_, _, _, _, _) =>
      widen(handleReduceStep(identity, recurse))
    case unsorted @ UnsortedIdentityReduce(_, _, _, _, _) =>
      widen(handleReduceStep(unsorted, recurse))
    case iterMapped @ IteratorMappedReduce(_, _, _, _, _) =>
      widen(handleReduceStep(iterMapped, recurse))
  }
}