in scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala [127:198]
/** Returns a copy of this Write with the requested bucket count. */
def withNumBuckets(numBuckets: Int): Write[K1, K2, T] =
  copy(numBuckets = numBuckets)
/** Returns a copy of this Write with the requested shard count per bucket. */
def withNumShards(numShards: Int): Write[K1, K2, T] =
  copy(numShards = numShards)
/** Returns a copy of this Write using the given [[HashType]] for bucketing. */
def withHashType(hashType: HashType): Write[K1, K2, T] =
  copy(hashType = hashType)
/**
 * Returns a copy of this Write targeting `outputDirectory`.
 *
 * The string is resolved through `FileSystems.matchNewResource`; the `true`
 * flag presumably marks the resource as a directory — confirm against the
 * Beam FileSystems API.
 */
def to(outputDirectory: String): Write[K1, K2, T] = {
  val resolvedOutput = FileSystems.matchNewResource(outputDirectory, true)
  copy(outputDirectory = resolvedOutput)
}
/**
 * Returns a copy of this Write using `tempDirectory` for temporary files.
 *
 * Resolved via `FileSystems.matchNewResource`; the `true` flag presumably
 * marks the resource as a directory — confirm against the Beam FileSystems
 * API.
 */
def withTempDirectory(tempDirectory: String): Write[K1, K2, T] = {
  val resolvedTemp = FileSystems.matchNewResource(tempDirectory, true)
  copy(tempDirectory = resolvedTemp)
}
/** Returns a copy of this Write with the given output filename prefix. */
def withFilenamePrefix(filenamePrefix: String): Write[K1, K2, T] =
  copy(filenamePrefix = filenamePrefix)
/** Returns a copy of this Write with the given output filename suffix. */
def withSuffix(filenameSuffix: String): Write[K1, K2, T] =
  copy(filenameSuffix = filenameSuffix)
/** Returns a copy of this Write with the given sorter memory budget, in MB. */
def withSorterMemoryMb(sorterMemoryMb: Int): Write[K1, K2, T] =
  copy(sorterMemoryMb = sorterMemoryMb)
/** Returns a copy of this Write with the given key cache size. */
def withKeyCacheOfSize(keyCacheSize: Int): Write[K1, K2, T] =
  copy(keyCacheSize = keyCacheSize)
/** Returns a copy of this Write using the given Parquet compression codec. */
def withCompression(compression: CompressionCodecName): Write[K1, K2, T] =
  copy(compression = compression)
/** Returns a copy of this Write using the given Hadoop [[Configuration]]. */
def withConfiguration(configuration: Configuration): Write[K1, K2, T] =
  copy(configuration = configuration)
/** Configured bucket count, boxed to `Integer` as required by the overridden signature. */
override def getNumBuckets: Integer = numBuckets
/** Configured shard count per bucket. */
override def getNumShards: Int = numShards
/** Configured output filename prefix. */
override def getFilenamePrefix: String = filenamePrefix
/** Class of the primary bucketing key. */
override def getKeyClassPrimary: Class[K1] = keyClassPrimary
/** Class of the optional secondary key; `None` is surfaced as `null` for the Java-facing getter. */
override def getKeyClassSecondary: Class[K2] = keyClassSecondary.orNull
/** Configured [[HashType]] used for bucketing. */
override def getHashType: HashType = hashType
/** Resolved output directory resource. */
override def getOutputDirectory: ResourceId = outputDirectory
/** Resolved temporary directory resource. */
override def getTempDirectory: ResourceId = tempDirectory
/** Configured output filename suffix. */
override def getFilenameSuffix: String = filenameSuffix
/** Configured sorter memory budget, in MB. */
override def getSorterMemoryMb: Int = sorterMemoryMb
/**
 * File operations for reading/writing records of type `T`, built from the
 * configured compression codec and Hadoop configuration.
 */
override def getFileOperations: FileOperations[T] =
ParquetTypeFileOperations[T](compression, configuration)
/**
 * Builds the [[ParquetBucketMetadata]] describing this write: bucket and
 * shard counts, primary key class/field, optional secondary key class/field,
 * hash type, filename prefix, and the record class. The optional secondary
 * key pieces are surfaced as `null` for the Java-side constructor.
 */
override def getBucketMetadata: BucketMetadata[K1, K2, T] = {
  val secondaryKeyClass = keyClassSecondary.orNull
  val secondaryKeyField = keyFieldSecondary.orNull
  new ParquetBucketMetadata[K1, K2, T](
    numBuckets,
    numShards,
    keyClassPrimary,
    keyFieldPrimary,
    secondaryKeyClass,
    secondaryKeyField,
    hashType,
    filenamePrefix,
    recordClass
  )
}
/** Configured key cache size. */
override def getKeyCacheSize: Int = keyCacheSize
}
case class TransformOutput[K1: ClassTag, K2: ClassTag, T: ClassTag: Coder: ParquetType](
keyFieldPrimary: String,
keyFieldSecondary: Option[String],
compression: CompressionCodecName = ParquetTypeFileOperations.DefaultCompression,
configuration: Configuration = new Configuration(),
filenamePrefix: String = SortedBucketIO.DEFAULT_FILENAME_PREFIX,
outputDirectory: ResourceId = null,
tempDirectory: ResourceId = null,
filenameSuffix: String = DefaultSuffix
) extends SortedBucketIO.TransformOutput[K1, K2, T] {