in scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala [37:94]
/**
 * Co-groups `a` and `b` by key with Beam's `CoGroupByKey` and hands every key, together with
 * the values grouped from each side, to `fn`, which emits results through the supplied
 * `OutputReceiver`.
 *
 * Before building the transform, the method checks whether either input is already flagged as
 * the result of a grouping operation (`state.postGbkOp`). If so, the `chainedCogroups` setting
 * of [[ScioOptions]] decides what happens: `OFF` does nothing, `WARNING` logs a hint and
 * `ERROR` aborts with a `RuntimeException`.
 *
 * The returned collection is itself flagged with `postGbkOp = true`.
 *
 * @param name name given to the resulting transform; also used as the suffix of the
 *             `CoGroupByKey@...` step name
 * @param a    left-hand keyed collection; its key coder and context are used for the result
 * @param b    right-hand keyed collection
 * @param fn   callback invoked once per co-grouped element with the key, all values of `a`,
 *             all values of `b` and the receiver to write output to
 * @tparam KEY key type shared by both inputs
 * @tparam A   value type of `a`
 * @tparam B   value type of `b`
 * @tparam A1  left component of the emitted value pair
 * @tparam B1  right component of the emitted value pair
 * @return a collection holding everything `fn` emitted
 * @throws RuntimeException if a chained grouping is detected and the check is set to `ERROR`
 */
private def cogroupImpl[KEY, A, B, A1: Coder, B1: Coder](
  name: String,
  a: SCollection[(KEY, A)],
  b: SCollection[(KEY, B)]
)(
  fn: (
    KEY,
    JIterable[A],
    JIterable[B],
    OutputReceiver[(KEY, (A1, B1))]
  ) => Unit
): SCollection[(KEY, (A1, B1))] = {
  if (a.state.postGbkOp || b.state.postGbkOp) {
    // The closing line carries a margin as well, so the message text does not depend on how
    // far the terminating quotes happen to be indented.
    val msg =
      """
        |Chained grouping/join detected. Use a combined operation instead to reduce shuffle.
        |
        |For example:
        |a.cogroup(b).cogroup(c) => MultiJoin.cogroup(a, b, c)
        |a.join(b).join(c) => MultiJoin(a, b, c)
        |a.leftOuterJoin(b).leftOuterJoin(c) => MultiJoin.left(a, b, c)
        |a.groupByKey.join(b) => a.join(b)
        |""".stripMargin
    a.context.optionsAs[ScioOptions].getChainedCogroups match {
      case CheckEnabled.OFF     => ()
      case CheckEnabled.WARNING => log.warn(msg)
      case CheckEnabled.ERROR   => throw new RuntimeException(msg)
    }
  }

  // One tag per side so the values can be pulled out of the CoGbkResult again below.
  val (tagA, tagB) = (new TupleTag[A](), new TupleTag[B]())
  val keyed = KeyedPCollectionTuple
    .of(tagA, a.toKV.internal)
    .and(tagB, b.toKV.internal)
    .apply(s"CoGroupByKey@$name", CoGroupByKey.create())

  // NOTE(review): presumably needed in implicit scope to derive the coder of the output
  // tuple -- confirm before removing.
  implicit val kCoder: Coder[KEY] = a.keyCoder

  type DF = DoFn[KV[KEY, CoGbkResult], (KEY, (A1, B1))]
  a.context
    .wrap(keyed)
    .withName(name)
    .applyTransform(ParDo.of(new DF {
      @ProcessElement
      private[util] def processElement(
        @Element element: KV[KEY, CoGbkResult],
        out: OutputReceiver[(KEY, (A1, B1))]
      ): Unit = {
        val key = element.getKey
        val result = element.getValue
        val as = result.getAll(tagA)
        val bs = result.getAll(tagB)
        fn(key, as, bs, out)
      }
    }))
    // Flag the output so that a subsequent grouping/join on it triggers the check above.
    .withState(_.copy(postGbkOp = true))
}