private def cogroupImpl[KEY, A, B, A1: Coder, B1: Coder]()

in scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala [37:94]


  /**
   * Shared two-way co-group implementation. It co-groups `a` and `b` by key, then passes
   * each key's grouped values to `fn`, which decides what (if anything) to emit.
   *
   * If either input is already flagged as the output of a grouping (`state.postGbkOp`),
   * the chained-grouping check selected by `ScioOptions.getChainedCogroups` runs first:
   * `OFF` does nothing, `WARNING` logs a hint, and `ERROR` throws.
   *
   * @param name transform name. The CoGroupByKey is applied as `CoGroupByKey@<name>`, and
   *             the follow-up ParDo is applied after `withName(name)`.
   * @param a    left keyed input
   * @param b    right keyed input
   * @param fn   called once per co-grouped element with the key, the left values
   *             (`CoGbkResult.getAll` for `a`'s tag), the right values (`getAll` for
   *             `b`'s tag), and the receiver used to emit output records
   * @tparam A1  left value type of the output records
   * @tparam B1  right value type of the output records
   * @return everything `fn` emitted. The result is flagged `postGbkOp = true`, so passing
   *         it into another co-group here triggers the chained-grouping check.
   * @throws RuntimeException if a chained grouping is detected and the option is `ERROR`
   */
  private def cogroupImpl[KEY, A, B, A1: Coder, B1: Coder](
    name: String,
    a: SCollection[(KEY, A)],
    b: SCollection[(KEY, B)]
  )(
    fn: (
      KEY,
      JIterable[A],
      JIterable[B],
      OutputReceiver[(KEY, (A1, B1))]
    ) => Unit
  ): SCollection[(KEY, (A1, B1))] = {
    // Grouping an already-grouped collection adds another shuffle; point users at the
    // combined operations instead.
    if (a.state.postGbkOp || b.state.postGbkOp) {
      val msg =
        """
          |Chained grouping/join detected. Use a combined operation instead to reduce shuffle.
          |
          |For example:
          |a.cogroup(b).cogroup(c) => MultiJoin.cogroup(a, b, c)
          |a.join(b).join(c) => MultiJoin(a, b, c)
          |a.leftOuterJoin(b).leftOuterJoin(c) => MultiJoin.left(a, b, c)
          |a.groupByKey.join(b) => a.join(b)
        """.stripMargin
      a.context.optionsAs[ScioOptions].getChainedCogroups match {
        case CheckEnabled.OFF =>
        case CheckEnabled.WARNING =>
          log.warn(msg)
        case CheckEnabled.ERROR =>
          throw new RuntimeException(msg)
      }
    }

    // One tag per input, so each side's values can be pulled back out of the CoGbkResult.
    val (tagA, tagB) = (new TupleTag[A](), new TupleTag[B]())
    val keyed = KeyedPCollectionTuple
      .of(tagA, a.toKV.internal)
      .and(tagB, b.toKV.internal)
      .apply(s"CoGroupByKey@$name", CoGroupByKey.create())

    // The key coder comes from `a`. It is brought into implicit scope, presumably so the
    // output coder for (KEY, (A1, B1)) can be derived below.
    implicit val kCoder: Coder[KEY] = a.keyCoder

    type DF = DoFn[KV[KEY, CoGbkResult], (KEY, (A1, B1))]
    a.context
      .wrap(keyed)
      .withName(name)
      .applyTransform(ParDo.of(new DF {
        // Invoked by Beam through the @ProcessElement annotation; splits the co-grouped
        // result back into per-side iterables and delegates to `fn`.
        @ProcessElement
        private[util] def processElement(
          @Element element: KV[KEY, CoGbkResult],
          out: OutputReceiver[(KEY, (A1, B1))]
        ): Unit = {
          val key = element.getKey
          val result = element.getValue
          val as = result.getAll(tagA)
          val bs = result.getAll(tagB)
          fn(key, as, bs, out)
        }
      }))
      .withState(_.copy(postGbkOp = true))
  }