def sortMergeTransform[K1: Coder, K2: Coder, A: Coder, B: Coder]()

in scio-smb/src/main/scala/com/spotify/scio/smb/syntax/SortMergeBucketScioContextSyntax.scala [645:718]


  /**
   * Secondary keyed variant for two inputs that takes no explicit `targetParallelism`.
   *
   * Delegates to the overload that accepts a `targetParallelism`, passing
   * `TargetParallelism.auto()`.
   *
   * @param keyClass
   *   class of the primary key `K1`
   * @param keyClassSecondary
   *   class of the secondary key `K2`
   * @param a
   *   first sorted-bucket read
   * @param b
   *   second sorted-bucket read
   */
  def sortMergeTransform[K1: Coder, K2: Coder, A: Coder, B: Coder](
    keyClass: Class[K1],
    keyClassSecondary: Class[K2],
    a: SortedBucketIO.Read[A],
    b: SortedBucketIO.Read[B]
  ): SortMergeTransform.ReadBuilder[KV[K1, K2], K1, K2, (Iterable[A], Iterable[B])] =
    // The parallelism must be supplied explicitly: a call with only these four arguments
    // resolves back to this very overload and would recurse without ever terminating.
    sortMergeTransform(keyClass, keyClassSecondary, a, b, TargetParallelism.auto())

  /**
   * Three-way [[SortedBucketScioContext.sortMergeCoGroup]] that is followed directly by a
   * transformation of the merged cogroups, whose output is re-written using the same bucketing key
   * and hashing scheme. Because the read, transform, and write all happen inside one transform, an
   * extra shuffle step can be avoided.
   *
   * The work is forwarded to `SMBMultiJoin(self).sortMergeTransform` with the same arguments.
   *
   * @param keyClass
   *   class of the key `K`
   * @param a
   *   first sorted-bucket read
   * @param b
   *   second sorted-bucket read
   * @param c
   *   third sorted-bucket read
   * @param targetParallelism
   *   parallelism setting passed through unchanged
   *
   * @group cogroup
   */
  @experimental
  def sortMergeTransform[K: Coder, A: Coder, B: Coder, C: Coder](
    keyClass: Class[K],
    a: SortedBucketIO.Read[A],
    b: SortedBucketIO.Read[B],
    c: SortedBucketIO.Read[C],
    targetParallelism: TargetParallelism
  ): SortMergeTransform.ReadBuilder[K, K, Void, (Iterable[A], Iterable[B], Iterable[C])] = {
    val multiJoin = SMBMultiJoin(self)
    multiJoin.sortMergeTransform(keyClass, a, b, c, targetParallelism)
  }

  /**
   * Three-way variant without an explicit `targetParallelism`, which defaults to
   * `TargetParallelism.auto()`.
   *
   * The work is forwarded to `SMBMultiJoin(self).sortMergeTransform` with the same arguments.
   *
   * @param keyClass
   *   class of the key `K`
   * @param a
   *   first sorted-bucket read
   * @param b
   *   second sorted-bucket read
   * @param c
   *   third sorted-bucket read
   */
  @experimental
  def sortMergeTransform[K: Coder, A: Coder, B: Coder, C: Coder](
    keyClass: Class[K],
    a: SortedBucketIO.Read[A],
    b: SortedBucketIO.Read[B],
    c: SortedBucketIO.Read[C]
  ): SortMergeTransform.ReadBuilder[K, K, Void, (Iterable[A], Iterable[B], Iterable[C])] = {
    val multiJoin = SMBMultiJoin(self)
    multiJoin.sortMergeTransform(keyClass, a, b, c)
  }

  /**
   * Secondary keyed variant of the three-way transform.
   *
   * The body runs inside `self.requireNotClosed`. When `self.isTest` is true the cogroup comes from
   * `SMBMultiJoin(self).testCoGroup` over all three reads and is wrapped in a
   * `SortMergeTransform.ReadBuilderTest`; otherwise a `SortedBucketIO.read(keyClass,
   * keyClassSecondary)` transform over `a`, `b` and `c` is wrapped in a
   * `SortMergeTransform.ReadBuilderImpl`.
   *
   * @param keyClass
   *   class of the primary key `K1`
   * @param keyClassSecondary
   *   class of the secondary key `K2`
   * @param a
   *   first sorted-bucket read
   * @param b
   *   second sorted-bucket read
   * @param c
   *   third sorted-bucket read
   * @param targetParallelism
   *   parallelism setting applied to the read transform on the non-test path
   */
  @experimental
  def sortMergeTransform[K1: Coder, K2: Coder, A: Coder, B: Coder, C: Coder](
    keyClass: Class[K1],
    keyClassSecondary: Class[K2],
    a: SortedBucketIO.Read[A],
    b: SortedBucketIO.Read[B],
    c: SortedBucketIO.Read[C],
    targetParallelism: TargetParallelism
  ): SortMergeTransform.ReadBuilder[KV[K1, K2], K1, K2, (Iterable[A], Iterable[B], Iterable[C])] =
    self.requireNotClosed {
      val tupleTagA = a.getTupleTag
      val tupleTagB = b.getTupleTag
      val tupleTagC = c.getTupleTag
      // Splits one cogroup result into the per-source value groups, in (a, b, c) order.
      val fromResult = { (result: CoGbkResult) =>
        (
          result.getAll(tupleTagA).asScala,
          result.getAll(tupleTagB).asScala,
          result.getAll(tupleTagC).asScala
        )
      }
      if (self.isTest) {
        // All three reads must take part in the test cogroup: `fromResult` looks up
        // `tupleTagC`, so leaving `c` out would make the test path diverge from the real one.
        val result = SMBMultiJoin(self).testCoGroup[(K1, K2)](a, b, c)
        val keyed = result.map { kv =>
          // Test keys arrive as a Scala tuple; re-pack them as the KV the builder expects.
          val (k1, k2) = kv.getKey
          KV.of(k1, k2) -> fromResult(kv.getValue)
        }
        new SortMergeTransform.ReadBuilderTest(self, keyed)
      } else {
        val transform = SortedBucketIO
          .read(keyClass, keyClassSecondary)
          .of(a)
          .and(b)
          .and(c)
          .withTargetParallelism(targetParallelism)
        new SortMergeTransform.ReadBuilderImpl(self, transform, fromResult)
      }
    }