in scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala [231:475]
private def compile(mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
new FunctionK[TypedPipe, CascadingPipe] {
private val cache = HCache.empty[TypedPipe, CascadingPipe]
override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = withCachePolicy
private def withCachePolicy[U]: TypedPipe[U] => CascadingPipe[U] = {
// Don't cache `CrossPipe`, but do cache its `left` and `right` sides
case cp @ CrossPipe(left, right) =>
notCached(excludes = Set(left, right))(cp)
// Don't cache `Fork` or `WithDescriptionTypedPipe`,
// since if we cached them, `CrossPipe` would end up being cached as well
case tp @ Fork(_) =>
transform(tp, this)
case tp @ WithDescriptionTypedPipe(_, _) =>
transform(tp, this)
// Cache all other typed pipes
case tp =>
cache.getOrElseUpdate(tp, transform(tp, this))
}
private def notCached(excludes: Set[TypedPipe[_]]): FunctionK[TypedPipe, CascadingPipe] =
new FunctionK[TypedPipe, CascadingPipe] {
override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = { tp =>
if (excludes.contains(tp)) withCachePolicy(tp) else transform(tp, this)
}
}
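// Note: `notCached` plans the node it is given (and anything that node expands into)
// without caching, while the excluded `left`/`right` children are routed back through
// `withCachePolicy`, so the normal caching policy still applies to them.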
private def transform[T](
pipe: TypedPipe[T],
rec: FunctionK[TypedPipe, CascadingPipe]
): CascadingPipe[T] = pipe match {
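// A CounterPipe carries counter increments alongside each element; IncrementCounters
// applies those increments and emits the element itself on a single field.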
case cp @ CounterPipe(_) =>
def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
val cpipe = RichPipe(pipe0)
.eachTo(initF -> f0)(
new IncrementCounters[A](
_,
TupleConverter
.asSuperConverter(conv)
)
)
CascadingPipe.single[A](cpipe, fd)
}
go(cp)
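// Cross products are planned by rewriting to a hash join via `viaHashJoin`
// and recursing on the rewritten pipe.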
case cp @ CrossPipe(_, _) =>
rec(cp.viaHashJoin)
case cv @ CrossValue(_, _) =>
rec(cv.viaHashJoin)
case DebugPipe(p) =>
val inner = rec(p)
inner.copy(pipe = new Each(inner.pipe, new Debug))
case EmptyTypedPipe =>
// just use an empty iterable pipe.
rec(IterablePipe(List.empty[T]))
case fk @ FilterKeys(_, _) =>
def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
rec(rewrite)
}
go(fk)
case f @ Filter(_, _) =>
// hand-holding for type inference
def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
val Filter(input, fn) = f
val CascadingPipe(pipe, initF, fd, conv) = rec(input)
// This does not need a setter, which is nice.
val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
CascadingPipe[T](fpipe, initF, fd, conv)
}
go(f)
case f @ FlatMapValues(_, _) =>
def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))
go(f)
case fm @ FlatMapped(_, _) =>
// TODO: we can optimize a FlatMapped input directly and skip some TupleConverters
def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(
TupleConverter
.asSuperConverter(conv),
singleSetter
)
CascadingPipe.single[B](fmpipe, fd)
}
go(fm)
case ForceToDisk(input) =>
val cp = rec(input)
cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
case Fork(input) =>
// Fork doesn't mean anything here since we are already planning each TypedPipe to
// something in cascading. Fork is an optimizer-level operation
rec(input)
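// In-memory data is exposed through an IterableSource read into a fresh FlowDef.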
case IterablePipe(iter) =>
val fd = new FlowDef
val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
CascadingPipe.single[T](pipe, fd)
case f @ MapValues(_, _) =>
def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))
go(f)
case m @ Mapped(_, _) =>
def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
val Mapped(input, fn) = m
val CascadingPipe(pipe, initF, fd, conv) = rec(input)
val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(
TupleConverter
.asSuperConverter(conv),
singleSetter
)
CascadingPipe.single[B](fmpipe, fd)
}
go(m)
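// unrollMerge flattens nested merges into a single list of inputs; the empty and
// single-input cases avoid creating a cascading Merge at all.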
case m @ MergedTypedPipe(_, _) =>
OptimizationRules.unrollMerge(m) match {
case Nil => rec(EmptyTypedPipe)
case h :: Nil => rec(h)
case nonEmpty =>
// TODO: a better optimization is to not materialize this
// node at all if there is no fan-out, since groupBy and cogroupBy
// can accept multiple inputs
val flowDef = new FlowDef
// if all of the converters are the same, we could skip some work
// here, but we would need to be able to detect that reliably
val pipes = nonEmpty.map(p => rec(p).toPipe(f0, flowDef, singleSetter))
val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
CascadingPipe.single[T](merged, flowDef)
}
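// Only a TypedSource can be read on the cascading backend; any other
// source type is a planning error.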
case SourcePipe(input) =>
input match {
case ts: TypedSource[_] =>
val typedSrc = ts.asInstanceOf[TypedSource[T]]
val fd = new FlowDef
val pipe = typedSrc.read(fd, mode)
CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
case notCascading =>
throw new IllegalArgumentException(
s"cascading mode requires TypedSource, found: $notCascading of class ${notCascading.getClass}"
)
}
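// Map-side aggregation: MapsideReduce partially combines values per key using the
// semigroup; keys are not globally grouped here, so downstream consumers may still
// see multiple tuples per key.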
case sblk @ SumByLocalKeys(_, _) =>
def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
val cp = rec(sblk.input)
val localFD = new FlowDef
val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
val msr = new MapsideReduce(sblk.semigroup, new Fields("key"), valueField, None)(
singleConverter[V],
singleSetter[V]
)
val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields)(_ => msr)
CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
}
go(sblk)
case trapped: TrappedPipe[u] =>
val cp: CascadingPipe[_ <: u] = rec(trapped.input)
import trapped._
// TODO: with diamonds in the graph, this might not be correct.
// It seems cascading puts the immediate tuple that
// caused the exception into the trap, so if you addTrap( ).map(f).map(g)
// and f changes the tuple structure, and we don't collapse the
// maps into one operation, cascading can write two different
// schemas into the trap, making it unreadable.
// This basically means there can only be one operation between
// a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
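// Matching `sink` against itself lets us require that the single sink value is both
// a cascading Source (to create the trap tap) and a TypedSink (for its fields and
// setter) in one pattern.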
(sink, sink) match {
case (src: Source, tsink: TypedSink[u @unchecked]) =>
val optTC: Option[TupleConverter[u]] =
(sink match {
case tsrc: TypedSource[u @unchecked] if tsrc.converter.arity == tsink.setter.arity =>
Some(tsrc.converter)
case _ =>
converterFrom(tsink.setter)
}).map(TupleConverter.asSuperConverter(_))
optTC match {
case Some(tc) =>
val fd = new FlowDef
val pp: Pipe = cp.toPipe[u](tsink.sinkFields, fd, TupleSetter.asSubSetter(tsink.setter))
val pipe = RichPipe.assignName(pp)
fd.addTrap(pipe, src.createTap(Write)(mode))
CascadingPipe[u](pipe, tsink.sinkFields, fd, tc)
case None =>
logger.warn(
s"No TupleConverter found for ${trapped}. Use a TypedSink that is also a TypedSource. Found sink: ${sink}"
)
// we just ignore the trap in this case:
// if the job doesn't fail, the trap would have been empty anyway;
// if the job does fail, we will see the failure.
cp
}
case _ =>
// it should be safe to only warn here because
// if the trap is removed and there is a failure, the job will still fail
logger.warn(
s"Trap on ${trapped.input} does not have a valid output: ${trapped.sink}" +
", a subclass of Source and TypedSink is required\nTrap ignored"
)
cp
}
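// Collapse a chain of nested description nodes so all descriptions are applied
// to the planned pipe in one pass.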
case WithDescriptionTypedPipe(input, descs) =>
@annotation.tailrec
def loop[A](
t: TypedPipe[A],
acc: List[(String, Boolean)]
): (TypedPipe[A], List[(String, Boolean)]) =
t match {
case WithDescriptionTypedPipe(i, descs) =>
loop(i, descs ::: acc)
case notDescr => (notDescr, acc)
}
val (root, allDesc) = loop(input, descs)
val cp = rec(root)
cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))
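// The onComplete callback is attached with CleanupIdentityFunction, which passes
// tuples through unchanged and (as the name suggests) runs `fn` when the operation
// is cleaned up.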
case WithOnComplete(input, fn) =>
val cp = rec(input)
val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
cp.copy(pipe = next)
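// Joins, reduce steps, and cogroups are delegated to the dedicated planners
// (planHashJoin, planReduceStep, planCoGroup) defined elsewhere in this backend.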
case hcg @ HashCoGroup(_, _, _) =>
def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
planHashJoin(hcg.left, hcg.right, hcg.joiner, rec)
go(hcg)
case ReduceStepPipe(rs) =>
planReduceStep(rs, rec)
case CoGroupedPipe(cg) =>
planCoGroup(cg, rec)
}
}