in scalding-base/src/main/scala/com/twitter/scalding/Execution.scala [176:260]
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit
cec: ConcurrentExecutionContext
): Trampoline[CFuture[(T, Map[Long, ExecutionCounters])]]
/**
* This is convenience for when we don't care about the result. like .map(_ => ())
*/
def unit: Execution[Unit] = map(_ => ())
/**
* This waits synchronously on run, using the global execution context Avoid calling this if possible,
* prefering run or just Execution composition. Every time someone calls this, be very suspect. It is always
* code smell. Very seldom should you need to wait on a future.
*/
def waitFor(conf: Config, mode: Mode): Try[T] =
Try(Await.result(run(conf, mode)(ConcurrentExecutionContext.global), duration.Duration.Inf))
/**
* This is here to silence warnings in for comprehensions, but is identical to .filter.
*
* Users should never directly call this method, call .filter
*/
def withFilter(p: T => Boolean): Execution[T] = filter(p)
/*
* run this and that in parallel, without any dependency. This will
* be done in a single cascading flow if possible.
*/
def zip[U](that: Execution[U]): Execution[(T, U)] =
Zipped(this, that)
override val hashCode: Int = MurmurHash3.productHash(self)
/**
* since executions, particularly Zips can cause two executions to merge we can have exponential cost to
* computing equals if we are not careful
*/
override def equals(other: Any): Boolean =
other match {
case otherEx: Execution[_] =>
if (otherEx eq this) true
else if (otherEx.hashCode != hashCode) false
else {
// If we get here, we have two executions that either
// collide in hashcode, or they are truely equal. Since
// collisions are rare, most of these will be true equality
// so we will fully walk the graph. If we don't remember
// the branches we go down, Zipped will be very expensize
import Execution._
val fn = Memoize.function[RefPair[Execution[Any], Execution[Any]], Boolean] {
case (RefPair(a, b), _) if a eq b => true
case (RefPair(BackendExecution(fn0), BackendExecution(fn1)), rec) =>
fn0 == fn1
case (RefPair(FlatMapped(ex0, fn0), FlatMapped(ex1, fn1)), rec) =>
(fn0 == fn1) && rec(RefPair(ex0, ex1))
case (RefPair(FutureConst(fn0), FutureConst(fn1)), rec) =>
fn0 == fn1
case (RefPair(GetCounters(ex0), GetCounters(ex1)), rec) =>
rec(RefPair(ex0, ex1))
case (RefPair(Mapped(ex0, fn0), Mapped(ex1, fn1)), rec) =>
(fn0 == fn1) && rec(RefPair(ex0, ex1))
case (RefPair(OnComplete(ex0, fn0), OnComplete(ex1, fn1)), rec) =>
(fn0 == fn1) && rec(RefPair(ex0, ex1))
case (RefPair(ReaderExecution, ReaderExecution), _) => true
case (RefPair(RecoverWith(ex0, fn0), RecoverWith(ex1, fn1)), rec) =>
(fn0 == fn1) && rec(RefPair(ex0, ex1))
case (RefPair(ResetCounters(ex0), ResetCounters(ex1)), rec) =>
rec(RefPair(ex0, ex1))
case (RefPair(TransformedConfig(ex0, fn0), TransformedConfig(ex1, fn1)), rec) =>
(fn0 == fn1) && rec(RefPair(ex0, ex1))
case (RefPair(UniqueIdExecution(fn0), UniqueIdExecution(fn1)), _) =>
fn0 == fn1
case (RefPair(WithNewCache(ex0), WithNewCache(ex1)), rec) =>
rec(RefPair(ex0, ex1))
case (RefPair(WriteExecution(h0, t0, f0), WriteExecution(h1, t1, f1)), _) =>
(f0 == f1) && ((h0 :: t0) == (h1 :: t1))
case (RefPair(Zipped(a0, b0), Zipped(a1, b1)), rec) =>
rec(RefPair(a0, a1)) && rec(RefPair(b0, b1))
case (rp, _) =>
require(rp._1.getClass != rp._2.getClass)
false // the executions are not of the same type
}
fn(RefPair(this, otherEx))
}
case _ => false
}