def applyPerKeyDoFn[U: Coder]()

in scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala [84:265]


  /**
   * Apply a per-key [[DoFn]] to this keyed SCollection.
   *
   * The function is wrapped in a `ParDo` and run through `applyPerKey`. Each Beam `KV` output is
   * converted back into a Scala `(K, U)` tuple with `kvToTuple`.
   */
  def applyPerKeyDoFn[U: Coder](t: DoFn[KV[K, V], KV[K, U]]): SCollection[(K, U)] = {
    val parDo = ParDo.of(t)
    applyPerKey(parDo)(kvToTuple)
  }

  /**
   * Wrap this keyed SCollection in an [[SCollectionWithHotKeyFanout]].
   *
   * The wrapper partially combines the values of "hot" keys on intermediate nodes before the full
   * combine.
   * @param hotKeyFanout
   *   per-key fanout function. A key mapped to N is spread across N intermediate nodes for partial
   *   combining. Keys mapped to N less than or equal to 1 bypass the intermediate node.
   */
  def withHotKeyFanout(hotKeyFanout: K => Int): SCollectionWithHotKeyFanout[K, V] = {
    val fanout: Either[K => Int, Int] = Left(hotKeyFanout)
    new SCollectionWithHotKeyFanout(this, fanout)
  }

  /**
   * Wrap this keyed SCollection in an [[SCollectionWithHotKeyFanout]] that uses one fanout value
   * for every key.
   *
   * The wrapper partially combines "hot" keys on intermediate nodes before the full combine.
   * @param hotKeyFanout
   *   fanout applied uniformly to all keys
   */
  def withHotKeyFanout(hotKeyFanout: Int): SCollectionWithHotKeyFanout[K, V] = {
    val fanout: Either[K => Int, Int] = Right(hotKeyFanout)
    new SCollectionWithHotKeyFanout(this, fanout)
  }

  // =======================================================================
  // CoGroups
  // =======================================================================

  /**
   * Group `this` and `rhs` by key.
   *
   * For every key k present in either collection, emit a single element. It pairs all values of k
   * from `this` with all values of k from `rhs`.
   * @group cogroup
   */
  def cogroup[W](rhs: SCollection[(K, W)]): SCollection[(K, (Iterable[V], Iterable[W]))] = {
    val transformName = self.tfName
    ArtisanJoin.cogroup(transformName, self, rhs)
  }

  /**
   * Group `this`, `rhs1` and `rhs2` by key.
   *
   * For every key k present in any of the three collections, emit a single element. It holds the
   * values of k from `this`, `rhs1` and `rhs2`, in that order.
   * @group cogroup
   */
  def cogroup[W1, W2](
    rhs1: SCollection[(K, W1)],
    rhs2: SCollection[(K, W2)]
  ): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
    val joiner = MultiJoin.withName(self.tfName)
    joiner.cogroup(self, rhs1, rhs2)
  }

  /**
   * Group `this`, `rhs1`, `rhs2` and `rhs3` by key.
   *
   * For every key k present in any of the four collections, emit a single element. It holds the
   * values of k from `this`, `rhs1`, `rhs2` and `rhs3`, in that order.
   * @group cogroup
   */
  def cogroup[W1, W2, W3](
    rhs1: SCollection[(K, W1)],
    rhs2: SCollection[(K, W2)],
    rhs3: SCollection[(K, W3)]
  ): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
    val joiner = MultiJoin.withName(self.tfName)
    joiner.cogroup(self, rhs1, rhs2, rhs3)
  }

  /**
   * Synonym for the two-way `cogroup`. It delegates to `cogroup(rhs)` unchanged.
   * @group cogroup
   */
  def groupWith[W](rhs: SCollection[(K, W)]): SCollection[(K, (Iterable[V], Iterable[W]))] =
    cogroup(rhs)

  /**
   * Synonym for the three-way `cogroup`. It delegates to `cogroup(rhs1, rhs2)` unchanged.
   * @group cogroup
   */
  def groupWith[W1, W2](
    rhs1: SCollection[(K, W1)],
    rhs2: SCollection[(K, W2)]
  ): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] =
    cogroup(rhs1, rhs2)

  /**
   * Synonym for the four-way `cogroup`. It delegates to `cogroup(rhs1, rhs2, rhs3)` unchanged.
   * @group cogroup
   */
  def groupWith[W1, W2, W3](
    rhs1: SCollection[(K, W1)],
    rhs2: SCollection[(K, W2)],
    rhs3: SCollection[(K, W3)]
  ): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] =
    cogroup(rhs1, rhs2, rhs3)

  /**
   * Partition this SCollection into `numPartitions` partitions by hashing each element's key.
   *
   * The key is hashed with `ScioUtil.consistentHashCode` and mapped to a partition index with
   * `Math.floorMod`. For positive `numPartitions`, the index therefore stays in
   * `[0, numPartitions)` even when the hash code is negative. `K` should provide a hash code that
   * is consistent across different JVMs; otherwise the same key may be routed to different
   * partitions on different workers.
   *
   * @param numPartitions
   *   number of output partitions
   * @return
   *   partitioned SCollections in a `Seq`
   * @group collection
   */
  def hashPartitionByKey(numPartitions: Int): Seq[SCollection[(K, V)]] =
    self.partition(
      numPartitions,
      elem => Math.floorMod(ScioUtil.consistentHashCode(elem._1), numPartitions)
    )

  // =======================================================================
  // Joins
  // =======================================================================

  /**
   * Full outer join of `this` and `rhs`.
   *
   * For a key k present in both collections, the result contains (k, (Some(v), Some(w))) for
   * every pairing of a value v from `this` with a value w from `rhs`. A key found only in `this`
   * yields (k, (Some(v), None)). A key found only in `rhs` yields (k, (None, Some(w))).
   * @group join
   */
  def fullOuterJoin[W](rhs: SCollection[(K, W)]): SCollection[(K, (Option[V], Option[W]))] = {
    val transformName = self.tfName
    ArtisanJoin.outer(transformName, self, rhs)
  }

  /**
   * Inner join of `this` and `rhs` on key.
   *
   * For every key k present in both collections, emit (k, (v1, v2)) for each pairing of a value v1
   * from `this` with a value v2 from `rhs`.
   * @group join
   */
  def join[W](rhs: SCollection[(K, W)]): SCollection[(K, (V, W))] = {
    val transformName = self.tfName
    ArtisanJoin(transformName, self, rhs)
  }

  /**
   * Left outer join of `this` and `rhs`.
   *
   * Every element (k, v) of `this` appears in the result. It is emitted as (k, (v, Some(w))) for
   * each matching w in `rhs`, or as (k, (v, None)) when `rhs` has no element with key k.
   * @group join
   */
  def leftOuterJoin[W](rhs: SCollection[(K, W)]): SCollection[(K, (V, Option[W]))] = {
    val transformName = self.tfName
    ArtisanJoin.left(transformName, self, rhs)
  }

  /**
   * Right outer join of `this` and `rhs`.
   *
   * Every element (k, w) of `rhs` appears in the result. It is emitted as (k, (Some(v), w)) for
   * each matching v in `this`, or as (k, (None, w)) when `this` has no element with key k.
   * @group join
   */
  def rightOuterJoin[W](rhs: SCollection[(K, W)]): SCollection[(K, (Option[V], W))] = {
    val transformName = self.tfName
    ArtisanJoin.right(transformName, self, rhs)
  }

  /**
   * Full outer join for cases when the left collection (`this`) is much larger than the right
   * collection (`rhs`), `rhs` cannot fit in memory, and most keys of `rhs` also appear in the left
   * collection. In other words, the intersection of keys is sparse relative to the left
   * collection.
   *
   * A Bloom filter of keys from the right collection (`rhs`) is used to split `this` into two
   * partitions. Only elements whose keys are in the filter go through the join; the rest are
   * concatenated to the result directly. This is useful for joining historical aggregates with
   * incremental updates.
   *
   * Import `magnolify.guava.auto._` to get common instances of Guava
   * [[com.google.common.hash.Funnel Funnel]] s.
   *
   * Read more about Bloom Filter: [[com.google.common.hash.BloomFilter]].
   *
   * @group join
   * @param rhsNumKeys
   *   An estimate of the number of keys in the right collection `rhs`. Scio uses this estimate to
   *   choose the size and number of Bloom filters that split the left collection (`this`) into
   *   overlapping and non-overlapping keys in a "map" step before an exact join. A value close to
   *   the actual number reduces false positives in intermediate steps, which means less shuffle.
   * @param fpProb
   *   A fraction in range (0, 1) that is the accepted false positive probability when computing
   *   the overlap. Note: having fpProb = 0 doesn't mean that Scio would calculate an exact
   *   overlap.
   */
  def sparseFullOuterJoin[W](
    rhs: SCollection[(K, W)],
    rhsNumKeys: Long,
    fpProb: Double = 0.01
  )(implicit funnel: Funnel[K]): SCollection[(K, (Option[V], Option[W]))] = self.transform { me =>
    // A Coder[W] must be in implicit scope to encode the (K, (Option[V], Option[W])) elements
    // built from the non-overlapping partition below.
    implicit val wCoder: Coder[W] = rhs.valueCoder
    SCollection.unionAll(
      split(me, rhs, rhsNumKeys, fpProb).map { case (lhsUnique, lhsOverlap, rhsPart) =>
        // `lhsUnique` holds left elements whose keys the Bloom filter rejected. They have no
        // match in `rhs`, so they bypass the join and are emitted with an empty right side.
        val unique = lhsUnique.map(kv => (kv._1, (Option(kv._2), Option.empty[W])))
        unique ++ lhsOverlap.fullOuterJoin(rhsPart)
      }
    )
  }