def planner()

in scalding-base/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryPlanner.scala [16:180]


  /**
   * Builds the translation from `TypedPipe` nodes to in-memory `Op` nodes.
   *
   * The translation is written as a `Memoize.RecursiveK`: every case receives the node to plan
   * together with `rec`, the function used to plan child pipes. The whole thing is wrapped in
   * `Memoize.functionK` (presumably so that a pipe shared by several parents is planned only
   * once -- confirm against `Memoize`).
   *
   * @param conf
   *   the job configuration; it is not referenced anywhere in this method body
   * @param srcs
   *   resolver used to look up the `MemorySource` backing a `SourcePipe`'s input
   * @return
   *   a `FunctionK` that maps any `TypedPipe[T]` to the `Op[T]` that evaluates it
   */
  def planner(conf: Config, srcs: Resolver[Input, MemorySource]): FunctionK[TypedPipe, Op] =
    Memoize.functionK(new Memoize.RecursiveK[TypedPipe, Op] {
      import TypedPipe._

      def toFunction[T] = {
        case (CounterPipe(pipe), rec) =>
          // TODO: counters not yet supported, but can be with a concurrent hashmap
          // For now keep only the first element of each pair and plan that.
          rec(pipe.map(_._1))
        case (cp @ CrossPipe(_, _), rec) =>
          // Plan the cross through the pipe's own hash-join formulation.
          rec(cp.viaHashJoin)
        case (CrossValue(_, EmptyValue), _) => Op.empty
        case (CrossValue(left, LiteralValue(v)), rec) =>
          // A literal value is simply paired with every element on the left.
          val op = rec(left) // linter:disable:UndesirableTypeInference
          op.map((_, v))
        case (CrossValue(left, ComputedValue(right)), rec) =>
          // A computed value is itself a pipe, so this reduces to the CrossPipe case.
          rec(CrossPipe(left, right))
        case (DebugPipe(p), rec) =>
          // There is really little that can be done here but println
          rec(p.map { t => println(t); t })

        case (EmptyTypedPipe, _) =>
          // just use an empty iterable pipe.
          Op.empty[T]

        case (fk @ FilterKeys(_, _), rec) =>
          // The local helper names K and V so the result can be typed as Op[(K, V)].
          def go[K, V](node: FilterKeys[K, V]): Op[(K, V)] = {
            val FilterKeys(pipe, fn) = node
            // Emit the pair when the key passes the predicate, otherwise nothing.
            rec(pipe).concatMap { case (k, v) =>
              if (fn(k)) { (k, v) :: Nil }
              else Nil
            }
          }
          go(fk)

        case (f @ Filter(_, _), rec) =>
          def go[T](f: Filter[T]): Op[T] = {
            val Filter(p, fn) = f
            rec(p).filter(fn)
          }
          go(f)

        case (f @ FlatMapValues(_, _), rec) =>
          def go[K, V, U](node: FlatMapValues[K, V, U]) = {
            val fn = node.fn
            // Expand each value and re-attach the original key to every result.
            rec(node.input).concatMap { case (k, v) => fn(v).map((k, _)) }
          }

          go(f)

        case (FlatMapped(prev, fn), rec) =>
          rec(prev).concatMap(fn) // linter:disable:UndesirableTypeInference

        // ForceToDisk and Fork are planned the same way: materialize the child op.
        case (ForceToDisk(pipe), rec) =>
          rec(pipe).materialize

        case (Fork(pipe), rec) =>
          rec(pipe).materialize

        case (IterablePipe(iterable), _) =>
          Op.source(iterable)

        case (f @ MapValues(_, _), rec) =>
          def go[K, V, U](node: MapValues[K, V, U]) = {
            val mvfn = node.fn
            // Transform only the value; the key passes through untouched.
            rec(node.input).map { case (k, v) => (k, mvfn(v)) }
          }

          go(f)

        case (Mapped(input, fn), rec) =>
          rec(input).map(fn) // linter:disable:UndesirableTypeInference

        case (MergedTypedPipe(left, right), rec) =>
          Op.Concat(rec(left), rec(right))

        case (SourcePipe(src), _) =>
          // The source is resolved once, at planning time; reading is deferred to the
          // function handed to Op.Source. src.toString is passed along with the lookup result.
          val optsrc = srcs(src)
          Op.Source(cec => MemorySource.readOption(optsrc, src.toString)(cec))

        case (slk @ SumByLocalKeys(_, _), rec) =>
          def sum[K, V](sblk: SumByLocalKeys[K, V]) = {
            val SumByLocalKeys(p, sg) = sblk

            // Combine all values sharing a key with the semigroup, accumulating in a
            // mutable map that never escapes this closure.
            rec(p).transform[(K, V), (K, V)] { kvs =>
              val map = collection.mutable.Map.empty[K, V]
              val iter = kvs.iterator
              while (iter.hasNext) {
                // next() advances the iterator, so it is called with explicit parentheses.
                val (k, v) = iter.next()
                map(k) = map.get(k) match {
                  case None     => v
                  case Some(v1) => sg.plus(v1, v)
                }
              }
              val res = new ArrayBuffer[(K, V)](map.size)
              map.foreach(res += _)
              res
            }
          }
          sum(slk)
        case (TrappedPipe(input, _), rec) =>
          // this can be interpreted as catching any exception
          // on the map-phase until the next partition, so it can
          // be made to work by changing Op to return all
          // the values that fail on error
          // Until then the trap is ignored and only the input is planned.
          rec(input)

        case (WithDescriptionTypedPipe(pipe, _), rec) =>
          // TODO we could optionally print out the descriptions
          // after the future completes
          rec(pipe)

        case (WithOnComplete(pipe, fn), rec) =>
          Op.OnComplete(rec(pipe), fn)

        case (hcg @ HashCoGroup(_, _, _), rec) =>
          def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]) = {
            val leftOp = rec(hcg.left)
            // The right side is planned as a reduce step derived from the HashJoinable.
            val rightOp = rec(ReduceStepPipe(HashJoinable.toReduceStep(hcg.right)))
            Op.Join[(K, V1), (K, V2), (K, R)](
              leftOp,
              rightOp,
              { (v1s, v2s) =>
                // Index the right side by key, then walk the left side in order, handing the
                // joiner each left value together with the right values for the same key
                // (empty when the key is absent on the right).
                val kv2 = v2s.groupBy(_._1)
                val result = new ArrayBuffer[(K, R)]()
                v1s.foreach { case (k, v1) =>
                  val v2 = kv2.getOrElse(k, Nil).map(_._2)
                  result ++= hcg.joiner(k, v1, v2).map((k, _))
                }
                result
              }
            )
          }
          go(hcg)

        case (CoGroupedPipe(cg), rec) =>
          def go[K, V](cg: CoGrouped[K, V]) =
            Op.BulkJoin(cg.inputs.map(rec(_)), cg.joinFunction)
          go(cg)

        case (ReduceStepPipe(ir @ IdentityReduce(_, _, _, _, _)), rec) =>
          def go[K, V1, V2](ir: IdentityReduce[K, V1, V2]): Op[(K, V2)] = {
            // No reduction is applied: the mapped pipe's op is only re-typed from
            // Op[(K, V1)] to Op[(K, V2)] through the step's evidence.
            type OpT[V] = Op[(K, V)]
            val op = rec(ir.mapped)
            ir.evidence.subst[OpT](op)
          }
          go(ir)
        case (ReduceStepPipe(uir @ UnsortedIdentityReduce(_, _, _, _, _)), rec) =>
          def go[K, V1, V2](uir: UnsortedIdentityReduce[K, V1, V2]): Op[(K, V2)] = {
            // Handled exactly like IdentityReduce above.
            type OpT[V] = Op[(K, V)]
            val op = rec(uir.mapped)
            uir.evidence.subst[OpT](op)
          }
          go(uir)
        case (ReduceStepPipe(IdentityValueSortedReduce(_, pipe, ord, _, _, _)), rec) =>
          def go[K, V](p: TypedPipe[(K, V)], ord: Ordering[V]) = {
            val op = rec(p)
            // Values are passed through unchanged; only the value ordering is supplied.
            Op.Reduce[K, V, V](op, (_, vs) => vs, Some(ord))
          }
          go(pipe, ord)
        case (ReduceStepPipe(ValueSortedReduce(_, pipe, ord, fn, _, _)), rec) =>
          Op.Reduce(rec(pipe), fn, Some(ord))
        case (ReduceStepPipe(IteratorMappedReduce(_, pipe, fn, _, _)), rec) =>
          // No value ordering is requested for this reduce.
          Op.Reduce(rec(pipe), fn, None)
      }
    })