in scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/package.scala [274:377]
def asSparkey(implicit w: SparkeyWritable[K, V]): SCollection[SparkeyUri] = {
  // Delegates to the `asSparkey` overload that takes an explicit argument list,
  // supplying no arguments.
  this.asSparkey()
}
/**
 * Expose this SCollection as a SideInput in which the key-value pairs of each window are
 * served through a `SparkeyReader`. The result is meant to be consumed via
 * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
 * of the input must be associated with exactly one value.
 *
 * @param numShards
 *   the number of shards to use when writing the Sparkey file(s).
 * @param compressionType
 *   the compression type forwarded to `asSparkey` when writing the Sparkey file(s).
 * @param compressionBlockSize
 *   the compression block size forwarded to `asSparkey` when writing the Sparkey file(s).
 */
@experimental
def asSparkeySideInput(
  numShards: Short = SparkeyIO.DefaultSideInputNumShards,
  compressionType: CompressionType = SparkeyIO.DefaultCompressionType,
  compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize
)(implicit w: SparkeyWritable[K, V]): SideInput[SparkeyReader] = {
  // Step 1: write the pairs out as Sparkey, yielding a collection of SparkeyUri.
  val sparkeyUris = self.asSparkey(
    numShards = numShards,
    compressionType = compressionType,
    compressionBlockSize = compressionBlockSize
  )
  // Step 2: turn that URI collection into the reader-backed SideInput.
  sparkeyUris.asSparkeySideInput
}
/**
 * Expose this SCollection as a SideInput in which the key-value pairs of each window are
 * served through a `SparkeyReader`. The result is meant to be consumed via
 * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
 * of the input must be associated with exactly one value.
 *
 * This variant takes no tuning arguments and simply delegates to the overload that does.
 */
@experimental
def asSparkeySideInput(implicit w: SparkeyWritable[K, V]): SideInput[SparkeyReader] = {
  // The empty argument list routes the call to the overload with an explicit parameter list.
  self.asSparkeySideInput()
}
/**
 * Expose this SCollection as a SideInput in which the key-value pairs of each window are
 * served through a [[TypedSparkeyReader]]. The result is meant to be consumed via
 * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
 * of the input must be associated with exactly one value. The provided [[Cache]] will be used
 * to cache reads from the resulting reader.
 *
 * @param cache
 *   the cache handed to the typed side input together with `decoder`.
 * @param numShards
 *   the number of shards to use when writing the Sparkey file(s).
 * @param compressionType
 *   the compression type forwarded to `asSparkey` when writing the Sparkey file(s).
 * @param compressionBlockSize
 *   the compression block size forwarded to `asSparkey` when writing the Sparkey file(s).
 * @param decoder
 *   function converting a raw `Array[Byte]` into a `T`.
 */
@experimental
def asTypedSparkeySideInput[T](
  cache: Cache[String, T],
  numShards: Short = SparkeyIO.DefaultSideInputNumShards,
  compressionType: CompressionType = SparkeyIO.DefaultCompressionType,
  compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize
)(
  decoder: Array[Byte] => T
)(implicit w: SparkeyWritable[K, V]): SideInput[TypedSparkeyReader[T]] = {
  // Step 1: write the pairs out as Sparkey, yielding a collection of SparkeyUri.
  val sparkeyUris = self.asSparkey(
    numShards = numShards,
    compressionType = compressionType,
    compressionBlockSize = compressionBlockSize
  )
  // Step 2: build the typed SideInput on top of the URIs, passing cache and decoder through.
  sparkeyUris.asTypedSparkeySideInput[T](cache)(decoder)
}
/**
 * Expose this SCollection as a SideInput in which the key-value pairs of each window are
 * served through a `SparkeyMap`. The result is meant to be consumed via
 * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
 * of the input must be associated with exactly one value. The resulting SideInput must fit on
 * disk on each worker that reads it. This is strongly recommended over a regular MapSideInput
 * if the data in the side input exceeds 100MB.
 *
 * This variant takes no tuning arguments and simply delegates to the overload that does.
 */
@experimental
def asLargeMapSideInput: SideInput[SparkeyMap[K, V]] = {
  // The empty argument list routes the call to the overload with an explicit parameter list.
  self.asLargeMapSideInput()
}
/**
 * Expose this SCollection as a SideInput in which the key-value pairs of each window are
 * served through a `SparkeyMap`. The result is meant to be consumed via
 * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
 * of the input must be associated with exactly one value. The resulting SideInput must fit on
 * disk on each worker that reads it. This is strongly recommended over a regular MapSideInput
 * if the data in the side input exceeds 100MB.
 *
 * Keys and values are encoded to byte arrays with the Beam coders materialized for `K` and `V`
 * before being written out through `asSparkey`.
 *
 * @param numShards
 *   the number of shards to use when writing the Sparkey file(s).
 * @param compressionType
 *   the compression type forwarded to `asSparkey` when writing the Sparkey file(s).
 * @param compressionBlockSize
 *   the compression block size forwarded to `asSparkey` when writing the Sparkey file(s).
 */
@experimental
def asLargeMapSideInput(
  numShards: Short = SparkeyIO.DefaultSideInputNumShards,
  compressionType: CompressionType = SparkeyIO.DefaultCompressionType,
  compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize
): SideInput[SparkeyMap[K, V]] = {
  // Materialize the Beam coders up front so the mapping function below only captures them.
  val keyCoder = CoderMaterializer.beam(self.context.options, Coder[K])
  val valueCoder = CoderMaterializer.beam(self.context.options, Coder[V])

  // Encode every pair to raw bytes, then write the encoded pairs out as Sparkey.
  val sparkeyUris = self.transform { pairs =>
    pairs
      .map { pair =>
        (
          CoderUtils.encodeToByteArray(keyCoder, pair._1),
          CoderUtils.encodeToByteArray(valueCoder, pair._2)
        )
      }
      .asSparkey(
        numShards = numShards,
        compressionType = compressionType,
        compressionBlockSize = compressionBlockSize
      )
  }

  // The singleton view over the written URIs is what backs the map-shaped SideInput.
  new LargeMapSideInput[K, V](sparkeyUris.applyInternal(View.asSingleton()))
}