in scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala [1345:1609]
def timestampBy(f: T => Instant, allowedTimestampSkew: Duration = Duration.ZERO): SCollection[T] = {
  // Beam transforms need a serializable function, so the Scala closure is wrapped first.
  val timestampFn = Functions.serializableFn(f)
  // The whole chain stays inside the ascription so the deprecation warning raised by this
  // expression (presumably from `withAllowedTimestampSkew`) remains suppressed.
  val assignTimestamps = (
    WithTimestamps
      .of(timestampFn)
      .withAllowedTimestampSkew(allowedTimestampSkew): @nowarn("cat=deprecation")
  )
  this.applyTransform(assignTimestamps)
}
// =======================================================================
// Read operations
// =======================================================================
/**
 * Reads each file, represented as a pattern, in this [[SCollection]] with Beam's
 * `TextIO.readFiles()`, skipping directories and using `Compression.AUTO`.
 *
 * @deprecated
 *   Use [[readTextFiles]] instead.
 */
@deprecated("Use readTextFiles", "0.14.5")
def readFiles(implicit ev: T <:< String): SCollection[String] =
  new FileSCollectionFunctions(this.covary_)
    .readFiles(beam.TextIO.readFiles(), DirectoryTreatment.SKIP, Compression.AUTO)
/**
 * Treats every element of this [[SCollection]] as a file pattern and reads the matched files as
 * text.
 *
 * @return
 *   one element per line of the matched input files.
 */
def readTextFiles(implicit ev: T <:< String): SCollection[String] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readTextFiles()
}
/**
 * Reads each file, represented as a pattern, in this [[SCollection]].
 *
 * @return
 *   each file fully read as an `Array[Byte]`, one element per file.
 * @see
 *   [[readFilesAsString]] for the text variant.
 */
def readFilesAsBytes(implicit ev: T <:< String): SCollection[Array[Byte]] =
  new FileSCollectionFunctions(this.covary_).readFilesAsBytes()
/**
 * Treats every element of this [[SCollection]] as a file pattern and reads each matched file in
 * full.
 *
 * @return
 *   one [[String]] per file, holding that file's entire contents.
 * @see
 *   [[readFilesAsBytes]] for the raw-bytes variant.
 */
def readFilesAsString(implicit ev: T <:< String): SCollection[String] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readFilesAsString()
}
/**
 * Treats every element of this [[SCollection]] as a file pattern and turns each matched file into
 * a value with `f`.
 *
 * @param f
 *   converts a matched [[org.apache.beam.sdk.io.FileIO.ReadableFile]] into an output element.
 * @see
 *   [[readFilesAsBytes]], [[readFilesAsString]]
 */
def readFiles[A: Coder](
  f: beam.FileIO.ReadableFile => A
)(implicit ev: T <:< String): SCollection[A] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readFiles(f)
}
/**
 * Treats every element of this [[SCollection]] as a file pattern and turns each matched file into
 * a value with `f`, with explicit control over directories and compression.
 *
 * @param directoryTreatment
 *   how directories that appear among the inputs are handled.
 * @param compression
 *   the [[org.apache.beam.sdk.io.Compression]] used to read the files.
 * @param f
 *   converts a matched [[org.apache.beam.sdk.io.FileIO.ReadableFile]] into an output element.
 * @see
 *   [[readFilesAsBytes]], [[readFilesAsString]]
 */
def readFiles[A: Coder](directoryTreatment: DirectoryTreatment, compression: Compression)(
  f: beam.FileIO.ReadableFile => A
)(implicit ev: T <:< String): SCollection[A] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readFiles(directoryTreatment, compression)(f)
}
/**
 * Treats every element of this [[SCollection]] as a file pattern and reads each matched file
 * through a [[FileBasedSource]]. Files are split into multiple offset ranges, sized according to
 * `desiredBundleSizeBytes`.
 *
 * @param desiredBundleSizeBytes
 *   desired size of the bundles read by the sources.
 * @param directoryTreatment
 *   how directories that appear among the inputs are handled.
 * @param compression
 *   the [[org.apache.beam.sdk.io.Compression]] used to read the files.
 * @param f
 *   builds the [[FileBasedSource]] for one file, given as a `String` (presumably its path).
 */
def readFiles[A: Coder](
  desiredBundleSizeBytes: Long,
  directoryTreatment: DirectoryTreatment,
  compression: Compression
)(f: String => FileBasedSource[A])(implicit ev: T <:< String): SCollection[A] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readFiles(desiredBundleSizeBytes, directoryTreatment, compression)(f)
}
/**
 * Treats every element of this [[SCollection]] as a file pattern and reads the matched files with
 * the given Beam transform.
 *
 * @param filesTransform
 *   transform applied to the matched [[org.apache.beam.sdk.io.FileIO.ReadableFile]]s, e.g.
 *   `TextIO.readFiles()`.
 * @param directoryTreatment
 *   how directories that appear among the inputs are handled; defaults to `SKIP`.
 * @param compression
 *   the [[org.apache.beam.sdk.io.Compression]] used to read the files; defaults to `AUTO`.
 * @see
 *   [[readFilesAsBytes]], [[readFilesAsString]]
 */
