in scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala [84:265]
def applyPerKeyDoFn[U: Coder](t: DoFn[KV[K, V], KV[K, U]]): SCollection[(K, U)] = {
  // Wrap the DoFn in a single-output ParDo, then convert each emitted KV back into a Scala tuple.
  val perKeyParDo = ParDo.of(t)
  applyPerKey(perKeyParDo)(kvToTuple)
}
/**
 * Wrap this SCollection in an [[SCollectionWithHotKeyFanout]], which pre-combines "hot" keys on
 * an intermediate node before the full combine is performed.
 * @param hotKeyFanout
 *   maps each key to a fanout N; the key is spread over N intermediate nodes for partial
 *   combining, and a key whose N is less than or equal to 1 skips the intermediate node.
 */
def withHotKeyFanout(hotKeyFanout: K => Int): SCollectionWithHotKeyFanout[K, V] =
  new SCollectionWithHotKeyFanout[K, V](this, Left(hotKeyFanout))
/**
 * Wrap this SCollection in an [[SCollectionWithHotKeyFanout]], which pre-combines "hot" keys on
 * an intermediate node before the full combine is performed.
 * @param hotKeyFanout
 *   fanout applied uniformly to every key
 */
def withHotKeyFanout(hotKeyFanout: Int): SCollectionWithHotKeyFanout[K, V] =
  new SCollectionWithHotKeyFanout[K, V](this, Right(hotKeyFanout))
// =======================================================================
// CoGroups
// =======================================================================
/**
 * Group `this` with `rhs` by key: for every key k present in either input, emit k together with
 * all of its values from `this` and all of its values from `rhs`.
 * @group cogroup
 */
def cogroup[W](rhs: SCollection[(K, W)]): SCollection[(K, (Iterable[V], Iterable[W]))] = {
  val transformName = self.tfName
  ArtisanJoin.cogroup(transformName, self, rhs)
}
/**
 * Group `this`, `rhs1` and `rhs2` by key: for every key k present in any input, emit k together
 * with its values from each of the three inputs.
 * @group cogroup
 */
def cogroup[W1, W2](
  rhs1: SCollection[(K, W1)],
  rhs2: SCollection[(K, W2)]
): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  val namedJoin = MultiJoin.withName(self.tfName)
  namedJoin.cogroup(self, rhs1, rhs2)
}
/**
 * Group `this`, `rhs1`, `rhs2` and `rhs3` by key: for every key k present in any input, emit k
 * together with its values from each of the four inputs.
 * @group cogroup
 */
def cogroup[W1, W2, W3](
  rhs1: SCollection[(K, W1)],
  rhs2: SCollection[(K, W2)],
  rhs3: SCollection[(K, W3)]
): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  val namedJoin = MultiJoin.withName(self.tfName)
  namedJoin.cogroup(self, rhs1, rhs2, rhs3)
}
/**
 * Same as `cogroup` with a single right-hand collection.
 * @group cogroup
 */
def groupWith[W](rhs: SCollection[(K, W)]): SCollection[(K, (Iterable[V], Iterable[W]))] =
  cogroup[W](rhs)
/**
 * Same as `cogroup` with two right-hand collections.
 * @group cogroup
 */
def groupWith[W1, W2](
  rhs1: SCollection[(K, W1)],
  rhs2: SCollection[(K, W2)]
): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] =
  cogroup[W1, W2](rhs1, rhs2)
/**
 * Same as `cogroup` with three right-hand collections.
 * @group cogroup
 */
def groupWith[W1, W2, W3](
  rhs1: SCollection[(K, W1)],
  rhs2: SCollection[(K, W2)],
  rhs3: SCollection[(K, W3)]
): SCollection[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] =
  cogroup[W1, W2, W3](rhs1, rhs2, rhs3)
/**
 * Partition this SCollection into `numPartitions` partitions by hashing each element's key with
 * `ScioUtil.consistentHashCode`. Note that `K` should provide a hash code that is consistent
 * across different JVMs; otherwise equal keys may be assigned to different partitions on
 * different workers.
 *
 * @param numPartitions
 *   number of output partitions
 * @return
 *   partitioned SCollections in a `Seq`
 * @group collection
 */
def hashPartitionByKey(numPartitions: Int): Seq[SCollection[(K, V)]] =
  self.partition(
    numPartitions,
    // floorMod (unlike %) keeps the index in [0, numPartitions) even for negative hash codes.
    elem => Math.floorMod(ScioUtil.consistentHashCode(elem._1), numPartitions)
  )
