in runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java [253:398]
public static DataflowRunner fromOptions(PipelineOptions options) {
DataflowPipelineOptions dataflowOptions =
PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);
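// Collect every missing required option first so they can all be reported in a single error.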
ArrayList<String> missing = new ArrayList<>();
if (dataflowOptions.getAppName() == null) {
missing.add("appName");
}
if (Strings.isNullOrEmpty(dataflowOptions.getRegion())
&& isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
missing.add("region");
}
if (!missing.isEmpty()) {
throw new IllegalArgumentException(
"Missing required pipeline options: " + Joiner.on(',').join(missing));
}
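// Validate worker pool settings (worker region/zone combinations and related options).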
validateWorkerSettings(
PipelineOptionsValidator.validate(DataflowPipelineWorkerPoolOptions.class, options));
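// A gcpTempLocation is required; it must be a prefix the runner can write output files under.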
PathValidator validator = dataflowOptions.getPathValidator();
String gcpTempLocation;
try {
gcpTempLocation = dataflowOptions.getGcpTempLocation();
} catch (Exception e) {
throw new IllegalArgumentException(
"DataflowRunner requires gcpTempLocation, "
+ "but failed to retrieve a value from PipelineOptions",
e);
}
validator.validateOutputFilePrefixSupported(gcpTempLocation);
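// A stagingLocation is likewise required; pipeline artifacts such as the jars in filesToStage
// are uploaded there.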
String stagingLocation;
try {
stagingLocation = dataflowOptions.getStagingLocation();
} catch (Exception e) {
throw new IllegalArgumentException(
"DataflowRunner requires stagingLocation, "
+ "but failed to retrieve a value from PipelineOptions",
e);
}
validator.validateOutputFilePrefixSupported(stagingLocation);
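// If profiles are being saved to GCS, that prefix must be writable as well.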
if (!isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) {
validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs());
}
if (dataflowOptions.getFilesToStage() != null) {
// The user specifically requested these files, so fail now if they do not exist.
// (automatically detected classpath elements are permitted to not exist, so later
// staging will not fail on nonexistent files)
dataflowOptions.getFilesToStage().stream()
.forEach(
stagedFileSpec -> {
File localFile;
if (stagedFileSpec.contains("=")) {
String[] components = stagedFileSpec.split("=", 2);
localFile = new File(components[1]);
} else {
localFile = new File(stagedFileSpec);
}
if (!localFile.exists()) {
// should be FileNotFoundException, but for build-time backwards compatibility
// cannot add checked exception
throw new RuntimeException(
String.format("Non-existent files specified in filesToStage: %s", localFile));
}
});
} else {
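// No explicit file list was given: fall back to the classpath elements detected from this
// class loader.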
dataflowOptions.setFilesToStage(
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader(), options));
if (dataflowOptions.getFilesToStage().isEmpty()) {
throw new IllegalArgumentException("No files to stage have been found.");
} else {
LOG.info(
"PipelineOptions.filesToStage was not specified. "
+ "Defaulting to files from the classpath: will stage {} files. "
+ "Enable logging at DEBUG level to see which files will be staged.",
dataflowOptions.getFilesToStage().size());
LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
}
}
// Verify jobName according to service requirements, converting to lowercase if necessary.
String jobName = dataflowOptions.getJobName().toLowerCase();
checkArgument(
jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"),
"JobName invalid; the name must consist of only the characters "
+ "[-a-z0-9], starting with a letter and ending with a letter "
+ "or number");
if (!jobName.equals(dataflowOptions.getJobName())) {
LOG.info(
"PipelineOptions.jobName did not match the service requirements. "
+ "Using {} instead of {}.",
jobName,
dataflowOptions.getJobName());
}
dataflowOptions.setJobName(jobName);
// Verify that the project option holds a project ID, not a project number or description.
String project = dataflowOptions.getProject();
if (project.matches("[0-9]*")) {
throw new IllegalArgumentException(
"Project ID '"
+ project
+ "' invalid. Please make sure you specified the Project ID, not project number.");
} else if (!project.matches(PROJECT_ID_REGEXP)) {
throw new IllegalArgumentException(
"Project ID '"
+ project
+ "' invalid. Please make sure you specified the Project ID, not project"
+ " description.");
}
DataflowPipelineDebugOptions debugOptions =
dataflowOptions.as(DataflowPipelineDebugOptions.class);
// Verify that the configured number of worker harness threads is non-negative.
if (debugOptions.getNumberOfWorkerHarnessThreads() < 0) {
throw new IllegalArgumentException(
"Number of worker harness threads '"
+ debugOptions.getNumberOfWorkerHarnessThreads()
+ "' invalid. Please make sure the value is non-negative.");
}
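// Streaming pipelines get an explicit default GCS upload buffer size when none was configured.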
if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}
// Add the Java version to the SDK name for user and support convenience.
String agentJavaVer =
(Environments.getJavaVersion() == Environments.JavaVersion.v8)
? "(JRE 8 environment)"
: "(JDK 11 environment)";
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
String userAgent =
String.format(
"%s/%s%s",
dataflowRunnerInfo.getName(), dataflowRunnerInfo.getVersion(), agentJavaVer)
.replace(" ", "_");
dataflowOptions.setUserAgent(userAgent);
return new DataflowRunner(dataflowOptions);
}
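// ---------------------------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of DataflowRunner.java). fromOptions above is
// normally reached indirectly: Pipeline.run() calls PipelineRunner.fromOptions, which
// instantiates the runner configured on the options. The project, region, bucket, and class
// names below are placeholders, not defaults.
// ---------------------------------------------------------------------------------------------
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SubmitToDataflowExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setProject("my-project-id"); // a project ID, not the numeric project number
    options.setRegion("us-central1");
    options.setGcpTempLocation("gs://my-bucket/tmp"); // must be a writable GCS prefix

    Pipeline pipeline = Pipeline.create(options);
    // Build the pipeline here, then submit; the validation in fromOptions runs at submission.
    pipeline.run().waitUntilFinish();
  }
}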