private def planReduceStep[K, V1, V2]()

in scalding-core/src/main/scala/com/twitter/scalding/typed/cascading_backend/CascadingBackend.scala [813:929]


  /**
   * Plans a single [[ReduceStep]] as a [[CascadingPipe]].
   *
   * The input of the step (`rs.mapped`) is planned first via `rec`; the concrete kind of reduce step then
   * decides what is built on top of it:
   *
   *  - `IdentityReduce` / `UnsortedIdentityReduce` without an explicit reducer count: no grouping is
   *    planned, only the descriptions are attached to the already planned pipe.
   *  - `IdentityReduce` / `UnsortedIdentityReduce` with an explicit reducer count: a `groupBy` that only
   *    sets the reducer count and the descriptions.
   *  - `IdentityValueSortedReduce`: a `groupBy` with a sort on the value field.
   *  - `ValueSortedReduce`: a `groupBy` with a sort on the value field, followed by a `TypedBufferOp`
   *    running `reduceFn`.
   *  - `IteratorMappedReduce`: a `groupBy` followed by a `TypedBufferOp` running `reduceFn`.
   *
   * @param rs  the reduce step to plan
   * @param rec planner applied to the upstream `TypedPipe` (`rs.mapped`)
   * @return the planned pipe of `(K, V2)` pairs
   */
  private def planReduceStep[K, V1, V2](
      rs: ReduceStep[K, V1, V2],
      rec: FunctionK[TypedPipe, CascadingPipe]
  ): CascadingPipe[(K, V2)] = {

    // Plan the input of the reduce step; every branch below builds on this pipe.
    val mapped = rec(rs.mapped)

    // Grouping without any sort on the values.
    def groupOp(gb: GroupBuilder => GroupBuilder): CascadingPipe[_ <: (K, V2)] =
      groupOpWithValueSort(None)(gb)

    // Groups `mapped` by key, optionally sorting the values of each group with `valueSort`,
    // and lets `gb` configure the resulting GroupBuilder (reducers, descriptions, operations).
    def groupOpWithValueSort(
        valueSort: Option[Ordering[V1]]
    )(gb: GroupBuilder => GroupBuilder): CascadingPipe[_ <: (K, V2)] = {
      // The grouped pipe gets its own FlowDef, which is returned as part of the CascadingPipe below.
      val flowDef = new FlowDef
      // NOTE(review): maybeBox presumably boxes the key when rs.keyOrdering requires it and hands us
      // the matching tuple setter and grouping fields -- confirm against its definition.
      val pipe = maybeBox[K, V1](rs.keyOrdering, flowDef) { (tupleSetter, fields) =>
        // sortOpt: the field to sort values by, if any; ts: the setter used to write (key, value) tuples.
        val (sortOpt, ts) = valueSort
          .map {
            case ordser: OrderedSerialization[V1 @unchecked] =>
              // We get in here when we do a secondary sort
              // and that sort is an ordered serialization
              // We now need a boxed serializer for this type
              // Then we set the comparator on the field, and finally we box the value with our tupleSetter
              val (boxfn, boxordSer) = getBoxFnAndOrder[V1](ordser, flowDef)
              val valueF = new Fields("value")
              valueF.setComparator("value", new CascadingBinaryComparator(boxordSer))
              // The cast reflects that the tuples written from here on carry Boxed[V1] values;
              // contraMap boxes each incoming value before it reaches the setter.
              val ts2 = tupleSetter.asInstanceOf[TupleSetter[(K, Boxed[V1])]].contraMap { kv1: (K, V1) =>
                (kv1._1, boxfn(kv1._2))
              }
              (Some(valueF), ts2)
            case vs =>
              // Any other ordering: values are written unboxed with the unchanged setter.
              // NOTE(review): Field.singleOrdered presumably attaches `vs` as the comparator of "value".
              val vord = Field.singleOrdered("value")(vs)
              (Some(vord), tupleSetter)
          }
          // No value sort requested: no sort field, and the setter is used as is.
          .getOrElse((None, tupleSetter))

        val p = mapped.toPipe(kvFields, flowDef, TupleSetter.asSubSetter(ts))

        RichPipe(p).groupBy(fields) { inGb =>
          // Apply the value sort only when one was requested, then the caller's configuration.
          val withSort = sortOpt.fold(inGb)(inGb.sortBy)
          gb(withSort)
        }
      }

      val tupConv = tuple2Conv[K, V2](rs.keyOrdering)
      CascadingPipe(pipe, kvFields, flowDef, tupConv)
    }

    rs match {
      case ir @ IdentityReduce(_, _, None, descriptions, _) =>
        type CP[V] = CascadingPipe[_ <: (K, V)]
        // No reducer count was given, so no grouping is planned: only the descriptions are attached.
        // NOTE(review): `evidence` presumably witnesses V1 == V2, which lets subst re-type the pipe.
        ir.evidence.subst[CP](mapped.copy(pipe = RichPipe.setPipeDescriptions(mapped.pipe, descriptions)))
      case uir @ UnsortedIdentityReduce(_, _, None, descriptions, _) =>
        type CP[V] = CascadingPipe[_ <: (K, V)]
        // Same as the IdentityReduce case above: no grouping, only the descriptions are attached.
        uir.evidence.subst[CP](mapped.copy(pipe = RichPipe.setPipeDescriptions(mapped.pipe, descriptions)))
      case IdentityReduce(_, _, Some(reds), descriptions, _) =>
        // An explicit reducer count was given: plan a groupBy that only sets reducers and descriptions.
        groupOp(_.reducers(reds).setDescriptions(descriptions))
      case UnsortedIdentityReduce(_, _, Some(reds), descriptions, _) =>
        // This is weird, but it is sometimes used to force a partition
        groupOp(_.reducers(reds).setDescriptions(descriptions))
      case ivsr @ IdentityValueSortedReduce(_, _, _, _, _, _) =>
        groupOpWithValueSort(Some(ivsr.valueSort)) { gb =>
          // If it's an ordered serialization, groupOpWithValueSort boxed the values for the sort,
          // so we need to unbox them again after grouping.
          val mappedGB =
            if (ivsr.valueSort.isInstanceOf[OrderedSerialization[_]])
              gb.mapStream[Boxed[V1], V1](valueField -> valueField) { it: Iterator[Boxed[V1]] =>
                it.map(_.get)
              }
            else
              gb

          // NOTE(review): -1 is presumably the "not set" value for GroupBuilder.reducers -- confirm.
          mappedGB
            .reducers(ivsr.reducers.getOrElse(-1))
            .setDescriptions(ivsr.descriptions)
        }
      case vsr @ ValueSortedReduce(_, _, _, _, _, _) =>
        val optVOrdering = Some(vsr.valueSort)
        groupOpWithValueSort(optVOrdering) {
          // If it's an ordered serialization we need to unbox
          // the value before handing it to the user's operation.
          // NOTE(review): valueConverter(optVOrdering) is presumably what performs that unboxing -- confirm.
          _.every(
            new cascading.pipe.Every(
              _,
              valueField,
              new TypedBufferOp[K, V1, V2](
                keyConverter(vsr.keyOrdering),
                valueConverter(optVOrdering),
                vsr.reduceFn,
                valueField
              ),
              Fields.REPLACE
            )
          )
            .reducers(vsr.reducers.getOrElse(-1))
            .setDescriptions(vsr.descriptions)
        }
      case imr @ IteratorMappedReduce(_, _, _, _, _) =>
        // No value sort here, so values are never boxed and a plain single-value converter is used.
        groupOp {
          _.every(
            new cascading.pipe.Every(
              _,
              valueField,
              new TypedBufferOp(
                keyConverter(imr.keyOrdering),
                TupleConverter.singleConverter[V1],
                imr.reduceFn,
                valueField
              ),
              Fields.REPLACE
            )
          )
            .reducers(imr.reducers.getOrElse(-1))
            .setDescriptions(imr.descriptions)
        }
    }
  }