in scalding-beam/src/main/scala/com/twitter/scalding/beam_backend/BeamBackend.scala [20:130]
def plan(
config: Config,
srcs: Resolver[Input, BeamSource]
): FunctionK[TypedPipe, BeamOp] = {
implicit val kryoCoder: KryoCoder = new KryoCoder(defaultKryoCoderConfiguration(config))
Memoize.functionK(f = new Memoize.RecursiveK[TypedPipe, BeamOp] {
import TypedPipe._
def toFunction[A] = {
case (f @ Filter(_, _), rec) =>
def go[T](f: Filter[T]): BeamOp[T] = {
val Filter(p, fn) = f
rec[T](p).filter(fn)
}
go(f)
case (fk @ FilterKeys(_, _), rec) =>
def go[K, V](node: FilterKeys[K, V]): BeamOp[(K, V)] = {
val FilterKeys(pipe, fn) = node
rec(pipe).filter(FilterKeysToFilter(fn))
}
go(fk)
case (Mapped(input, fn), rec) =>
val op = rec(input)
op.map(fn)
case (FlatMapped(input, fn), rec) =>
val op = rec(input)
op.flatMap(fn)
case (f @ MapValues(_, _), rec) =>
def go[K, V, U](node: MapValues[K, V, U]): BeamOp[(K, U)] = {
val MapValues(pipe, fn) = node
rec(pipe).map(MapValuesToMap[K, V, U](fn))
}
go(f)
case (f @ FlatMapValues(_, _), rec) =>
def go[K, V, U](node: FlatMapValues[K, V, U]): BeamOp[(K, U)] = {
val FlatMapValues(pipe, fn) = node
rec(pipe).flatMap(FlatMapValuesToFlatMap[K, V, U](fn))
}
go(f)
case (SourcePipe(src), _) =>
BeamOp.Source(config, src, srcs(src))
case (IterablePipe(iterable), _) =>
BeamOp.FromIterable(iterable, kryoCoder)
case (wd: WithDescriptionTypedPipe[a], rec) =>
rec[a](wd.input)
case (SumByLocalKeys(pipe, sg), rec) =>
val op = rec(pipe)
config.getMapSideAggregationThreshold match {
case None => op
case Some(count) =>
// Semigroup is invariant on T. We cannot pattern match as it is a Semigroup[PriorityQueue[T]]
if (sg.isInstanceOf[ScaldingPriorityQueueMonoid[_]]) {
op
} else {
op.mapSideAggregator(count, sg)
}
}
case (ReduceStepPipe(ir @ IdentityReduce(_, _, _, _, _)), rec) =>
def go[K, V1, V2](ir: IdentityReduce[K, V1, V2]): BeamOp[(K, V2)] = {
type BeamOpT[V] = BeamOp[(K, V)]
val op = rec(ir.mapped)
ir.evidence.subst[BeamOpT](op)
}
go(ir)
case (ReduceStepPipe(uir @ UnsortedIdentityReduce(_, _, _, _, _)), rec) =>
def go[K, V1, V2](uir: UnsortedIdentityReduce[K, V1, V2]): BeamOp[(K, V2)] = {
type BeamOpT[V] = BeamOp[(K, V)]
val op = rec(uir.mapped)
uir.evidence.subst[BeamOpT](op)
}
go(uir)
case (ReduceStepPipe(ivsr @ IdentityValueSortedReduce(_, _, _, _, _, _)), rec) =>
def go[K, V1, V2](uir: IdentityValueSortedReduce[K, V1, V2]): BeamOp[(K, V2)] = {
type BeamOpT[V] = BeamOp[(K, V)]
val op = rec(uir.mapped)
val sortedOp = op.sorted(uir.keyOrdering, uir.valueSort, kryoCoder)
uir.evidence.subst[BeamOpT](sortedOp)
}
go(ivsr)
case (ReduceStepPipe(ValueSortedReduce(keyOrdering, pipe, valueSort, reduceFn, _, _)), rec) =>
val op = rec(pipe)
op.sortedMapGroup(reduceFn)(keyOrdering, valueSort, kryoCoder)
case (ReduceStepPipe(IteratorMappedReduce(keyOrdering, pipe, reduceFn, _, _)), rec) =>
val op = rec(pipe)
op.mapGroup(reduceFn)(keyOrdering, kryoCoder)
case (hcg @ HashCoGroup(_, _, _), rec) =>
def go[K, V1, V2, W](hcg: HashCoGroup[K, V1, V2, W]): BeamOp[(K, W)] = {
val leftOp = rec(hcg.left)
implicit val orderingK: Ordering[K] = hcg.right.keyOrdering
val rightOp = rec(ReduceStepPipe(HashJoinable.toReduceStep(hcg.right)))
leftOp.hashJoin(rightOp, hcg.joiner)
}
go(hcg)
case (CoGroupedPipe(cg), rec) =>
def go[K, V](cg: CoGrouped[K, V]): BeamOp[(K, V)] = {
val ops: Seq[BeamOp[(K, Any)]] = cg.inputs.map(tp => rec(tp))
CoGroupedOp(cg, ops)
}
go(cg)
case (Fork(input), rec) =>
rec(input)
case (m @ MergedTypedPipe(_, _), rec) =>
OptimizationRules.unrollMerge(m) match {
case Nil => rec(EmptyTypedPipe)
case single :: Nil => rec(single)
case first :: second :: tail => MergedBeamOp(rec(first), rec(second), tail.map(rec(_)))
}
}
})
}