def execute()

in scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/AsyncFlowDefRunner.scala [281:370]


  /**
   * Optimizes a batch of writes, plans them into one Cascading `FlowDef`, and runs it.
   *
   * The whole batch is optimized once with `CascadingBackend.defaultOptimizationRules(conf)`.
   * Each write is then planned with `toPipeUnoptimized`, so no per-write optimization happens.
   * Writes are handled by kind:
   *  - `Force` writes are written to disk via `forceToDisk` and registered in the runner's
   *    mutable state (through `updateState`/`addForce`).
   *  - `ToIterable` writes whose optimized pipe is empty, an `IterablePipe`, or a `SourcePipe`
   *    of a `Mappable` are registered directly, with an already-completed future.
   *  - All other `ToIterable` writes are forced to disk, like `Force`.
   *  - `SimpleWrite`s are planned straight into their sink.
   *
   * Futures for forced pipes complete only after the flow returned by `validateAndRun` finishes.
   *
   * @param conf   configuration used for the optimization rules, as the key passed to
   *               `addForce`/`addFilesToCleanup`, and passed to `validateAndRun`
   * @param writes the batch of writes to plan and execute together
   * @param cec    implicit execution context used by the `map`s over futures in this method
   *               (and presumably by `validateAndRun` — TODO confirm)
   * @return the `CFuture` returned by `validateAndRun(conf)(prepareFD _)`
   *
   * NOTE(review): while building the FlowDef, `write` throws `IllegalArgumentException` for
   * any output that is not a `TypedSink`. Whether that surfaces as a thrown exception or as a
   * failed future depends on `validateAndRun` — confirm.
   */
  def execute(conf: Config, writes: List[ToWrite[_]])(implicit
      cec: ConcurrentExecutionContext
  ): CFuture[(Long, ExecutionCounters)] = {

    import Execution.ToWrite._

    // Completed with the flow's outcome once validateAndRun's future finishes (see the end of
    // this method). Reading back forced pipes is deferred until then.
    val done = Promise[Unit]()

    val phases: Seq[Rule[TypedPipe]] =
      CascadingBackend.defaultOptimizationRules(conf)

    // Optimize all writes together, so shared subplans are seen as a batch; the planning
    // inside prepareFD therefore uses the unoptimized entry point.
    val optimizedWrites = ToWrite.optimizeWriteBatch(writes, phases)

    // Builds the FlowDef for `optimizedWrites`. This is the callback handed to validateAndRun,
    // which supplies `c`. `c` is only passed to forceToDisk, while the state keys use the
    // outer `conf`. NOTE(review): confirm whether `c` can differ from `conf`.
    def prepareFD(c: Config): FlowDef = {
      val fd = new FlowDef

      // Plans `tpipe` into `fd` and attaches it to `out`. Cascading mode only supports
      // TypedSink outputs. `@unchecked` is needed because `A` is erased at runtime, so
      // only the TypedSink class itself is checked.
      def write[A](tpipe: TypedPipe[A], out: Output[A]): Unit =
        out match {
          case dest: TypedSink[A] @unchecked =>
            // We have already applied the optimizations to the batch of writes above
            val pipe = CascadingBackend.toPipeUnoptimized(tpipe, dest.sinkFields)(fd, mode, dest.setter)
            dest.writeFrom(pipe)(fd, mode)
          case _ =>
            throw new IllegalArgumentException(
              s"cascading mode requires all outputs to be TypedSink, found: $out of class: ${out.getClass}"
            )
        }

      // Registers `init` (whose batch-optimized form is `opt`) as a forced pipe.
      // Only when `addForce` reports that it was newly added is `opt` written to a fresh
      // on-disk sink. In that case the registered future completes, after `done`, with
      // the pipe that reads the forced data back.
      def force[A](init: TypedPipe[A], opt: TypedPipe[A]): Unit = {
        val pipePromise = Promise[TypedPipe[A]]()
        val fut = pipePromise.future
        // This updates mutable state
        // NOTE(review): forceToDisk is called inside the updateState callback. If
        // updateState can re-run its callback (e.g. on contention), this side effect could
        // repeat — confirm.
        val sinkOpt = updateState { s =>
          val (nextState, added) = s.addForce(conf, init, opt, fut)
          if (added) {
            val uuid = UUID.randomUUID
            // forceToDisk yields:
            //  - the sink to write `opt` into,
            //  - a thunk (invoked later as fp()) producing the pipe to read the data back,
            //  - cleanup entries, which are recorded in the state for later.
            val (sink, forcedPipe, clean) = forceToDisk(uuid, c, opt)
            (nextState.addFilesToCleanup(conf, clean), Some((sink, forcedPipe)))
          } else {
            // Not newly added: nothing is written, and this pipePromise is never
            // completed (presumably the state keeps an earlier future — verify).
            (nextState, None)
          }
        }

        sinkOpt.foreach { case (sink, fp) =>
          // We write the optimized pipe
          write(opt, sink)
          // The forced data is only ready once the flow finishes, so fp() is deferred
          // until `done`. If `done` fails, fp() is not called and this future fails too.
          val pipeFut = done.future.map(_ => fp())
          pipePromise.completeWith(pipeFut)
        }
      }
      // Registers a pipe that needs no disk write:
      //  - in-memory data: Left, an empty iterable becoming EmptyTypedPipe, otherwise IterablePipe;
      //  - an existing Mappable source: Right, which becomes a SourcePipe.
      // The registered future is already completed with that pipe.
      def addIter[A](init: TypedPipe[A], optimized: Either[Iterable[A], Mappable[A]]): Unit = {
        val result = optimized match {
          case Left(iter) if iter.isEmpty => TypedPipe.EmptyTypedPipe
          case Left(iter)                 => TypedPipe.IterablePipe(iter)
          case Right(mappable)            => TypedPipe.SourcePipe(mappable)
        }
        val fut = Future.successful(result)
        updateState(_.addForce(conf, init, result, fut))
      }

      optimizedWrites.foreach {
        case OptimizedWrite(init, Force(opt)) =>
          force(init, opt)
        case OptimizedWrite(init, ToIterable(opt)) =>
          // Local generic method: it gives the pattern-bound element type a name (`A`),
          // which the `Mappable[A]` type pattern below needs. That pattern is also erased,
          // so only the Mappable class is checked. A SourcePipe whose source is not a
          // Mappable falls through to the last case and is forced.
          def step[A](init: TypedPipe[A], opt: TypedPipe[A]): Unit =
            opt match {
              case TypedPipe.EmptyTypedPipe               => addIter(init, Left(Nil))
              case TypedPipe.IterablePipe(as)             => addIter(init, Left(as))
              case TypedPipe.SourcePipe(src: Mappable[A]) => addIter(init, Right(src))
              case other                                  =>
                // we need to write the pipe out first.
                force(init, opt)
              // Once forced, the pipe looked up later for this write is expected to be a
              // SourcePipe of a Mappable, by construction of forceToDisk's read-back pipe.
            }
          step(init, opt)

        case OptimizedWrite(_, SimpleWrite(pipe, sink)) =>
          write(pipe, sink)
      }

      fd
    }

    val cfuture = validateAndRun(conf)(prepareFD _)

    // When we are done, the forced pipes are ready.
    // Complete `done` with the flow's outcome (success or failure). This releases the
    // deferred fp() calls registered by `force`.
    done.completeWith(cfuture.future.map(_ => ()))
    cfuture
  }