in scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeSortedBucketIO.scala [39:105]
/**
 * Starts a [[Read]] for the given tuple tag. Only `tupleTag` is supplied here, so every other
 * `Read` option keeps the default declared on the case class.
 *
 * @param tupleTag tag handed to the new `Read`
 * @tparam T record type; a `Coder` and a `ParquetType` for it must be in implicit scope
 * @return a `Read` that can be refined further through its `from` / `with...` methods
 */
def read[T: Coder: ParquetType](tupleTag: TupleTag[T]): Read[T] =
  Read[T](tupleTag)
/**
 * Creates a [[Write]] configured with a single key field.
 *
 * No secondary key field is supplied (`None`), and the second type argument of the resulting
 * `Write` is fixed to `Void`.
 *
 * @param keyField name passed to `Write` as its first argument
 * @tparam K first type argument of the resulting `Write`
 * @tparam T record type; needs a `ClassTag`, `Coder` and `ParquetType` in implicit scope
 */
def write[K: ClassTag, T: ClassTag: Coder: ParquetType](keyField: String): Write[K, Void, T] =
  Write[K, Void, T](keyField, None)
/**
 * Creates a [[Write]] configured with a primary and a secondary key field.
 *
 * The secondary field name is wrapped with `Option(...)`, so passing `null` yields `None`
 * rather than `Some(null)`.
 *
 * @param keyFieldPrimary name passed to `Write` as its first argument
 * @param keyFieldSecondary name wrapped in an `Option` and passed as the second argument
 * @tparam K1 first type argument of the resulting `Write`
 * @tparam K2 second type argument of the resulting `Write`
 * @tparam T record type; needs a `ClassTag`, `Coder` and `ParquetType` in implicit scope
 */
def write[K1: ClassTag, K2: ClassTag, T: ClassTag: Coder: ParquetType](
  keyFieldPrimary: String,
  keyFieldSecondary: String
): Write[K1, K2, T] = {
  val secondaryField = Option(keyFieldSecondary)
  Write[K1, K2, T](keyFieldPrimary, secondaryField)
}
/**
 * Creates a [[TransformOutput]] configured with a single key field.
 *
 * No secondary key field is supplied (`None`), and the second type argument of the resulting
 * `TransformOutput` is fixed to `Void`.
 *
 * @param keyField name passed to `TransformOutput` as its first argument
 * @tparam K first type argument of the resulting `TransformOutput`
 * @tparam T record type; needs a `ClassTag`, `Coder` and `ParquetType` in implicit scope
 */
def transformOutput[K: ClassTag, T: ClassTag: Coder: ParquetType](
  keyField: String
): TransformOutput[K, Void, T] =
  TransformOutput[K, Void, T](keyField, None)
/**
 * Creates a [[TransformOutput]] configured with a primary and a secondary key field.
 *
 * The secondary field name is wrapped with `Option(...)`, so passing `null` yields `None`
 * rather than `Some(null)`.
 *
 * @param keyFieldPrimary name passed to `TransformOutput` as its first argument
 * @param keyFieldSecondary name wrapped in an `Option` and passed as the second argument
 * @tparam K1 first type argument of the resulting `TransformOutput`
 * @tparam K2 second type argument of the resulting `TransformOutput`
 * @tparam T record type; needs a `ClassTag`, `Coder` and `ParquetType` in implicit scope
 */
def transformOutput[K1: ClassTag, K2: ClassTag, T: ClassTag: Coder: ParquetType](
  keyFieldPrimary: String,
  keyFieldSecondary: String
): TransformOutput[K1, K2, T] = {
  val secondaryField = Option(keyFieldSecondary)
  TransformOutput[K1, K2, T](keyFieldPrimary, secondaryField)
}
/**
 * Immutable description of a sorted-bucket read. Every `from` / `with...` method returns a
 * modified copy and leaves the receiver untouched.
 *
 * @param tupleTag tag returned by `getTupleTag` and forwarded to `BucketedInput.of`
 * @param inputDirectories directories forwarded to `BucketedInput.of`; empty by default
 * @param filenameSuffix suffix forwarded to `BucketedInput.of`; defaults to `DefaultSuffix`
 * @param filterPredicate handed to `ParquetTypeFileOperations`; `null` by default
 * @param predicate forwarded to `BucketedInput.of`; `null` by default
 * @param configuration handed to `ParquetTypeFileOperations`; a fresh `Configuration` by default
 * @tparam T record type; a `Coder` and a `ParquetType` for it must be in implicit scope
 */
case class Read[T: Coder: ParquetType](
  tupleTag: TupleTag[T],
  inputDirectories: Seq[String] = Nil,
  filenameSuffix: String = DefaultSuffix,
  filterPredicate: FilterPredicate = null,
  predicate: Predicate[T] = null,
  configuration: Configuration = new Configuration()
) extends SortedBucketIO.Read[T] {

  /** Returns a copy whose input directories are replaced by the given ones. */
  def from(inputDirectories: String*): Read[T] =
    copy(inputDirectories = inputDirectories)

  /** Returns a copy with the filename suffix replaced. */
  def withSuffix(filenameSuffix: String): Read[T] =
    copy(filenameSuffix = filenameSuffix)

  /** Returns a copy with the filter predicate (used by `getFileOperations`) replaced. */
  def withFilterPredicate(filterPredicate: FilterPredicate): Read[T] =
    copy(filterPredicate = filterPredicate)

  /** Returns a copy with the record predicate (forwarded to `BucketedInput.of`) replaced. */
  def withPredicate(predicate: Predicate[T]): Read[T] =
    copy(predicate = predicate)

  /** Returns a copy with the configuration (used by `getFileOperations`) replaced. */
  def withConfiguration(configuration: Configuration): Read[T] =
    copy(configuration = configuration)

  /** The input directories copied into a Guava `ImmutableList`. */
  def getInputDirectories: ImmutableList[String] = {
    // Typed as Iterable so that the `copyOf(Iterable)` overload is the one selected.
    val directories: java.lang.Iterable[String] = inputDirectories.asJava
    ImmutableList.copyOf(directories)
  }

  /** The configured filename suffix. */
  def getFilenameSuffix: String = filenameSuffix

  /** The tuple tag this read was created with. */
  override def getTupleTag: TupleTag[T] = tupleTag

  /**
   * Assembles the `BucketedInput` for this read from the given keying plus the tuple tag,
   * input directories, filename suffix, file operations and predicate held by this instance.
   */
  override def toBucketedInput(
    keying: SortedBucketSource.Keying
  ): SortedBucketSource.BucketedInput[T] = {
    val directories = inputDirectories.asJava
    val fileOperations = getFileOperations
    BucketedInput.of(keying, getTupleTag, directories, filenameSuffix, fileOperations, predicate)
  }

  /** Builds the `FileOperations` from the configured filter predicate and configuration. */
  def getFileOperations: FileOperations[T] =
    ParquetTypeFileOperations[T](filterPredicate, configuration)
}