in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java [166:240]
/**
 * Builds a {@link DataflowPipelineRunner} from the given options after validating them and
 * filling in defaults for values the caller left unset.
 *
 * <p>In order, this method: (re-)registers the standard IO factories, validates the options as
 * {@link DataflowPipelineOptions}, requires {@code appName}, checks any non-null staging and
 * temp locations with the configured {@link PathValidator}, defaults an unset temp location to
 * the staging location (or an unset staging location to a {@code "staging"} path resolved
 * against the temp location), defaults {@code filesToStage} to the resources detected on this
 * class's class loader, and finally verifies the job name, project ID and number of worker
 * harness threads.
 *
 * @param options the pipeline options to build the runner from
 * @return a new runner constructed from the validated {@link DataflowPipelineOptions}
 * @throws IllegalArgumentException if {@code appName} is missing, the staging location cannot be
 *     resolved from the temp location, or the job name, project ID or number of worker harness
 *     threads is invalid
 */
public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
  // (Re-)register standard IO factories. Clobbers any prior credentials.
  IOChannelUtils.registerStandardIOFactories(options);

  DataflowPipelineOptions dataflowOptions =
      PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);

  // Collect every missing required value so they are all reported in a single exception.
  ArrayList<String> missing = new ArrayList<>();
  if (dataflowOptions.getAppName() == null) {
    missing.add("appName");
  }
  if (!missing.isEmpty()) {
    throw new IllegalArgumentException(
        "Missing required values: " + Joiner.on(',').join(missing));
  }

  // Validate whichever locations were supplied before using them to derive defaults.
  // NOTE(review): these guards test for null only, while the defaulting below treats an empty
  // string as unset too, so an empty location is still handed to the validator - confirm that
  // this is intended.
  PathValidator validator = dataflowOptions.getPathValidator();
  if (dataflowOptions.getStagingLocation() != null) {
    validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
  }
  if (dataflowOptions.getTempLocation() != null) {
    validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation());
  }

  // Derive the unset location from the one that was provided.
  if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) {
    dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation());
  } else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) {
    try {
      dataflowOptions.setStagingLocation(
          IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging"));
    } catch (IOException e) {
      throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation "
          + "from PipelineOptions.tempLocation. Please set the staging location explicitly.", e);
    }
  }

  if (dataflowOptions.getFilesToStage() == null) {
    dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(
        DataflowPipelineRunner.class.getClassLoader()));
    LOG.info("PipelineOptions.filesToStage was not specified. "
        + "Defaulting to files from the classpath: will stage {} files. "
        + "Enable logging at DEBUG level to see which files will be staged.",
        dataflowOptions.getFilesToStage().size());
    LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
  }

  // Verify jobName according to service requirements.
  // Lower-case with Locale.ROOT rather than the JVM default locale: under e.g. a Turkish
  // default locale "I" becomes a dotless i, which would make an otherwise valid name fail the
  // ASCII-only pattern below. The lower-cased copy is used only for this check; the option
  // itself is not modified here.
  String jobName = dataflowOptions.getJobName().toLowerCase(java.util.Locale.ROOT);
  Preconditions.checkArgument(
      jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"),
      "JobName invalid; the name must consist of only the characters "
      + "[-a-z0-9], starting with a letter and ending with a letter "
      + "or number");

  // Verify project
  String project = dataflowOptions.getProject();
  if (project.matches("[0-9]*")) {
    // An all-digit value is reported as a project number rather than a project ID.
    throw new IllegalArgumentException("Project ID '" + project
        + "' invalid. Please make sure you specified the Project ID, not project number.");
  } else if (!project.matches(PROJECT_ID_REGEXP)) {
    throw new IllegalArgumentException("Project ID '" + project
        + "' invalid. Please make sure you specified the Project ID, not project description.");
  }

  DataflowPipelineDebugOptions debugOptions =
      dataflowOptions.as(DataflowPipelineDebugOptions.class);
  // Verify the number of worker threads is a valid value
  int workerHarnessThreads = debugOptions.getNumberOfWorkerHarnessThreads();
  if (workerHarnessThreads < 0) {
    throw new IllegalArgumentException("Number of worker harness threads '"
        + workerHarnessThreads
        + "' invalid. Please make sure the value is non-negative.");
  }

  return new DataflowPipelineRunner(dataflowOptions);
}