in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [35:111]
def widen[T](l: LiteralPipe[_ <: T]): LiteralPipe[T] =
// to prove this is safe, see that if you have
// LiteralPipe[_ <: T] we can call .evaluate to get
// TypedPipe[_ <: T] which due to covariance is
// TypedPipe[T], and then using toLiteral we can get
// LiteralPipe[T]
//
// that would be wasteful to apply since the final
// result is identity.
l.asInstanceOf[LiteralPipe[T]]
/**
* Convert a TypedPipe[T] to a Literal[TypedPipe, T] for use with Dagon
*/
def toLiteral: FunctionK[TypedPipe, LiteralPipe] =
Memoize.functionK[TypedPipe, LiteralPipe](new Memoize.RecursiveK[TypedPipe, LiteralPipe] {
def toFunction[A] = {
case (cp: CounterPipe[a], f) =>
Unary(f(cp.pipe), CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
case (c: CrossPipe[a, b], f) =>
Binary(f(c.left), f(c.right), CrossPipe(_: TypedPipe[a], _: TypedPipe[b]))
case (cv @ CrossValue(_, _), f) =>
def go[A, B](cv: CrossValue[A, B]): LiteralPipe[(A, B)] =
cv match {
case CrossValue(a, ComputedValue(v)) =>
Binary(
f(a),
f(v),
(a: TypedPipe[A], b: TypedPipe[B]) => CrossValue(a, ComputedValue(b))
)
case CrossValue(a, v) =>
Unary(f(a), CrossValue(_: TypedPipe[A], v))
}
widen(go(cv))
case (p: DebugPipe[a], f) =>
Unary(f(p.input), DebugPipe(_: TypedPipe[a]))
case (p: FilterKeys[a, b], f) =>
widen(Unary(f(p.input), FilterKeys(_: TypedPipe[(a, b)], p.fn)))
case (p: Filter[a], f) =>
Unary(f(p.input), Filter(_: TypedPipe[a], p.fn))
case (p: Fork[a], f) =>
Unary(f(p.input), Fork(_: TypedPipe[a]))
case (p: FlatMapValues[a, b, c], f) =>
widen(Unary(f(p.input), FlatMapValues(_: TypedPipe[(a, b)], p.fn)))
case (p: FlatMapped[a, b], f) =>
Unary(f(p.input), FlatMapped(_: TypedPipe[a], p.fn))
case (p: ForceToDisk[a], f) =>
Unary(f(p.input), ForceToDisk(_: TypedPipe[a]))
case (it @ IterablePipe(_), _) =>
Literal.Const(it)
case (p: MapValues[a, b, c], f) =>
widen(Unary(f(p.input), MapValues(_: TypedPipe[(a, b)], p.fn)))
case (p: Mapped[a, b], f) =>
Unary(f(p.input), Mapped(_: TypedPipe[a], p.fn))
case (p: MergedTypedPipe[a], f) =>
Binary(f(p.left), f(p.right), MergedTypedPipe(_: TypedPipe[a], _: TypedPipe[a]))
case (src @ SourcePipe(_), _) =>
Literal.Const(src)
case (p: SumByLocalKeys[a, b], f) =>
widen(Unary(f(p.input), SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup)))
case (p: TrappedPipe[a], f) =>
Unary(f(p.input), TrappedPipe[a](_: TypedPipe[a], p.sink))
case (p: WithDescriptionTypedPipe[a], f) =>
Unary(f(p.input), WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
case (p: WithOnComplete[a], f) =>
Unary(f(p.input), WithOnComplete(_: TypedPipe[a], p.fn))
case (EmptyTypedPipe, _) =>
Literal.Const(EmptyTypedPipe)
case (hg: HashCoGroup[a, b, c, d], f) =>
widen(handleHashCoGroup(hg, f))
case (CoGroupedPipe(cg), f) =>
widen(handleCoGrouped(cg, f))
case (ReduceStepPipe(rs), f) =>
widen(handleReduceStep(rs, f))
}
})