def widen[T]()

in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [35:111]


  /**
   * Reinterpret a literal over some unknown subtype of `T` as a literal over `T`.
   *
   * Why the cast is sound: starting from a LiteralPipe[_ <: T] we could call
   * .evaluate to obtain a TypedPipe[_ <: T]; by covariance that is also a
   * TypedPipe[T], and toLiteral would turn it back into a LiteralPipe[T].
   * The net effect of that round trip is the identity, so performing it would
   * be wasted work and the value is cast directly instead.
   */
  def widen[T](l: LiteralPipe[_ <: T]): LiteralPipe[T] = {
    val upcast: LiteralPipe[T] = l.asInstanceOf[LiteralPipe[T]]
    upcast
  }

  /**
   * Convert a TypedPipe[T] to a Literal[TypedPipe, T] for use with Dagon
   *
   * The conversion is a single pattern match over the TypedPipe node types.
   * In each case `f` is the recursive conversion supplied by
   * Memoize.RecursiveK; it is applied to the node's input pipe(s) and the
   * result is wrapped as follows:
   *   - Literal.Const for nodes whose inputs are not recursed into
   *     (IterablePipe, SourcePipe, EmptyTypedPipe),
   *   - Unary for nodes with one recursed input, paired with a function that
   *     rebuilds the same node type from a replacement input,
   *   - Binary for nodes with two recursed inputs.
   *
   * NOTE(review): the whole function is wrapped in Memoize.functionK,
   * presumably so that a pipe reachable through several paths is converted
   * only once -- confirm against the Dagon documentation.
   */
  def toLiteral: FunctionK[TypedPipe, LiteralPipe] =
    Memoize.functionK[TypedPipe, LiteralPipe](new Memoize.RecursiveK[TypedPipe, LiteralPipe] {

      // Each case receives the pipe to convert and the recursive converter `f`.
      def toFunction[A] = {
        case (cp: CounterPipe[a], f) =>
          Unary(f(cp.pipe), CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
        case (c: CrossPipe[a, b], f) =>
          Binary(f(c.left), f(c.right), CrossPipe(_: TypedPipe[a], _: TypedPipe[b]))
        case (cv @ CrossValue(_, _), f) =>
          // Local helper so the two type parameters of CrossValue can be named
          // explicitly; its result is coerced back to LiteralPipe[A] by widen.
          def go[A, B](cv: CrossValue[A, B]): LiteralPipe[(A, B)] =
            cv match {
              // The value side is itself backed by a pipe: recurse into both
              // the main pipe and the value's pipe. This case must stay ahead
              // of the general CrossValue case below, which would also match.
              case CrossValue(a, ComputedValue(v)) =>
                Binary(
                  f(a),
                  f(v),
                  (a: TypedPipe[A], b: TypedPipe[B]) => CrossValue(a, ComputedValue(b))
                )
              // Any other value: only the main pipe is recursed into and the
              // value `v` is captured unchanged by the rebuild function.
              case CrossValue(a, v) =>
                Unary(f(a), CrossValue(_: TypedPipe[A], v))
            }
          widen(go(cv))
        case (p: DebugPipe[a], f) =>
          Unary(f(p.input), DebugPipe(_: TypedPipe[a]))
        // NOTE(review): widen is applied in the key/value cases (FilterKeys,
        // FlatMapValues, MapValues, SumByLocalKeys), presumably because the
        // literal is built at a tuple type the compiler cannot unify with A
        // on its own -- confirm.
        case (p: FilterKeys[a, b], f) =>
          widen(Unary(f(p.input), FilterKeys(_: TypedPipe[(a, b)], p.fn)))
        case (p: Filter[a], f) =>
          Unary(f(p.input), Filter(_: TypedPipe[a], p.fn))
        case (p: Fork[a], f) =>
          Unary(f(p.input), Fork(_: TypedPipe[a]))
        case (p: FlatMapValues[a, b, c], f) =>
          widen(Unary(f(p.input), FlatMapValues(_: TypedPipe[(a, b)], p.fn)))
        case (p: FlatMapped[a, b], f) =>
          Unary(f(p.input), FlatMapped(_: TypedPipe[a], p.fn))
        case (p: ForceToDisk[a], f) =>
          Unary(f(p.input), ForceToDisk(_: TypedPipe[a]))
        // No recursion here: the pipe itself becomes a constant literal.
        case (it @ IterablePipe(_), _) =>
          Literal.Const(it)
        case (p: MapValues[a, b, c], f) =>
          widen(Unary(f(p.input), MapValues(_: TypedPipe[(a, b)], p.fn)))
        case (p: Mapped[a, b], f) =>
          Unary(f(p.input), Mapped(_: TypedPipe[a], p.fn))
        case (p: MergedTypedPipe[a], f) =>
          Binary(f(p.left), f(p.right), MergedTypedPipe(_: TypedPipe[a], _: TypedPipe[a]))
        // No recursion here: the source pipe itself becomes a constant literal.
        case (src @ SourcePipe(_), _) =>
          Literal.Const(src)
        case (p: SumByLocalKeys[a, b], f) =>
          widen(Unary(f(p.input), SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup)))
        case (p: TrappedPipe[a], f) =>
          Unary(f(p.input), TrappedPipe[a](_: TypedPipe[a], p.sink))
        case (p: WithDescriptionTypedPipe[a], f) =>
          Unary(f(p.input), WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
        case (p: WithOnComplete[a], f) =>
          Unary(f(p.input), WithOnComplete(_: TypedPipe[a], p.fn))
        case (EmptyTypedPipe, _) =>
          Literal.Const(EmptyTypedPipe)
        // The remaining node types are delegated to handler methods that are
        // not defined in this block; each result is coerced with widen.
        case (hg: HashCoGroup[a, b, c, d], f) =>
          widen(handleHashCoGroup(hg, f))
        case (CoGroupedPipe(cg), f) =>
          widen(handleCoGrouped(cg, f))
        case (ReduceStepPipe(rs), f) =>
          widen(handleReduceStep(rs, f))
      }
    })