in sdk/src/main/java/com/google/cloud/dataflow/sdk/Pipeline.java [320:371]
OutputT applyInternal(String name, InputT input,
    PTransform<? super InputT, OutputT> transform) {
  // Applies `transform` to `input` as a new child of the current node in the
  // transform tree and returns the output obtained from the runner.
  //
  // The requested `name` is combined with the current node's full name and
  // then uniquified. If uniquifying altered the name, the pipeline option
  // returned by getStableUniqueNames() decides the reaction:
  //   OFF     -> nothing happens
  //   WARNING -> a warning is logged
  //   ERROR   -> IllegalStateException
  //   other   -> IllegalArgumentException
  input.finishSpecifying();

  TransformTreeNode enclosing = transforms.getCurrent();
  String prefix = enclosing.getFullName();
  String uniqueName = uniquifyInternal(prefix, name);

  // The name counts as stable only if uniquifying left the plainly built name untouched.
  if (!uniqueName.equals(buildName(prefix, name))) {
    switch (getOptions().getStableUniqueNames()) {
      case ERROR:
        throw new IllegalStateException(
            "Transform " + uniqueName
                + " does not have a stable unique name. This will prevent updating of pipelines.");
      case WARNING:
        LOG.warn(
            "Transform {} does not have a stable unique name. This will prevent updating of pipelines.",
            uniqueName);
        break;
      case OFF:
        break;
      default:
        throw new IllegalArgumentException(
            "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames());
    }
  }

  // Register the new node under its parent before the transform is applied.
  TransformTreeNode node = new TransformTreeNode(enclosing, transform, uniqueName, input);
  enclosing.addComposite(node);
  transforms.addInput(node, input);
  LOG.debug("Adding {} to {}", transform, this);

  // NOTE(review): pushNode is called inside the try, so popNode in the finally
  // block also runs if pushNode itself throws - confirm that this is intended.
  try {
    transforms.pushNode(node);
    transform.validate(input);

    // The runner performs the actual application of the transform.
    OutputT result = runner.apply(transform, input);
    transforms.setOutput(node, result);

    AppliedPTransform<?, ?, ?> application =
        AppliedPTransform.of(node.getFullName(), input, result, transform);
    transformApplicationsForTesting.put(transform, application);

    // recordAsOutput is a NOOP if already called;
    result.recordAsOutput(application);
    verifyOutputState(result, node);
    return result;
  } finally {
    // Runs on both normal return and exception.
    transforms.popNode();
  }
}