in scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketTransform.java [87:150]
/**
 * Wires up the three stages held by this transform: the bucket source, the per-bucket
 * transforming {@code ParDo}, and the step that finalizes the transformed buckets.
 *
 * <p>Exactly one of {@code transformFn} and {@code sideInputTransformFn} must be non-null; the
 * one that is supplied decides whether the {@code ParDo} is built with or without side inputs.
 *
 * @param sources bucketed inputs; used to derive the {@code SourceSpec} and handed to both the
 *     bucket source and the transforming DoFn
 * @param keyFn converts the comparable key bytes into the final key type
 * @param keyComparator ordering over comparable key bytes, forwarded to the transforming DoFn
 * @param targetParallelism parallelism setting forwarded to the bucket source
 * @param transformFn transform used when no side inputs are involved; may be null if {@code
 *     sideInputTransformFn} is given
 * @param sideInputTransformFn transform used together with {@code sides}; may be null if {@code
 *     transformFn} is given
 * @param outputDirectory destination directory for the filename policy; must not be null
 * @param tempDirectory directory handed to the filename policy to obtain the temp file assignment
 * @param sides side inputs attached to the {@code ParDo}; must not be null when {@code
 *     sideInputTransformFn} is given
 * @param newBucketMetadataFn forwarded to the finalize step
 * @param fileOperations file operations shared by the transforming DoFn and the finalize step
 * @param filenameSuffix suffix given to the filename policy
 * @param filenamePrefix prefix given to the filename policy
 * @throws NullPointerException if {@code outputDirectory} is null, or if {@code
 *     sideInputTransformFn} is set while {@code sides} is null
 * @throws IllegalStateException if neither or both of the transform functions are supplied
 */
public SortedBucketTransform(
    List<SortedBucketSource.BucketedInput<?>> sources,
    Function<SortedBucketIO.ComparableKeyBytes, FinalKeyT> keyFn,
    Comparator<SortedBucketIO.ComparableKeyBytes> keyComparator,
    TargetParallelism targetParallelism,
    TransformFn<FinalKeyT, FinalValueT> transformFn,
    TransformFnWithSideInputContext<FinalKeyT, FinalValueT> sideInputTransformFn,
    ResourceId outputDirectory,
    ResourceId tempDirectory,
    Iterable<PCollectionView<?>> sides,
    NewBucketMetadataFn<?, ?, FinalValueT> newBucketMetadataFn,
    FileOperations<FinalValueT> fileOperations,
    String filenameSuffix,
    String filenamePrefix) {
  // --- Argument validation -------------------------------------------------------------------
  Preconditions.checkNotNull(outputDirectory, "outputDirectory is not set");

  // Lower bound: some transform function has to be present.
  Preconditions.checkState(
      transformFn != null || sideInputTransformFn != null,
      "At least one of transformFn and sideInputTransformFn must be set");

  // Upper bound: the two transform functions are mutually exclusive.
  Preconditions.checkState(
      transformFn == null || sideInputTransformFn == null,
      "At most one of transformFn and sideInputTransformFn may be set");

  if (sideInputTransformFn != null) {
    Preconditions.checkNotNull(sides, "If using sideInputTransformFn, sides must not be null");
  }

  // --- Source and output plumbing ------------------------------------------------------------
  final SMBFilenamePolicy namingPolicy =
      new SMBFilenamePolicy(outputDirectory, filenamePrefix, filenameSuffix);
  final SourceSpec spec = SourceSpec.from(sources);

  // NOTE(review): the literal arguments (1, 0, -1) are passed through unchanged; their meaning
  // is defined by BucketSource's constructor, which is not visible here.
  this.bucketSource = new BucketSource<>(sources, targetParallelism, 1, 0, spec, -1);

  // Transformed buckets are written via the temp assignment, then finalized to the destination.
  final FileAssignment tempAssignment = namingPolicy.forTempFiles(tempDirectory);
  this.finalizeBuckets =
      new FinalizeTransformedBuckets<>(
          tempAssignment.getDirectory(),
          fileOperations,
          newBucketMetadataFn,
          namingPolicy.forDestination(),
          spec.hashType);

  // --- Transforming ParDo --------------------------------------------------------------------
  final String stepName = getName();
  if (transformFn == null) {
    // Side-input flavour: the DoFn is given the side-input-aware function and the views.
    this.doFn =
        ParDo.of(
                new TransformWithSides<>(
                    sources,
                    keyFn,
                    keyComparator,
                    tempAssignment,
                    fileOperations,
                    sideInputTransformFn,
                    stepName))
            .withSideInputs(sides);
  } else {
    // Plain flavour: no side inputs are attached.
    this.doFn =
        ParDo.of(
            new TransformWithNoSides<>(
                sources,
                keyFn,
                keyComparator,
                tempAssignment,
                fileOperations,
                transformFn,
                stepName));
  }
}