def asSparkey()

in scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/package.scala [274:377]


    def asSparkey(implicit w: SparkeyWritable[K, V]): SCollection[SparkeyUri] = /* Convenience overload taking only the implicit writer: delegates to the parameterised `asSparkey(...)` overload, so every setting there takes its default value. */ this.asSparkey()

    /**
     * Build a `SideInput[SparkeyReader]` from this keyed SCollection, for use with
     * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. The
     * key-value pairs of each window are written to Sparkey file(s) and exposed through a
     * `SparkeyReader`. Every key in the input must be associated with exactly one value.
     *
     * @param numShards
     *   the number of shards to use when writing the Sparkey file(s).
     * @param compressionType
     *   the compression type forwarded to `asSparkey` when writing the Sparkey file(s).
     * @param compressionBlockSize
     *   the compression block size forwarded to `asSparkey` when writing the Sparkey file(s).
     */
    @experimental
    def asSparkeySideInput(
      numShards: Short = SparkeyIO.DefaultSideInputNumShards,
      compressionType: CompressionType = SparkeyIO.DefaultCompressionType,
      compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize
    )(implicit w: SparkeyWritable[K, V]): SideInput[SparkeyReader] = {
      // First build the Sparkey-writing step, then expose its URI output as a side input.
      val sparkeyUri = self.asSparkey(
        numShards = numShards,
        compressionType = compressionType,
        compressionBlockSize = compressionBlockSize
      )
      sparkeyUri.asSparkeySideInput
    }

    /**
     * Build a `SideInput[SparkeyReader]` from this keyed SCollection using the default shard
     * count, compression type and compression block size, for use with
     * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. The
     * key-value pairs of each window are exposed through a `SparkeyReader`; every key in the input
     * must be associated with exactly one value.
     */
    @experimental
    def asSparkeySideInput(implicit w: SparkeyWritable[K, V]): SideInput[SparkeyReader] =
      self.asSparkeySideInput(
        numShards = SparkeyIO.DefaultSideInputNumShards,
        compressionType = SparkeyIO.DefaultCompressionType,
        compressionBlockSize = SparkeyIO.DefaultCompressionBlockSize
      )

    /**
     * Build a `SideInput[TypedSparkeyReader[T]]` from this keyed SCollection, for use with
     * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. The
     * key-value pairs of each window are written to Sparkey file(s); every key in the input must be
     * associated with exactly one value. Reads from the resulting reader are cached in the supplied
     * [[Cache]].
     *
     * @tparam T
     *   the type that stored values are decoded into.
     * @param cache
     *   the cache used for reads from the resulting reader.
     * @param numShards
     *   the number of shards to use when writing the Sparkey file(s).
     * @param compressionType
     *   the compression type forwarded to `asSparkey` when writing the Sparkey file(s).
     * @param compressionBlockSize
     *   the compression block size forwarded to `asSparkey` when writing the Sparkey file(s).
     * @param decoder
     *   function from raw `Array[Byte]` to `T`, handed to the typed reader.
     */
    @experimental
    def asTypedSparkeySideInput[T](
      cache: Cache[String, T],
      numShards: Short = SparkeyIO.DefaultSideInputNumShards,
      compressionType: CompressionType = SparkeyIO.DefaultCompressionType,
      compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize
    )(
      decoder: Array[Byte] => T
    )(implicit w: SparkeyWritable[K, V]): SideInput[TypedSparkeyReader[T]] = {
      // Write the Sparkey file(s) first, then wrap the URI output as a typed, cached side input.
      val sparkeyUri = self.asSparkey(
        numShards = numShards,
        compressionType = compressionType,
        compressionBlockSize = compressionBlockSize
      )
      sparkeyUri.asTypedSparkeySideInput[T](cache)(decoder)
    }

    /**
     * Build a `SideInput[SparkeyMap[K, V]]` from this keyed SCollection using the default shard
     * count, compression type and compression block size, for use with
     * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
     * in the input must be associated with exactly one value. The resulting SideInput must fit on
     * disk on each worker that reads it. This is strongly recommended over a regular MapSideInput
     * if the data in the side input exceeds 100MB.
     */
    @experimental
    def asLargeMapSideInput: SideInput[SparkeyMap[K, V]] =
      self.asLargeMapSideInput(
        numShards = SparkeyIO.DefaultSideInputNumShards,
        compressionType = SparkeyIO.DefaultCompressionType,
        compressionBlockSize = SparkeyIO.DefaultCompressionBlockSize
      )

    /**
     * Build a `SideInput[SparkeyMap[K, V]]` from this keyed SCollection, for use with
     * [[com.spotify.scio.values.SCollection.withSideInputs SCollection.withSideInputs]]. Every key
     * in the input must be associated with exactly one value. Keys and values are serialised to
     * byte arrays with the Beam coders derived from the implicit `Coder[K]` / `Coder[V]` and then
     * written to Sparkey file(s). The resulting SideInput must fit on disk on each worker that
     * reads it. This is strongly recommended over a regular MapSideInput if the data in the side
     * input exceeds 100MB.
     *
     * @param numShards
     *   the number of shards to use when writing the Sparkey file(s).
     * @param compressionType
     *   the compression type forwarded to `asSparkey` when writing the Sparkey file(s).
     * @param compressionBlockSize
     *   the compression block size forwarded to `asSparkey` when writing the Sparkey file(s).
     */
    @experimental
    def asLargeMapSideInput(
      numShards: Short = SparkeyIO.DefaultSideInputNumShards,
      compressionType: CompressionType = SparkeyIO.DefaultCompressionType,
      compressionBlockSize: Int = SparkeyIO.DefaultCompressionBlockSize
    ): SideInput[SparkeyMap[K, V]] = {
      // Materialise the Beam coders once, outside the per-element closure.
      val keyCoder = CoderMaterializer.beam(self.context.options, Coder[K])
      val valueCoder = CoderMaterializer.beam(self.context.options, Coder[V])

      // Encode each pair to (keyBytes, valueBytes), then write the result as Sparkey file(s).
      val sparkeyUri = self.transform { pairs =>
        pairs
          .map { case (key, value) =>
            val keyBytes = CoderUtils.encodeToByteArray(keyCoder, key)
            val valueBytes = CoderUtils.encodeToByteArray(valueCoder, value)
            (keyBytes, valueBytes)
          }
          .asSparkey(
            numShards = numShards,
            compressionType = compressionType,
            compressionBlockSize = compressionBlockSize
          )
      }

      new LargeMapSideInput[K, V](sparkeyUri.applyInternal(View.asSingleton()))
    }