private def compile()

in scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala [231:475]


  private def compile(mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
    new FunctionK[TypedPipe, CascadingPipe] {

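      // Memoize the translation: a TypedPipe node that fans out to several
      // consumers is planned to a single CascadingPipe and reused.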
      private val cache = HCache.empty[TypedPipe, CascadingPipe]

      override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = withCachePolicy

      private def withCachePolicy[U]: TypedPipe[U] => CascadingPipe[U] = {
        // Don't cache `CrossPipe` itself, but do cache its `left` and `right` sides
        case cp @ CrossPipe(left, right) =>
          notCached(excludes = Set(left, right))(cp)
        // Don't cache `Fork` or `WithDescriptionTypedPipe`, since caching them
        // would end up caching the `CrossPipe` they wrap as well
        case tp @ Fork(_) =>
          transform(tp, this)
        case tp @ WithDescriptionTypedPipe(_, _) =>
          transform(tp, this)
        // Cache all other typed pipes
        case tp =>
          cache.getOrElseUpdate(tp, transform(tp, this))
      }

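      // Plan pipes without caching the results; the pipes in `excludes`
      // (and everything beneath them) fall back to the normal cache policy.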
      private def notCached(excludes: Set[TypedPipe[_]]): FunctionK[TypedPipe, CascadingPipe] =
        new FunctionK[TypedPipe, CascadingPipe] {
          override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = { tp =>
            if (excludes.contains(tp)) withCachePolicy(tp) else transform(tp, this)
          }
        }

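      // Translate a single TypedPipe node to a CascadingPipe, using `rec` to
      // plan its inputs (which may hit the cache above).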
      private def transform[T](
          pipe: TypedPipe[T],
          rec: FunctionK[TypedPipe, CascadingPipe]
      ): CascadingPipe[T] = pipe match {
        case cp @ CounterPipe(_) =>
          def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
            val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
            val cpipe = RichPipe(pipe0)
              .eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter.asSuperConverter(conv)))
            CascadingPipe.single[A](cpipe, fd)
          }

          go(cp)
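        // cross products are rewritten to hash joins by `viaHashJoin` and
        // planned recursively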
        case cp @ CrossPipe(_, _) =>
          rec(cp.viaHashJoin)
        case cv @ CrossValue(_, _) =>
          rec(cv.viaHashJoin)
        case DebugPipe(p) =>
          val inner = rec(p)
          inner.copy(pipe = new Each(inner.pipe, new Debug))
        case EmptyTypedPipe =>
          // just use an empty iterable pipe.
          rec(IterablePipe(List.empty[T]))
        case fk @ FilterKeys(_, _) =>
          def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
            val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
            rec(rewrite)
          }

          go(fk)
        case f @ Filter(_, _) =>
          // hand-holding for type inference
          def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
            val Filter(input, fn) = f
            val CascadingPipe(pipe, initF, fd, conv) = rec(input)
            // This does not need a setter, which is nice.
            val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
            CascadingPipe[T](fpipe, initF, fd, conv)
          }

          go(f)
        case f @ FlatMapValues(_, _) =>
          def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
            rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))

          go(f)
        case fm @ FlatMapped(_, _) =>
          // TODO: we could optimize a FlatMapped input directly and skip some TupleConverters
          def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
            val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
            val fmpipe = RichPipe(pipe)
              .flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter.asSuperConverter(conv), singleSetter)
            CascadingPipe.single[B](fmpipe, fd)
          }

          go(fm)
        case ForceToDisk(input) =>
          val cp = rec(input)
          cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
        case Fork(input) =>
          // Fork doesn't mean anything here, since we are already planning each
          // TypedPipe to something in cascading; Fork is an optimizer-level operation
          rec(input)
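        // in-memory data is read back through an IterableSource with a fresh FlowDef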
        case IterablePipe(iter) =>
          val fd = new FlowDef
          val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
          CascadingPipe.single[T](pipe, fd)
        case f @ MapValues(_, _) =>
          def go[K, A, B](node: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
            rec(Mapped[(K, A), (K, B)](node.input, MapValuesToMap(node.fn)))

          go(f)
        case m @ Mapped(_, _) =>
          def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
            val Mapped(input, fn) = m
            val CascadingPipe(pipe, initF, fd, conv) = rec(input)
            val fmpipe = RichPipe(pipe)
              .mapTo[A, T](initF -> f0)(fn)(TupleConverter.asSuperConverter(conv), singleSetter)
            CascadingPipe.single[B](fmpipe, fd)
          }

          go(m)

        case m @ MergedTypedPipe(_, _) =>
          OptimizationRules.unrollMerge(m) match {
            case Nil      => rec(EmptyTypedPipe)
            case h :: Nil => rec(h)
            case nonEmpty =>
              // TODO: a better optimization would be to not materialize this
              // node at all when there is no fan-out, since groupBy and
              // coGroupBy can accept multiple inputs

              val flowDef = new FlowDef
              // if all of the converters are the same we could skip some work
              // here, but we would need a reliable way to detect that
              val pipes = nonEmpty.map(p => rec(p).toPipe(f0, flowDef, singleSetter))
              val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
              CascadingPipe.single[T](merged, flowDef)
          }
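        // only cascading TypedSources can be planned; anything else cannot
        // produce a Tap in this backend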
        case SourcePipe(input) =>
          input match {
            case ts: TypedSource[_] =>
              val typedSrc = ts.asInstanceOf[TypedSource[T]]
              val fd = new FlowDef
              val pipe = typedSrc.read(fd, mode)
              CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
            case notCascading =>
              throw new IllegalArgumentException(
                s"cascading mode requires TypedSource, found: $notCascading of class ${notCascading.getClass}"
              )
          }
        case sblk @ SumByLocalKeys(_, _) =>
          def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
            val cp = rec(sblk.input)
            val localFD = new FlowDef
            val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
            val msr = new MapsideReduce(sblk.semigroup, new Fields("key"), valueField, None)(
              singleConverter[V],
              singleSetter[V]
            )
            val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields)(_ => msr)
            CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
          }

          go(sblk)
        case trapped: TrappedPipe[u] =>
          val cp: CascadingPipe[_ <: u] = rec(trapped.input)
          import trapped._
          // TODO: with diamonds in the graph, this might not be correct.
          // Cascading seems to put the immediate tuple that caused the
          // exception into the trap, so if you addTrap( ).map(f).map(g) and f
          // changes the tuple structure, then unless we collapse the maps into
          // one operation, cascading can write two different schemas into the
          // trap, making it unreadable. This basically means there can be only
          // one operation between a trap and a forceToDisk or a
          // groupBy/coGroupBy (any barrier).
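          // matching `(sink, sink)` checks the single sink value against two
          // types at once: it must be a cascading Source (to create the trap
          // tap) and a TypedSink (for the sink fields and setter)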
          (sink, sink) match {
            case (src: Source, tsink: TypedSink[u @unchecked]) =>
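              // recover a TupleConverter for the trapped output: prefer the
              // sink's own converter when it is also a TypedSource with a
              // matching arity, otherwise try to derive one from the setter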
              val optTC: Option[TupleConverter[u]] =
                (sink match {
                  case tsrc: TypedSource[u @unchecked] if tsrc.converter.arity == tsink.setter.arity =>
                    Some(tsrc.converter)
                  case _ =>
                    converterFrom(tsink.setter)
                }).map(TupleConverter.asSuperConverter(_))

              optTC match {
                case Some(tc) =>
                  val fd = new FlowDef
                  val pp: Pipe = cp.toPipe[u](tsink.sinkFields, fd, TupleSetter.asSubSetter(tsink.setter))
                  val pipe = RichPipe.assignName(pp)
                  fd.addTrap(pipe, src.createTap(Write)(mode))
                  CascadingPipe[u](pipe, tsink.sinkFields, fd, tc)
                case None =>
                  logger.warn(
                    s"No TupleConverter found for ${trapped}. Use a TypedSink that is also a TypedSource. Found sink: ${sink}"
                  )
                  // we just ignore the trap in this case:
                  // if the job doesn't fail, the trap would be empty anyway;
                  // if the job does fail, we will see the failure
                  cp
              }
            case _ =>
              // it should be safe to only warn here: if the trap is ignored
              // and there is a failure, the job will still fail
              logger.warn(
                s"Trap on ${trapped.input} does not have a valid output: ${trapped.sink}" +
                  ", a subclass of Source and TypedSink is required\nTrap ignored"
              )
              cp
          }
        case WithDescriptionTypedPipe(input, descs) =>
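          // collapse a chain of nested descriptions so they can be applied to
          // the planned pipe in a single pass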
          @annotation.tailrec
          def loop[A](
              t: TypedPipe[A],
              acc: List[(String, Boolean)]
          ): (TypedPipe[A], List[(String, Boolean)]) =
            t match {
              case WithDescriptionTypedPipe(i, descs) =>
                loop(i, descs ::: acc)
              case notDescr => (notDescr, acc)
            }

          val (root, allDesc) = loop(input, descs)
          val cp = rec(root)
          cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))

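        // attach `fn` so it runs on operation cleanup; the Each itself passes
        // tuples through unchanged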
        case WithOnComplete(input, fn) =>
          val cp = rec(input)
          val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
          cp.copy(pipe = next)

        case hcg @ HashCoGroup(_, _, _) =>
          def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
            planHashJoin(hcg.left, hcg.right, hcg.joiner, rec)

          go(hcg)
        case ReduceStepPipe(rs) =>
          planReduceStep(rs, rec)

        case CoGroupedPipe(cg) =>
          planCoGroup(cg, rec)
      }
    }
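
For context, here is a minimal sketch of how this compiler might be driven (hypothetical: `compile` is private to this file, real planning applies optimization rules before reaching it, and `mode` stands for the Mode passed in). It illustrates the cache policy above: a TypedPipe that fans out to two consumers is translated only once.

  val compiler: FunctionK[TypedPipe, CascadingPipe] = compile(mode)

  val base: TypedPipe[String] = TypedPipe.from(List("a", "b", "c"))
  val upper = base.map(_.toUpperCase)
  val lower = base.map(_.toLowerCase)

  // planning `upper` recursively plans `base` and caches the result;
  // planning `lower` then reuses the cached CascadingPipe for `base`
  val plannedUpper: CascadingPipe[String] = compiler.toFunction[String](upper)
  val plannedLower: CascadingPipe[String] = compiler.toFunction[String](lower)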