in scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/AsyncFlowDefRunner.scala [281:370]
/**
 * Plans and runs a batch of writes as a single cascading flow.
 *
 * The whole batch is optimized once, up front, with the rules from
 * `CascadingBackend.defaultOptimizationRules(conf)`. `prepareFD` then turns the
 * optimized writes into a fresh `FlowDef`, and `validateAndRun` runs it.
 *
 * Forced pipes (`Force` writes, and `ToIterable` writes that are not already an
 * empty/iterable pipe or a `Mappable` source) are registered in the runner's
 * state via `addForce`. The future registered for a newly forced pipe completes
 * with the pipe produced by `forceToDisk` only after the flow's future finishes
 * (see `done`). If building or launching the flow fails synchronously, those
 * futures are failed with the same error instead of being left incomplete.
 *
 * @param conf   config used to pick the optimization rules, passed to
 *               `validateAndRun`, and used as the key for the `addForce` /
 *               `addFilesToCleanup` state updates
 * @param writes the batch of writes to plan together
 * @param cec    implicit execution context in scope for the `map` calls below
 * @return the future returned by `validateAndRun` for this flow
 */
def execute(conf: Config, writes: List[ToWrite[_]])(implicit
  cec: ConcurrentExecutionContext
): CFuture[(Long, ExecutionCounters)] = {
  import Execution.ToWrite._
  import scala.util.control.NonFatal

  // Completed (with Unit or the failure) once the flow future from
  // validateAndRun finishes; forced pipes become available only after that.
  val done = Promise[Unit]()

  val phases: Seq[Rule[TypedPipe]] =
    CascadingBackend.defaultOptimizationRules(conf)
  // Optimization happens here, once for the whole batch; the FlowDef builder
  // below therefore uses the *unoptimized* pipe translation.
  val optimizedWrites = ToWrite.optimizeWriteBatch(writes, phases)

  // Builds a new FlowDef holding every write of the batch. `c` is the config
  // validateAndRun passes in; it is only used for forceToDisk, while the
  // shared-state updates stay keyed by the outer `conf`.
  def prepareFD(c: Config): FlowDef = {
    val fd = new FlowDef

    // Adds a write of `tpipe` into `out` to `fd`; only TypedSink outputs are supported.
    def write[A](tpipe: TypedPipe[A], out: Output[A]): Unit =
      out match {
        case dest: TypedSink[A] @unchecked =>
          // We have already applied the optimizations to the batch of writes above
          val pipe = CascadingBackend.toPipeUnoptimized(tpipe, dest.sinkFields)(fd, mode, dest.setter)
          dest.writeFrom(pipe)(fd, mode)
        case _ =>
          throw new IllegalArgumentException(
            s"cascading mode requires all outputs to be TypedSink, found: $out of class: ${out.getClass}"
          )
      }

    // Registers `init` as forced, backed by `opt`. Only when addForce reports a
    // new entry is `opt` written to the sink returned by forceToDisk; the
    // registered future then completes with `fp()` after the flow is done.
    def force[A](init: TypedPipe[A], opt: TypedPipe[A]): Unit = {
      val pipePromise = Promise[TypedPipe[A]]()
      val fut = pipePromise.future
      // This updates mutable state
      val sinkOpt = updateState { s =>
        val (nextState, added) = s.addForce(conf, init, opt, fut)
        if (added) {
          // fresh UUID handed to forceToDisk, presumably to get a unique location
          val uuid = UUID.randomUUID
          val (sink, forcedPipe, clean) = forceToDisk(uuid, c, opt)
          (nextState.addFilesToCleanup(conf, clean), Some((sink, forcedPipe)))
        } else {
          (nextState, None)
        }
      }

      sinkOpt.foreach { case (sink, fp) =>
        // `fut` has already been recorded via addForce, so it must be completed
        // even if adding the write fails; otherwise anything that later waits on
        // this forced pipe would presumably wait forever.
        try {
          // We write the optimized pipe
          write(opt, sink)
        } catch {
          case NonFatal(e) =>
            pipePromise.tryFailure(e)
            throw e
        }
        val pipeFut = done.future.map(_ => fp())
        pipePromise.completeWith(pipeFut)
      }
    }

    // Registers a result that is already available (empty, in-memory iterable,
    // or Mappable source) for `init`, with an already-completed future; no
    // write is added to `fd`.
    def addIter[A](init: TypedPipe[A], optimized: Either[Iterable[A], Mappable[A]]): Unit = {
      val result = optimized match {
        case Left(iter) if iter.isEmpty => TypedPipe.EmptyTypedPipe
        case Left(iter) => TypedPipe.IterablePipe(iter)
        case Right(mappable) => TypedPipe.SourcePipe(mappable)
      }
      val fut = Future.successful(result)
      updateState(_.addForce(conf, init, result, fut))
    }

    optimizedWrites.foreach {
      case OptimizedWrite(init, Force(opt)) =>
        force(init, opt)
      case OptimizedWrite(init, ToIterable(opt)) =>
        // A named type parameter lets `init` and `opt` share one type `A`.
        def step[A](init: TypedPipe[A], opt: TypedPipe[A]): Unit =
          opt match {
            case TypedPipe.EmptyTypedPipe => addIter(init, Left(Nil))
            case TypedPipe.IterablePipe(as) => addIter(init, Left(as))
            case TypedPipe.SourcePipe(src: Mappable[A]) => addIter(init, Right(src))
            case _ =>
              // we need to write the pipe out first.
              force(init, opt)
              // now, when we go to check for the pipe later, it
              // will be a SourcePipe of a Mappable by construction
          }
        step(init, opt)
      case OptimizedWrite(_, SimpleWrite(pipe, sink)) =>
        write(pipe, sink)
    }
    fd
  }

  // If validateAndRun throws instead of returning a failed future, `done` would
  // otherwise never complete and forced pipes registered so far would never
  // resolve; fail it with the same error before rethrowing.
  val cfuture =
    try validateAndRun(conf)(prepareFD _)
    catch {
      case NonFatal(e) =>
        done.tryFailure(e)
        throw e
    }
  // When we are done, the forced pipes are ready:
  done.completeWith(cfuture.future.map(_ => ()))
  cfuture
}