def readFiles[A: Coder](
  filesTransform: PTransform[_ >: PCollection[beam.FileIO.ReadableFile], PCollection[A]],
  directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
  compression: Compression = Compression.AUTO
)(implicit ev: T <:< String): SCollection[A] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readFiles(filesTransform, directoryTreatment, compression)
}
/**
 * Treats every element of this [[SCollection]] as a file pattern and reads the matched files as
 * text, keeping track of which file each line came from. Files are split into multiple offset
 * ranges and read with a [[FileBasedSource]].
 *
 * @param desiredBundleSizeBytes
 *   desired size of the bundles read by the sources.
 * @param directoryTreatment
 *   how directories that appear among the inputs are handled; defaults to `SKIP`.
 * @param compression
 *   the [[org.apache.beam.sdk.io.Compression]] used to read the files; defaults to `AUTO`.
 * @return
 *   `(originFileName, line)` pairs.
 */
def readTextFilesWithPath(
  desiredBundleSizeBytes: Long = FileSCollectionFunctions.DefaultBundleSizeBytes,
  directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
  compression: Compression = Compression.AUTO
)(implicit ev: T <:< String): SCollection[(String, String)] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readTextFilesWithPath(desiredBundleSizeBytes, directoryTreatment, compression)
}
/**
 * Treats every element of this [[SCollection]] as a file pattern and reads each matched file
 * through the [[FileBasedSource]] built by `f`, keeping track of which file each element came
 * from. Files are split into multiple offset ranges.
 *
 * @param desiredBundleSizeBytes
 *   desired size of the bundles read by the sources.
 * @param directoryTreatment
 *   how directories that appear among the inputs are handled; defaults to `SKIP`.
 * @param compression
 *   the [[org.apache.beam.sdk.io.Compression]] used to read the files; defaults to `AUTO`.
 * @param f
 *   builds the [[FileBasedSource]] for one file, given as a `String` (presumably its path).
 * @return
 *   `(originFileName, element)` pairs.
 */
def readFilesWithPath[A: Coder](
  desiredBundleSizeBytes: Long = FileSCollectionFunctions.DefaultBundleSizeBytes,
  directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
  compression: Compression = Compression.AUTO
)(
  f: String => FileBasedSource[A]
)(implicit ev: T <:< String): SCollection[(String, A)] = {
  val fileFns = new FileSCollectionFunctions(this.covary_)
  fileFns.readFilesWithPath(desiredBundleSizeBytes, directoryTreatment, compression)(f)
}
/**
 * Attaches to every element of this [[SCollection]] the value that `side` holds in that element's
 * window, producing `(element, sideValue)` pairs.
 *
 * Reify as List:
 * {{{
 * val other: SCollection[Int] = sc.parallelize(Seq(1))
 * val coll: SCollection[(Int, Seq[Int])] =
 *   sc.parallelize(Seq(1, 2))
 *     .reifySideInputAsValues(other.asListSideInput)
 * }}}
 *
 * Reify as Iterable:
 * {{{
 * val other: SCollection[Int] = sc.parallelize(Seq(1))
 * val coll: SCollection[(Int, Iterable[Int])] =
 *   sc.parallelize(Seq(1, 2))
 *     .reifySideInputAsValues(other.asIterableSideInput)
 * }}}
 *
 * Reify as Map:
 * {{{
 * val other: SCollection[(Int, Int)] = sc.parallelize(Seq((1, 1)))
 * val coll: SCollection[(Int, Map[Int, Int])] =
 *   sc.parallelize(Seq(1, 2))
 *     .reifySideInputAsValues(other.asMapSideInput)
 * }}}
 *
 * Reify as Multimap:
 * {{{
 * val other: SCollection[(Int, Int)] = sc.parallelize(Seq((1, 1)))
 * val coll: SCollection[(Int, Map[Int, Iterable[Int]])] =
 *   sc.parallelize(Seq(1, 2))
 *     .reifySideInputAsValues(other.asMultiMapSideInput)
 * }}}
 *
 * @tparam U
 *   value type of the side input. The `Coder` context bound is required because the
 *   `PCollectionView` behind `side` may be of a different type than `T`.
 * @param side
 *   the side input whose value is paired with each element.
 */
