private static SourceSplitResponse performSplit()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java [285:337]


  /**
   * Splits the bounded source described by {@code request} into bundles and packages each
   * bundle as an independent {@link DerivedSource} that is marked as not needing further
   * splitting.
   *
   * <p>The desired bundle size is taken from the request's split options when present;
   * otherwise {@code DEFAULT_DESIRED_BUNDLE_SIZE_BYTES} is used. Every produced bundle is
   * validated before it is serialized into the response.
   *
   * @param request the split request carrying the serialized source and optional split options
   * @param options pipeline options passed through to splitting and serialization
   * @return a response listing one derived source per bundle, with the outcome set to
   *     {@code SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED}
   * @throws UnsupportedOperationException if the deserialized source is not a
   *     {@link BoundedSource}
   * @throws IOException if splitting yields more than {@code MAX_NUMBER_OF_SPLITS} bundles
   * @throws IllegalArgumentException if any produced bundle fails {@code validate()}
   * @throws Exception any other failure raised while deserializing, splitting or serializing
   */
  private static SourceSplitResponse performSplit(
      SourceSplitRequest request, PipelineOptions options)
      throws Exception {
    Source<?> decoded = deserializeFromCloudSource(request.getSource().getSpec());
    if (!(decoded instanceof BoundedSource)) {
      throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + decoded);
    }
    BoundedSource<?> original = (BoundedSource<?>) decoded;
    LOG.debug("Splitting source: {}", original);

    // Honor the bundle size from the request when one was supplied; otherwise use the default.
    SourceSplitOptions requestedOptions = request.getOptions();
    Long requestedSizeBytes = null;
    if (requestedOptions != null) {
      requestedSizeBytes = requestedOptions.getDesiredBundleSizeBytes();
    }
    long bundleSizeBytes =
        (requestedSizeBytes != null) ? requestedSizeBytes : DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;

    List<? extends BoundedSource<?>> splits = original.splitIntoBundles(bundleSizeBytes, options);

    // Refuse to build a response with more bundles than the configured maximum.
    int splitCount = splits.size();
    if (splitCount > MAX_NUMBER_OF_SPLITS) {
      throw new IOException(
          String.format(TOO_MANY_SOURCE_SPLITS_ERROR, splitCount, MAX_NUMBER_OF_SPLITS));
    }
    LOG.debug("Splitting produced {} bundles", splitCount);

    // Produce simple independent, unsplittable bundles with no metadata attached.
    List<DerivedSource> derivedSources = new ArrayList<DerivedSource>();
    for (BoundedSource<?> piece : splits) {
      // A bundle that does not validate means the source's splitting logic is broken;
      // report both the parent source and the offending bundle, keeping the cause.
      try {
        piece.validate();
      } catch (Exception cause) {
        String message =
            "Splitting a valid source produced an invalid bundle. "
                + "\nOriginal source: "
                + original
                + "\nInvalid bundle: "
                + piece;
        throw new IllegalArgumentException(message, cause);
      }

      com.google.api.services.dataflow.model.Source encodedPiece =
          serializeToCloudSource(piece, options);
      encodedPiece.setDoesNotNeedSplitting(true);

      DerivedSource derived = new DerivedSource();
      derived.setSource(encodedPiece);
      derived.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
      derivedSources.add(derived);
    }

    SourceSplitResponse response = new SourceSplitResponse();
    response.setBundles(derivedSources);
    response.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
    return response;
  }