in scio-smb/src/main/scala/com/spotify/scio/smb/syntax/SortMergeBucketScioContextSyntax.scala [645:718]
def sortMergeTransform[K1: Coder, K2: Coder, A: Coder, B: Coder](
  keyClass: Class[K1],
  keyClassSecondary: Class[K2],
  a: SortedBucketIO.Read[A],
  b: SortedBucketIO.Read[B]
): SortMergeTransform.ReadBuilder[KV[K1, K2], K1, K2, (Iterable[A], Iterable[B])] =
  // Forward to the explicit-parallelism overload. Calling with only these four arguments
  // resolves back to this very overload and recurses until the stack overflows.
  sortMergeTransform(keyClass, keyClassSecondary, a, b, TargetParallelism.auto())
/**
 * Perform a 3-way [[SortedBucketScioContext.sortMergeCoGroup]] operation, then immediately apply
 * a transformation function to the merged cogroups and re-write using the same bucketing key and
 * hashing scheme. By performing the read, transform, and write in the same transform, an extra
 * shuffle step can be avoided.
 *
 * @param keyClass class of the bucketing key shared by all three sources
 * @param a first sorted-bucket source
 * @param b second sorted-bucket source
 * @param c third sorted-bucket source
 * @param targetParallelism target parallelism for the sorted-bucket read
 * @group cogroup
 */
@experimental
def sortMergeTransform[K: Coder, A: Coder, B: Coder, C: Coder](
  keyClass: Class[K],
  a: SortedBucketIO.Read[A],
  b: SortedBucketIO.Read[B],
  c: SortedBucketIO.Read[C],
  targetParallelism: TargetParallelism
): SortMergeTransform.ReadBuilder[K, K, Void, (Iterable[A], Iterable[B], Iterable[C])] = {
  // The multi-way join helper owns the implementation; this overload only forwards to it.
  val multiJoin = SMBMultiJoin(self)
  multiJoin.sortMergeTransform(keyClass, a, b, c, targetParallelism)
}
/**
 * 3-way primary-keyed `sortMergeTransform` without an explicit parallelism argument;
 * `targetParallelism` defaults to `TargetParallelism.auto()`.
 *
 * @param keyClass class of the bucketing key shared by all three sources
 * @param a first sorted-bucket source
 * @param b second sorted-bucket source
 * @param c third sorted-bucket source
 */
@experimental
def sortMergeTransform[K: Coder, A: Coder, B: Coder, C: Coder](
  keyClass: Class[K],
  a: SortedBucketIO.Read[A],
  b: SortedBucketIO.Read[B],
  c: SortedBucketIO.Read[C]
): SortMergeTransform.ReadBuilder[K, K, Void, (Iterable[A], Iterable[B], Iterable[C])] = {
  val multiJoin = SMBMultiJoin(self)
  multiJoin.sortMergeTransform(keyClass, a, b, c)
}
/**
 * Secondary-keyed variant of the 3-way `sortMergeTransform`. Sources are co-grouped on the
 * (primary, secondary) key pair, and the returned builder is keyed by `KV[K1, K2]`.
 *
 * @param keyClass class of the primary bucketing key
 * @param keyClassSecondary class of the secondary key
 * @param a first sorted-bucket source
 * @param b second sorted-bucket source
 * @param c third sorted-bucket source
 * @param targetParallelism target parallelism for the sorted-bucket read
 * @group cogroup
 */
@experimental
def sortMergeTransform[K1: Coder, K2: Coder, A: Coder, B: Coder, C: Coder](
  keyClass: Class[K1],
  keyClassSecondary: Class[K2],
  a: SortedBucketIO.Read[A],
  b: SortedBucketIO.Read[B],
  c: SortedBucketIO.Read[C],
  targetParallelism: TargetParallelism
): SortMergeTransform.ReadBuilder[KV[K1, K2], K1, K2, (Iterable[A], Iterable[B], Iterable[C])] =
  self.requireNotClosed {
    val tupleTagA = a.getTupleTag
    val tupleTagB = b.getTupleTag
    val tupleTagC = c.getTupleTag
    // Unpacks one co-grouped result into the per-source value collections, in (a, b, c) order.
    val fromResult = { (result: CoGbkResult) =>
      (
        result.getAll(tupleTagA).asScala,
        result.getAll(tupleTagB).asScala,
        result.getAll(tupleTagC).asScala
      )
    }
    if (self.isTest) {
      // All three reads must be co-grouped. `fromResult` reads `tupleTagC`, so leaving `c`
      // out here would drop the third source in tests.
      val result = SMBMultiJoin(self).testCoGroup[(K1, K2)](a, b, c)
      val keyed = result.map { kv =>
        val (k1, k2) = kv.getKey
        KV.of(k1, k2) -> fromResult(kv.getValue)
      }
      new SortMergeTransform.ReadBuilderTest(self, keyed)
    } else {
      val transform = SortedBucketIO
        .read(keyClass, keyClassSecondary)
        .of(a)
        .and(b)
        .and(c)
        .withTargetParallelism(targetParallelism)
      new SortMergeTransform.ReadBuilderImpl(self, transform, fromResult)
    }
  }