OutputT applyInternal()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/Pipeline.java [320:371]


  OutputT applyInternal(String name, InputT input,
      PTransform<? super InputT, OutputT> transform) {
    // Applies `transform` to `input` as a child of the current node of the
    // transform hierarchy and returns the output produced by the runner.
    //
    // name      - requested name for this application; it is combined with the
    //             parent's full name and passed through uniquifyInternal.
    // input     - the input to the transform; finishSpecifying() is called on
    //             it before anything else happens.
    // transform - the transform to validate and hand to the runner.
    //
    // Throws IllegalStateException if the uniquified name differs from the
    // plain prefix/name combination and the stable-unique-names option is
    // ERROR; throws IllegalArgumentException for an unrecognized option value.
    input.finishSpecifying();

    TransformTreeNode parent = transforms.getCurrent();
    String namePrefix = parent.getFullName();
    String fullName = uniquifyInternal(namePrefix, name);

    // The name counts as "unique" only when uniquifyInternal left the plain
    // prefix/name combination untouched.
    boolean nameIsUnique = fullName.equals(buildName(namePrefix, name));

    if (!nameIsUnique) {
      switch (getOptions().getStableUniqueNames()) {
        case OFF:
          break;
        case WARNING:
          LOG.warn("Transform {} does not have a stable unique name. "
              + "This will prevent updating of pipelines.", fullName);
          break;
        case ERROR:
          throw new IllegalStateException(
              "Transform " + fullName + " does not have a stable unique name. "
              + "This will prevent updating of pipelines.");
        default:
          throw new IllegalArgumentException(
              "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames());
      }
    }

    TransformTreeNode child =
        new TransformTreeNode(parent, transform, fullName, input);
    parent.addComposite(child);

    transforms.addInput(child, input);

    LOG.debug("Adding {} to {}", transform, this);
    // Push before entering the try block: the finally clause must only pop a
    // node that was actually pushed. With the push inside the try, a failure
    // in pushNode would still trigger popNode and disturb the hierarchy (or
    // mask the original exception).
    transforms.pushNode(child);
    try {
      transform.validate(input);
      OutputT output = runner.apply(transform, input);
      transforms.setOutput(child, output);

      AppliedPTransform<?, ?, ?> applied = AppliedPTransform.of(
          child.getFullName(), input, output, transform);
      transformApplicationsForTesting.put(transform, applied);
      // recordAsOutput is a NOOP if already called;
      output.recordAsOutput(applied);
      verifyOutputState(output, child);
      return output;
    } finally {
      // Always restore the previous current node, even when validation or the
      // runner throws.
      transforms.popNode();
    }
  }