// =======================================================================
// Joins
// =======================================================================
/**
 * Full outer join of `this` and `rhs`.
 *
 * For each (k, v) in `this`, the output holds (k, (Some(v), Some(w))) for every w in `rhs` with
 * key k, or (k, (Some(v), None)) when `rhs` has no such key. Symmetrically, for each (k, w) in
 * `rhs`, the output holds (k, (Some(v), Some(w))) for every v in `this` with key k, or
 * (k, (None, Some(w))) when `this` has no such key.
 * @group join
 */
def fullOuterJoin[W](rhs: SCollection[(K, W)]): SCollection[(K, (Option[V], Option[W]))] = {
  val transformName = self.tfName
  ArtisanJoin.outer(transformName, self, rhs)
}
/**
 * Inner join of `this` and `rhs`: emit (k, (v1, v2)) for every pairing of (k, v1) in `this` with
 * (k, v2) in `rhs` that share the same key k.
 * @group join
 */
def join[W](rhs: SCollection[(K, W)]): SCollection[(K, (V, W))] = {
  val transformName = self.tfName
  ArtisanJoin(transformName, self, rhs)
}
/**
 * Left outer join of `this` and `rhs`.
 *
 * For each (k, v) in `this`, the output holds (k, (v, Some(w))) for every w in `rhs` with key k,
 * or (k, (v, None)) when `rhs` has no such key.
 * @group join
 */
def leftOuterJoin[W](rhs: SCollection[(K, W)]): SCollection[(K, (V, Option[W]))] = {
  val transformName = self.tfName
  ArtisanJoin.left(transformName, self, rhs)
}
/**
 * Right outer join of `this` and `rhs`.
 *
 * For each (k, w) in `rhs`, the output holds (k, (Some(v), w)) for every v in `this` with key k,
 * or (k, (None, w)) when `this` has no such key.
 * @group join
 */
def rightOuterJoin[W](rhs: SCollection[(K, W)]): SCollection[(K, (Option[V], W))] = {
  val transformName = self.tfName
  ArtisanJoin.right(transformName, self, rhs)
}
/**
 * Full outer join for cases when the left collection (`this`) is much larger than the right
 * collection (`rhs`), the right collection cannot fit in memory, and most of its keys also occur
 * in the left collection, i.e. when the intersection of keys is sparse in the left collection. A
 * Bloom Filter of keys from the right collection (`rhs`) is used to split `this` into 2
 * partitions. Only those with keys in the filter go through the join and the rest are
 * concatenated. This is useful for joining historical aggregates with incremental updates.
 *
 * Import `magnolify.guava.auto._` to get common instances of Guava
 * [[com.google.common.hash.Funnel Funnel]] instances.
 *
 * Read more about Bloom Filter: [[com.google.common.hash.BloomFilter]].
 *
 * @group join
 * @param rhs
 *   the right-hand collection to join with
 * @param rhsNumKeys
 *   An estimate of the number of keys in the right collection `rhs`. This estimate is used to
 *   find the size and number of BloomFilters that Scio would use to split the left collection
 *   (`this`) into overlapping and unique parts in a "map" step before an exact join. Having a
 *   value close to the actual number reduces false positives in intermediate steps, which means
 *   less shuffle.
 * @param fpProb
 *   A fraction in range (0, 1) which would be the accepted false positive probability when
 *   computing the overlap. Note: having fpProb = 0 doesn't mean that Scio would calculate an
 *   exact overlap.
 * @return
 *   the full outer join of `this` and `rhs`, with the same element shape as `fullOuterJoin`
 */
def sparseFullOuterJoin[W](
  rhs: SCollection[(K, W)],
  rhsNumKeys: Long,
  fpProb: Double = 0.01
)(implicit funnel: Funnel[K]): SCollection[(K, (Option[V], Option[W]))] = self.transform { me =>
  // Bring W's coder into implicit scope; presumably required so the `map` below can encode the
  // `Option[W]` values it produces.
  implicit val wCoder: Coder[W] = rhs.valueCoder
  // `rhsPart` is deliberately not named `rhs`, to avoid shadowing the method parameter.
  val joinedParts = split(me, rhs, rhsNumKeys, fpProb).map { case (lhsUnique, lhsOverlap, rhsPart) =>
    // Keys that fail the Bloom filter check cannot be in `rhs` (Bloom filters have no false
    // negatives), so these elements are emitted as left-only rows without being shuffled.
    val unique = lhsUnique.map { case (k, v) => (k, (Option(v), Option.empty[W])) }
    unique ++ lhsOverlap.fullOuterJoin(rhsPart)
  }
  SCollection.unionAll(joinedParts)
}