private def handleCoGrouped[K, V]()

in scalding-base/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala [126:256]


  private def handleCoGrouped[K, V](
      cg: CoGroupable[K, V],
      recurse: FunctionK[TypedPipe, LiteralPipe]
  ): LiteralPipe[(K, V)] = {
    import CoGrouped._

    def pipeToCG[V1](t: TypedPipe[(K, V1)]): (CoGroupable[K, V1], List[(String, Boolean)]) =
      t match {
        case ReduceStepPipe(cg: CoGroupable[K @unchecked, V1 @unchecked]) =>
          // we are relying on the fact that we use Ordering[K]
          // as a contravariant type, despite it not being defined
          // that way.
          (cg, Nil)
        case CoGroupedPipe(cg) =>
          // we are relying on the fact that we use Ordering[K]
          // as a contravariant type, despite it not being defined
          // that way.
          (cg.asInstanceOf[CoGroupable[K, V1]], Nil)
        case WithDescriptionTypedPipe(pipe, descs) =>
          val (cg, d1) = pipeToCG(pipe)
          (cg, ComposeDescriptions.combine(d1, descs))
        case kvPipe =>
          (IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly), Nil)
      }

    cg match {
      case p @ Pair(_, _, _) =>
        def go[A, B, C](pair: Pair[K, A, B, C]): LiteralPipe[(K, C)] = {
          val llit = handleCoGrouped(pair.larger, recurse)
          val rlit = handleCoGrouped(pair.smaller, recurse)
          val fn = pair.fn
          Binary(
            llit,
            rlit,
            { (l: TypedPipe[(K, A)], r: TypedPipe[(K, B)]) =>
              val (left, d1) = pipeToCG(l)
              val (right, d2) = pipeToCG(r)
              val d3 = ComposeDescriptions.combine(d1, d2)
              val pair = Pair(left, right, fn)
              val withD = d3.foldLeft(pair: CoGrouped[K, C]) { case (p, (d, _)) =>
                p.withDescription(d)
              }
              CoGroupedPipe(withD)
            }
          )
        }
        widen(go(p))
      case wr @ WithReducers(_, _) =>
        def go[V1 <: V](wr: WithReducers[K, V1]): LiteralPipe[(K, V)] = {
          val reds = wr.reds
          Unary[TypedPipe, (K, V1), (K, V)](
            handleCoGrouped(wr.on, recurse),
            (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  ReduceStepPipe(ReduceStep.withReducers(rs, reds))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(WithReducers(cg, reds))
                case kvPipe =>
                  ReduceStepPipe(
                    IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
                      .withReducers(reds)
                  )
              }
          )
        }
        go(wr)
      case wd @ WithDescription(_, _) =>
        def go[V1 <: V](wd: WithDescription[K, V1]): LiteralPipe[(K, V)] = {
          val desc = wd.description
          Unary[TypedPipe, (K, V1), (K, V)](
            handleCoGrouped(wd.on, recurse),
            (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  ReduceStepPipe(ReduceStep.withDescription(rs, desc))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(WithDescription(cg, desc))
                case kvPipe =>
                  kvPipe.withDescription(desc)
              }
          )
        }
        go(wd)
      case fk @ FilterKeys(_, _) =>
        def go[V1 <: V](fk: FilterKeys[K, V1]): LiteralPipe[(K, V)] = {
          val fn = fk.fn
          Unary[TypedPipe, (K, V1), (K, V)](
            handleCoGrouped(fk.on, recurse),
            (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  val mapped = rs.mapped
                  val mappedF = TypedPipe.FilterKeys(mapped, fn)
                  ReduceStepPipe(ReduceStep.setInput(rs, mappedF))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(FilterKeys(cg, fn))
                case kvPipe =>
                  TypedPipe.FilterKeys(kvPipe, fn)
              }
          )
        }
        go(fk)
      case mg @ MapGroup(_, _) =>
        def go[V1, V2 <: V](mg: MapGroup[K, V1, V2]): LiteralPipe[(K, V)] = {
          val fn = mg.fn
          Unary[TypedPipe, (K, V1), (K, V)](
            handleCoGrouped(mg.on, recurse),
            (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  ReduceStepPipe(ReduceStep.mapGroup(rs)(fn))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(MapGroup(cg, fn))
                case kvPipe =>
                  ReduceStepPipe(
                    IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
                      .mapGroup(fn)
                  )
              }
          )
        }
        go(mg)
      case step @ IdentityReduce(_, _, _, _, _) =>
        widen(handleReduceStep(step, recurse))
      case step @ UnsortedIdentityReduce(_, _, _, _, _) =>
        widen(handleReduceStep(step, recurse))
      case step @ IteratorMappedReduce(_, _, _, _, _) =>
        widen(handleReduceStep(step, recurse))
    }
  }