def reifySideInputAsValues[U: Coder](side: SideInput[U]): SCollection[(T, U)] =
  this.transform { coll =>
    coll
      .withSideInputs(side)
      .map { (elem, sideCtx) => (elem, sideCtx(side)) }
      .toSCollection
  }
/**
 * Reads this [[SCollection]] as a list side input and returns an [[SCollection]] holding exactly
 * one `Seq[T]` element, in the global window, even when this collection is empty.
 */
def reifyAsListInGlobalWindow: SCollection[Seq[T]] =
  reifyInGlobalWindow[Seq[T]](coll => coll.asListSideInput)
/**
 * Reads this [[SCollection]] as an iterable side input and returns an [[SCollection]] holding
 * exactly one `Iterable[T]` element, in the global window, even when this collection is empty.
 */
def reifyAsIterableInGlobalWindow: SCollection[Iterable[T]] =
  reifyInGlobalWindow[Iterable[T]](coll => coll.asIterableSideInput)
/**
 * Returns an [[SCollection]] consisting of a single element, containing the value of the given
 * side input in the global window.
 *
 * Reify as List:
 * {{{
 * val coll: SCollection[Seq[Int]] =
 *   sc.parallelize(Seq(1, 2)).reifyInGlobalWindow(_.asListSideInput)
 * }}}
 *
 * Can be used to replace patterns like:
 * {{{
 * val coll: SCollection[Iterable[Int]] = sc.parallelize(Seq(1, 2)).groupBy(_ => ()).values
 * }}}
 * where you want to get an empty [[Iterable]] even if no data is present. `groupBy` emits nothing
 * for an empty input, whereas this method always emits exactly one element.
 *
 * @tparam U
 *   value type of the side input. The `Coder` context bound is required because the
 *   `PCollectionView` may be of a different type than `T`.
 * @param view
 *   builds the side input to reify from this [[SCollection]].
 */
private[scio] def reifyInGlobalWindow[U: Coder](
  view: SCollection[T] => SideInput[U]
): SCollection[U] =
  this.transform { coll =>
    // A single Unit-valued main-input element guarantees exactly one output element, which then
    // carries the side input's value.
    context.parallelize[Unit](Seq(())).reifySideInputAsValues(view(coll)).values
  }
// =======================================================================
// Write operations
// =======================================================================
/**
 * Materializes this [[SCollection]] to a path obtained from `ScioUtil.getTempFile` and returns a
 * [[ClosedTap]] for it. The tap can be opened once the pipeline has completed successfully.
 * Because materializing modifies the current pipeline graph, `.materialize` must be called before
 * the `ScioContext` is run.
 *
 * {{{
 * val closedTap = sc.parallelize(1 to 10).materialize
 * sc.run().waitUntilDone().tap(closedTap)
 * }}}
 *
 * @group output
 */
def materialize: ClosedTap[T] = {
  val tempPath = ScioUtil.getTempFile(context)
  materialize(tempPath, isCheckpoint = false)
}
/**
 * Writes this [[SCollection]] to `path` so it can be read back after the pipeline finishes, and
 * returns the corresponding [[ClosedTap]].
 *
 * In a test context nothing is written to `path`; the data is captured with `saveAsInMemoryTap`
 * instead. Otherwise, each element is encoded with its Beam coder, and the resulting bytes are
 * re-encoded by `ByteArrayCoder` in the NESTED context to add a record-length prefix. The records
 * are then written with `saveAsBinaryFile`.
 *
 * @param path
 *   output location for the encoded records. Not used in a test context.
 * @param isCheckpoint
 *   in a test context, `true` skips the `TestDataManager.getOutput` call.
 * @return
 *   a [[ClosedTap]] over a [[MaterializeTap]] for `path`, or the in-memory tap in tests.
 */
private[scio] def materialize(path: String, isCheckpoint: Boolean): ClosedTap[T] =
if (context.isTest) {
// Do not run assertions on materialized value but still access test context to trigger
// the test checking if we're running inside a JobTest
// NOTE(review): `testId.get` assumes a test id is always set when `isTest` is true — confirm.
if (!isCheckpoint) TestDataManager.getOutput(context.testId.get)
saveAsInMemoryTap
} else {
// Beam coder for T, built once here and captured by the `map` closure below.
val elemCoder = CoderMaterializer.beam(context, coder)
val arrCoder = ByteArrayCoder.of()
// NOTE(review): this framing (element coder, then length-prefixed byte array) presumably must
// match how MaterializeTap decodes the files — keep the two in sync.
this
.map { e =>
CoderUtils.encodeToByteArray(
arrCoder,
CoderUtils.encodeToByteArray(elemCoder, e),
// encode record length
BCoder.Context.NESTED: @nowarn("cat=deprecation")
)
}
.saveAsBinaryFile(path)
ClosedTap(MaterializeTap[T](path, context))
}