def handleCoGrouped[K, V]()

in scalding-base/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala [147:253]


    /**
     * Rebuilds a co-groupable computation inside `mat.TP` by structural recursion on `cg`.
     *
     * Each wrapper node (`Pair`, `WithReducers`, `WithDescription`, `FilterKeys`, `MapGroup`) first
     * handles the node(s) it wraps via a recursive call, then re-applies the wrapper to whatever
     * pipe comes back, choosing how to re-apply it based on the shape of that pipe
     * (`ReduceStepPipe`, `CoGroupedPipe`, or any other key-value pipe). The leaf reduce steps
     * (`IdentityReduce`, `UnsortedIdentityReduce`, `IteratorMappedReduce`) are delegated to
     * `handleReduceStep`.
     *
     * NOTE(review): `mat`, `widen` and `handleReduceStep` are defined outside this method and are
     * not visible here; their exact semantics should be confirmed against the enclosing scope.
     *
     * @param cg
     *   the co-groupable node to process; its `keyOrdering` is also used whenever a plain key-value
     *   pipe has to be wrapped in an `IdentityReduce`
     * @param recurse
     *   passed through unchanged to every recursive call and to `handleReduceStep`; it is never
     *   applied directly in this method
     * @return
     *   a `mat.TP` holding the rebuilt key-value pipe
     * @tparam K
     *   key type
     * @tparam V
     *   value type
     */
    def handleCoGrouped[K, V](
        cg: CoGroupable[K, V],
        recurse: FunctionK[TypedPipe, mat.TP]
    ): mat.TP[(K, V)] = {
      import CoGrouped._
      import TypedPipe._

      // Turns a key-value pipe back into a CoGroupable so it can be used as one side of a Pair.
      // Note: inside the first two cases the pattern variable `cg` shadows the method parameter;
      // in the last case `cg` is the method parameter again.
      def pipeToCG[V1](t: TypedPipe[(K, V1)]): CoGroupable[K, V1] =
        t match {
          // The type arguments are erased at runtime, hence @unchecked: only the CoGroupable class
          // itself is actually tested here. A ReduceStepPipe whose step is not a CoGroupable falls
          // through to the last case.
          case ReduceStepPipe(cg: CoGroupable[K @unchecked, V1 @unchecked]) =>
            // we are relying on the fact that we use Ordering[K]
            // as a contravariant type, despite it not being defined
            // that way.
            cg
          case CoGroupedPipe(cg) =>
            // we are relying on the fact that we use Ordering[K]
            // as a contravariant type, despite it not being defined
            // that way.
            cg.asInstanceOf[CoGroupable[K, V1]]
          // Any other pipe is wrapped in an IdentityReduce keyed by the outer cg's ordering.
          // NOTE(review): the meaning of the `None`, `Nil` and `implicitly` arguments depends on
          // IdentityReduce's definition, which is not visible here — confirm.
          case kvPipe => IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
        }

      // Each wrapper case below delegates to a local generic `go`, which gives names to the type
      // parameters of the matched node (presumably because the pattern match alone does not expose
      // them in a usable form — see the GADT remark further down).
      cg match {
        case p @ Pair(_, _, _) =>
          def go[A, B, C](pair: Pair[K, A, B, C]): mat.TP[(K, C)] = {
            // Handle both sides independently, combine them with mat.zip, then rebuild the Pair
            // from the two resulting pipes with the original joining function.
            val mleft = handleCoGrouped(pair.larger, recurse)
            val mright = handleCoGrouped(pair.smaller, recurse)
            val both = mat.zip(mleft, mright)
            mat.map(both) { case (l, r) =>
              CoGroupedPipe(Pair(pipeToCG(l), pipeToCG(r), pair.fn))
            }
          }
          widen(go(p))
        case wr @ WithReducers(_, _) =>
          def go[V1 <: V](wr: WithReducers[K, V1]): mat.TP[(K, V)] = {
            val reds = wr.reds
            // Re-apply the reducer setting to whatever the wrapped node was turned into.
            mat.map(handleCoGrouped(wr.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  ReduceStepPipe(ReduceStep.withReducers(rs, reds))
                case CoGroupedPipe(cg) =>
                  // `cg` here is the pattern variable, not the method parameter.
                  CoGroupedPipe(WithReducers(cg, reds))
                case kvPipe =>
                  // A plain pipe has no reduce step to configure, so one is introduced via
                  // IdentityReduce (using the outer cg's key ordering) to carry the setting.
                  ReduceStepPipe(
                    IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
                      .withReducers(reds)
                  )
              }
            }
          }
          go(wr)
        case wd @ WithDescription(_, _) =>
          def go[V1 <: V](wd: WithDescription[K, V1]): mat.TP[(K, V)] = {
            val desc = wd.description
            // Re-attach the description to whatever the wrapped node was turned into.
            mat.map(handleCoGrouped(wd.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  ReduceStepPipe(ReduceStep.withDescription(rs, desc))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(WithDescription(cg, desc))
                case kvPipe =>
                  // Unlike the WithReducers case, no reduce step is introduced here: the
                  // description is put directly on the pipe.
                  kvPipe.withDescription(desc)
              }
            }
          }
          go(wd)
        case fk @ CoGrouped.FilterKeys(_, _) =>
          def go[V1 <: V](fk: CoGrouped.FilterKeys[K, V1]): mat.TP[(K, V)] = {
            val fn = fk.fn
            // Re-apply the key filter to whatever the wrapped node was turned into.
            mat.map(handleCoGrouped(fk.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  // Filter the reduce step's input (`rs.mapped`) rather than its output, then
                  // swap the filtered pipe in as the step's new input.
                  val mapped = rs.mapped
                  val mappedF = TypedPipe.FilterKeys(mapped, fn)
                  ReduceStepPipe(ReduceStep.setInput(rs, mappedF))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(CoGrouped.FilterKeys(cg, fn))
                case kvPipe =>
                  // A plain pipe is filtered directly with the TypedPipe-level FilterKeys node.
                  TypedPipe.FilterKeys(kvPipe, fn)
              }
            }
          }
          go(fk)
        case mg @ MapGroup(_, _) =>
          def go[V1, V2 <: V](mg: MapGroup[K, V1, V2]): mat.TP[(K, V)] = {
            val fn = mg.fn
            // Re-apply the per-group mapping to whatever the wrapped node was turned into.
            mat.map(handleCoGrouped(mg.on, recurse)) { (tp: TypedPipe[(K, V1)]) =>
              tp match {
                case ReduceStepPipe(rs) =>
                  ReduceStepPipe(ReduceStep.mapGroup(rs)(fn))
                case CoGroupedPipe(cg) =>
                  CoGroupedPipe(MapGroup(cg, fn))
                case kvPipe =>
                  // A plain pipe is first wrapped in an IdentityReduce (using the outer cg's key
                  // ordering) so that the group mapping has a reduce step to attach to.
                  val rs = IdentityReduce[K, V1, V1](cg.keyOrdering, kvPipe, None, Nil, implicitly)
                  ReduceStepPipe(ReduceStep.mapGroup(rs)(fn))
              }
            }
          }
          go(mg)
        // The remaining cases are reduce steps themselves; they are handed to handleReduceStep.
        case step @ IdentityReduce(_, _, _, _, _) =>
          widen(handleReduceStep(step, recurse)) // the widen trick sidesteps GADT bugs
        case step @ UnsortedIdentityReduce(_, _, _, _, _) =>
          widen(handleReduceStep(step, recurse))
        case step @ IteratorMappedReduce(_, _, _, _, _) =>
          widen(handleReduceStep(step, recurse))
      }
    }