def timestampBy()

in scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala [1345:1609]


  /**
   * Assigns a timestamp to every element of this [[SCollection]] by applying Beam's
   * `WithTimestamps` transform built from `f`.
   *
   * @param f
   *   computes the `Instant` handed to `WithTimestamps` for an element.
   * @param allowedTimestampSkew
   *   forwarded as-is to `WithTimestamps.withAllowedTimestampSkew`; defaults to `Duration.ZERO`.
   */
  def timestampBy(f: T => Instant, allowedTimestampSkew: Duration = Duration.ZERO): SCollection[T] = {
    // Building the transform raises a deprecation warning, which is silenced for this val only.
    @nowarn("cat=deprecation")
    val assignTimestamps = WithTimestamps
      .of(Functions.serializableFn(f))
      .withAllowedTimestampSkew(allowedTimestampSkew)
    this.applyTransform(assignTimestamps)
  }

  // =======================================================================
  // Read operations
  // =======================================================================

  /**
   * Delegates to the `PTransform`-based `readFiles` overload, passing `beam.TextIO.readFiles()` as
   * the transform.
   *
   * @deprecated
   *   Use readTextFiles
   */
  @deprecated("Use readTextFiles", "0.14.5")
  def readFiles(implicit ev: T <:< String): SCollection[String] = {
    val textReader = beam.TextIO.readFiles()
    readFiles(textReader)
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents.
   *
   * @return
   *   each line of the input files.
   */
  def readTextFiles(implicit ev: T <:< String): SCollection[String] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readTextFiles()
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents.
   *
   * @return
   *   each file fully read as `Array[Byte]`.
   */
  def readFilesAsBytes(implicit ev: T <:< String): SCollection[Array[Byte]] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFilesAsBytes()
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents.
   *
   * @return
   *   each file fully read as [[String]].
   */
  def readFilesAsString(implicit ev: T <:< String): SCollection[String] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFilesAsString()
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents with a user-supplied function.
   *
   * @see
   *   [[readFilesAsBytes]], [[readFilesAsString]]
   *
   * @param f
   *   turns a `beam.FileIO.ReadableFile` into a value of type `A`.
   */
  def readFiles[A: Coder](
    f: beam.FileIO.ReadableFile => A
  )(implicit ev: T <:< String): SCollection[A] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFiles(f)
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents with a user-supplied function.
   *
   * @see
   *   [[readFilesAsBytes]], [[readFilesAsString]]
   *
   * @param directoryTreatment
   *   Controls how to handle directories in the input.
   * @param compression
   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]].
   * @param f
   *   turns a `beam.FileIO.ReadableFile` into a value of type `A`.
   */
  def readFiles[A: Coder](directoryTreatment: DirectoryTreatment, compression: Compression)(
    f: beam.FileIO.ReadableFile => A
  )(implicit ev: T <:< String): SCollection[A] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFiles(directoryTreatment, compression)(f)
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents. Files are split into multiple offset ranges and read with the
   * [[FileBasedSource]].
   *
   * @param desiredBundleSizeBytes
   *   Desired size of bundles read by the sources.
   * @param directoryTreatment
   *   Controls how to handle directories in the input.
   * @param compression
   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]].
   * @param f
   *   builds the [[FileBasedSource]] from a `String` (presumably the file path; confirm against
   *   `FileSCollectionFunctions`).
   */
  def readFiles[A: Coder](
    desiredBundleSizeBytes: Long,
    directoryTreatment: DirectoryTreatment,
    compression: Compression
  )(f: String => FileBasedSource[A])(implicit ev: T <:< String): SCollection[A] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFiles(desiredBundleSizeBytes, directoryTreatment, compression)(f)
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents with the given Beam transform.
   *
   * @see
   *   [[readFilesAsBytes]], [[readFilesAsString]], [[readFiles]]
   *
   * @param filesTransform
   *   transform from a `PCollection` of `beam.FileIO.ReadableFile` to a `PCollection[A]`.
   * @param directoryTreatment
   *   Controls how to handle directories in the input. Defaults to `DirectoryTreatment.SKIP`.
   * @param compression
   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]]. Defaults to
   *   `Compression.AUTO`.
   */
  def readFiles[A: Coder](
    filesTransform: PTransform[_ >: PCollection[beam.FileIO.ReadableFile], PCollection[A]],
    directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
    compression: Compression = Compression.AUTO
  )(implicit ev: T <:< String): SCollection[A] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFiles(filesTransform, directoryTreatment, compression)
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents. Files are split into multiple offset ranges and read with the
   * [[FileBasedSource]].
   *
   * @param desiredBundleSizeBytes
   *   Desired size of bundles read by the sources. Defaults to
   *   `FileSCollectionFunctions.DefaultBundleSizeBytes`.
   * @param directoryTreatment
   *   Controls how to handle directories in the input. Defaults to `DirectoryTreatment.SKIP`.
   * @param compression
   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]]. Defaults to
   *   `Compression.AUTO`.
   *
   * @return
   *   origin file name paired with read line.
   */
  def readTextFilesWithPath(
    desiredBundleSizeBytes: Long = FileSCollectionFunctions.DefaultBundleSizeBytes,
    directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
    compression: Compression = Compression.AUTO
  )(implicit ev: T <:< String): SCollection[(String, String)] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readTextFilesWithPath(desiredBundleSizeBytes, directoryTreatment, compression)
  }

  /**
   * Treats every element of this [[SCollection]] as a file pattern and reads the files it
   * represents. Files are split into multiple offset ranges and read with the
   * [[FileBasedSource]].
   *
   * @param desiredBundleSizeBytes
   *   Desired size of bundles read by the sources. Defaults to
   *   `FileSCollectionFunctions.DefaultBundleSizeBytes`.
   * @param directoryTreatment
   *   Controls how to handle directories in the input. Defaults to `DirectoryTreatment.SKIP`.
   * @param compression
   *   Reads files using the given [[org.apache.beam.sdk.io.Compression]]. Defaults to
   *   `Compression.AUTO`.
   * @param f
   *   builds the [[FileBasedSource]] from a `String` (presumably the file path; confirm against
   *   `FileSCollectionFunctions`).
   *
   * @return
   *   origin file name paired with read element.
   */
  def readFilesWithPath[A: Coder](
    desiredBundleSizeBytes: Long = FileSCollectionFunctions.DefaultBundleSizeBytes,
    directoryTreatment: DirectoryTreatment = DirectoryTreatment.SKIP,
    compression: Compression = Compression.AUTO
  )(
    f: String => FileBasedSource[A]
  )(implicit ev: T <:< String): SCollection[(String, A)] = {
    val fileOps = new FileSCollectionFunctions(this.covary_)
    fileOps.readFilesWithPath(desiredBundleSizeBytes, directoryTreatment, compression)(f)
  }

  /**
   * Emits every element of this [[SCollection]] as a pair together with the value the given
   * [[SideInput]] holds in that element's window.
   *
   * With a list side input:
   * {{{
   *   val other: SCollection[Int] = sc.parallelize(Seq(1))
   *   val coll: SCollection[(Int, Seq[Int])] =
   *     sc.parallelize(Seq(1, 2))
   *       .reifySideInputAsValues(other.asListSideInput)
   * }}}
   *
   * With an iterable side input:
   * {{{
   *   val other: SCollection[Int] = sc.parallelize(Seq(1))
   *   val coll: SCollection[(Int, Iterable[Int])] =
   *     sc.parallelize(Seq(1, 2))
   *       .reifySideInputAsValues(other.asIterableSideInput)
   * }}}
   *
   * With a map side input:
   * {{{
   *   val other: SCollection[(Int, Int)] = sc.parallelize(Seq((1, 1)))
   *   val coll: SCollection[(Int, Map[Int, Int])] =
   *     sc.parallelize(Seq(1, 2))
   *       .reifySideInputAsValues(other.asMapSideInput)
   * }}}
   *
   * With a multimap side input:
   * {{{
   *   val other: SCollection[(Int, Int)] = sc.parallelize(Seq((1, 1)))
   *   val coll: SCollection[(Int, Map[Int, Iterable[Int]])] =
   *     sc.parallelize(Seq(1, 2))
   *       .reifySideInputAsValues(other.asMultiMapSideInput)
   * }}}
   *
   * @param side
   *   the side input whose value is paired with each element.
   */
  // `U: Coder` context bound is required since `PCollectionView` may be of different type
  def reifySideInputAsValues[U: Coder](side: SideInput[U]): SCollection[(T, U)] =
    this.transform { in =>
      in
        .withSideInputs(side)
        .map { (elem, sideCtx) =>
          // Look up the side input value through the context supplied for this element.
          (elem, sideCtx(side))
        }
        .toSCollection
    }

  /**
   * Returns an [[SCollection]] consisting of a single `Seq[T]` element, obtained by reifying this
   * collection's list side input through `reifyInGlobalWindow`.
   */
  def reifyAsListInGlobalWindow: SCollection[Seq[T]] =
    reifyInGlobalWindow(coll => coll.asListSideInput)

  /**
   * Returns an [[SCollection]] consisting of a single `Iterable[T]` element, obtained by reifying
   * this collection's iterable side input through `reifyInGlobalWindow`.
   */
  def reifyAsIterableInGlobalWindow: SCollection[Iterable[T]] =
    reifyInGlobalWindow(coll => coll.asIterableSideInput)

  /**
   * Returns an [[SCollection]] consisting of a single element, containing the value of the given
   * side input in the global window.
   *
   * Implemented by pairing a one-element `Unit` collection with the side input derived from this
   * collection and keeping only the side input value.
   *
   * Reify as List:
   * {{{
   *   val coll: SCollection[Seq[Int]] =
   *     sc.parallelize(Seq(1, 2)).reifyInGlobalWindow(_.asListSideInput)
   * }}}
   *
   * Can be used to replace patterns like:
   * {{{
   *   val coll: SCollection[Iterable[Int]] =
   *     sc.parallelize(Seq(1, 2)).groupBy(_ => ()).values
   * }}}
   * where you want to actually get an empty [[Iterable]] even if no data is present.
   *
   * @param view
   *   builds the [[SideInput]] to reify from this collection.
   */
  // `U: Coder` context bound is required since `PCollectionView` may be of different type
  private[scio] def reifyInGlobalWindow[U: Coder](
    view: SCollection[T] => SideInput[U]
  ): SCollection[U] =
    this.transform { coll =>
      // Single `()` element that carries the side input value downstream.
      val seed = context.parallelize[Unit](Seq(()))
      val sideInput = view(coll)
      seed.reifySideInputAsValues(sideInput).values
    }

  // =======================================================================
  // Write operations
  // =======================================================================

  /**
   * Extract data from this SCollection as a closed [[Tap]]. The Tap will be available once the
   * pipeline completes successfully. `.materialize` must be called before the `ScioContext` is
   * run, as its implementation modifies the current pipeline graph.
   *
   * The data is written to the location returned by `ScioUtil.getTempFile(context)` and the call
   * is never treated as a checkpoint.
   *
   * {{{
   * val closedTap = sc.parallelize(1 to 10).materialize
   * sc.run().waitUntilDone().tap(closedTap)
   * }}}
   *
   * @group output
   */
  def materialize: ClosedTap[T] = {
    val tempPath = ScioUtil.getTempFile(context)
    materialize(tempPath, isCheckpoint = false)
  }

  /**
   * Materializes this collection at `path` and returns the matching [[ClosedTap]].
   *
   * Outside of tests every element is encoded with the materialized Beam coder, wrapped as a
   * nested byte array, and saved with `saveAsBinaryFile`; the result is a `MaterializeTap` over
   * `path`. In a test context an in-memory tap is returned instead.
   *
   * @param path
   *   destination passed to `saveAsBinaryFile` and to the resulting `MaterializeTap`.
   * @param isCheckpoint
   *   when `true`, the test-context lookup of the `JobTest` output is skipped.
   */
  private[scio] def materialize(path: String, isCheckpoint: Boolean): ClosedTap[T] =
    if (!context.isTest) {
      val elemCoder = CoderMaterializer.beam(context, coder)
      val arrCoder = ByteArrayCoder.of()
      this
        .map { elem =>
          val payload = CoderUtils.encodeToByteArray(elemCoder, elem)
          CoderUtils.encodeToByteArray(
            arrCoder,
            payload,
            // encode record length
            BCoder.Context.NESTED: @nowarn("cat=deprecation")
          )
        }
        .saveAsBinaryFile(path)
      ClosedTap(MaterializeTap[T](path, context))
    } else {
      // Do not run assertions on materialized value but still access test context to trigger
      // the test checking if we're running inside a JobTest
      if (!isCheckpoint) TestDataManager.getOutput(context.testId.get)
      saveAsInMemoryTap
    }