public SortedBucketTransform()

in scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketTransform.java [87:150]


  /**
   * Validates the arguments and wires up the three collaborators held by this transform: the
   * bucket source, the transforming {@code ParDo}, and the bucket finalizer.
   *
   * <p>Exactly one of {@code transformFn} and {@code sideInputTransformFn} must be non-null; the
   * one that is supplied decides whether the {@code ParDo} is built with or without side inputs.
   *
   * @param sources bucketed inputs; used to derive the {@code SourceSpec} and handed to both the
   *     bucket source and the transforming DoFn
   * @param keyFn forwarded to the transforming DoFn
   * @param keyComparator forwarded to the transforming DoFn
   * @param targetParallelism forwarded to the bucket source
   * @param transformFn transform used when no side inputs are involved, or {@code null}
   * @param sideInputTransformFn transform used together with {@code sides}, or {@code null}
   * @param outputDirectory base of the filename policy; must not be null
   * @param tempDirectory passed to the filename policy to obtain the temp-file assignment
   * @param sides side inputs attached to the {@code ParDo}; must not be null when {@code
   *     sideInputTransformFn} is set
   * @param newBucketMetadataFn forwarded to the bucket finalizer
   * @param fileOperations forwarded to both the transforming DoFn and the bucket finalizer
   * @param filenameSuffix suffix given to the filename policy
   * @param filenamePrefix prefix given to the filename policy
   */
  public SortedBucketTransform(
      List<SortedBucketSource.BucketedInput<?>> sources,
      Function<SortedBucketIO.ComparableKeyBytes, FinalKeyT> keyFn,
      Comparator<SortedBucketIO.ComparableKeyBytes> keyComparator,
      TargetParallelism targetParallelism,
      TransformFn<FinalKeyT, FinalValueT> transformFn,
      TransformFnWithSideInputContext<FinalKeyT, FinalValueT> sideInputTransformFn,
      ResourceId outputDirectory,
      ResourceId tempDirectory,
      Iterable<PCollectionView<?>> sides,
      NewBucketMetadataFn<?, ?, FinalValueT> newBucketMetadataFn,
      FileOperations<FinalValueT> fileOperations,
      String filenameSuffix,
      String filenamePrefix) {
    // --- Argument validation -------------------------------------------------------------
    Preconditions.checkNotNull(outputDirectory, "outputDirectory is not set");
    // Reject the case where neither transform function was supplied.
    Preconditions.checkState(
        transformFn != null || sideInputTransformFn != null,
        "At least one of transformFn and sideInputTransformFn must be set");
    // Reject the case where both transform functions were supplied.
    Preconditions.checkState(
        transformFn == null || sideInputTransformFn == null,
        "At most one of transformFn and sideInputTransformFn may be set");
    if (sideInputTransformFn != null) {
      Preconditions.checkNotNull(sides, "If using sideInputTransformFn, sides must not be null");
    }

    // --- Source side ---------------------------------------------------------------------
    final SMBFilenamePolicy namingPolicy =
        new SMBFilenamePolicy(outputDirectory, filenamePrefix, filenameSuffix);
    final SourceSpec spec = SourceSpec.from(sources);
    this.bucketSource = new BucketSource<>(sources, targetParallelism, 1, 0, spec, -1);
    final FileAssignment tempAssignment = namingPolicy.forTempFiles(tempDirectory);

    // --- Finalization: reads from the temp directory, writes using the destination naming --
    this.finalizeBuckets =
        new FinalizeTransformedBuckets<>(
            tempAssignment.getDirectory(),
            fileOperations,
            newBucketMetadataFn,
            namingPolicy.forDestination(),
            spec.hashType);

    // --- Transforming ParDo --------------------------------------------------------------
    final String transformName = getName();
    if (transformFn == null) {
      // The checks above guarantee sideInputTransformFn is the one that is set here.
      this.doFn =
          ParDo.of(
                  new TransformWithSides<>(
                      sources,
                      keyFn,
                      keyComparator,
                      tempAssignment,
                      fileOperations,
                      sideInputTransformFn,
                      transformName))
              .withSideInputs(sides);
    } else {
      this.doFn =
          ParDo.of(
              new TransformWithNoSides<>(
                  sources,
                  keyFn,
                  keyComparator,
                  tempAssignment,
                  fileOperations,
                  transformFn,
                  transformName));
    }
  }