in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java [285:337]
/**
 * Splits the source carried by {@code request} into bundles and wraps each bundle in a
 * {@link DerivedSource} for the response.
 *
 * <p>The source is deserialized from the request's source spec and must be a
 * {@link BoundedSource}. The desired bundle size is read from the request's split options,
 * falling back to {@code DEFAULT_DESIRED_BUNDLE_SIZE_BYTES} when the options or the size are
 * absent. Every bundle returned by the split is validated before it is serialized.
 *
 * @param request the split request holding the serialized source and optional split options
 * @param options pipeline options passed to both the split and the serialization of bundles
 * @return a response listing one derived source per bundle, with its outcome set to
 *     {@code SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED}
 * @throws UnsupportedOperationException if the deserialized source is not a
 *     {@link BoundedSource}
 * @throws IOException if the split yields more than {@code MAX_NUMBER_OF_SPLITS} bundles
 * @throws IllegalArgumentException if a produced bundle fails its own {@code validate()}
 * @throws Exception any other failure raised while deserializing, splitting or serializing
 */
private static SourceSplitResponse performSplit(
    SourceSplitRequest request, PipelineOptions options)
    throws Exception {
  Source<?> deserialized = deserializeFromCloudSource(request.getSource().getSpec());
  if (deserialized instanceof BoundedSource) {
    BoundedSource<?> source = (BoundedSource<?>) deserialized;
    LOG.debug("Splitting source: {}", source);

    // Use the size requested by the caller, or the default when none was supplied.
    SourceSplitOptions requestedOptions = request.getOptions();
    Long bundleSizeBytes = null;
    if (requestedOptions != null) {
      bundleSizeBytes = requestedOptions.getDesiredBundleSizeBytes();
    }
    if (bundleSizeBytes == null) {
      bundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;
    }

    List<? extends BoundedSource<?>> splits = source.splitIntoBundles(bundleSizeBytes, options);
    int splitCount = splits.size();
    if (splitCount > MAX_NUMBER_OF_SPLITS) {
      throw new IOException(
          String.format(TOO_MANY_SOURCE_SPLITS_ERROR, splitCount, MAX_NUMBER_OF_SPLITS));
    }
    LOG.debug("Splitting produced {} bundles", splitCount);

    // Produce simple independent, unsplittable bundles with no metadata attached.
    List<DerivedSource> derivedSources = new ArrayList<DerivedSource>();
    for (BoundedSource<?> candidate : splits) {
      // A bundle is only serialized once it has passed its own validation.
      try {
        candidate.validate();
      } catch (Exception e) {
        String message =
            "Splitting a valid source produced an invalid bundle. "
                + "\nOriginal source: "
                + source
                + "\nInvalid bundle: "
                + candidate;
        throw new IllegalArgumentException(message, e);
      }

      com.google.api.services.dataflow.model.Source serialized =
          serializeToCloudSource(candidate, options);
      serialized.setDoesNotNeedSplitting(true);

      DerivedSource derived = new DerivedSource();
      derived.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
      derived.setSource(serialized);
      derivedSources.add(derived);
    }

    SourceSplitResponse result = new SourceSplitResponse();
    result.setBundles(derivedSources);
    result.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
    return result;
  }
  throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + deserialized);
}