in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java [139:242]
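/**
 * Stages the given classpath elements to the provided staging location and returns the
 * {@link DataflowPackage} descriptors for the staged files. Elements already present at the
 * target with a matching size are not re-uploaded.
 */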
static List<DataflowPackage> stageClasspathElements(
Collection<String> classpathElements, String stagingPath,
Sleeper retrySleeper) {
LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+ "prepare for execution.", classpathElements.size());
if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
+ "copies to all workers. Having this many entries on your classpath may be indicative "
+ "of an issue in your pipeline. You may want to consider trimming the classpath to "
+ "necessary dependencies only, using --filesToStage pipeline option to override "
+ "what files are being staged, or bundling several dependencies into one.",
classpathElements.size());
}
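// Accumulates one DataflowPackage per classpath element that exists locally.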
ArrayList<DataflowPackage> packages = new ArrayList<>();
if (stagingPath == null) {
throw new IllegalArgumentException(
"Can't stage classpath elements on because no staging location has been provided");
}
int numUploaded = 0;
int numCached = 0;
for (String classpathElement : classpathElements) {
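// A classpath element may be specified as "name=path" (e.g. "myLib.jar=/tmp/myLib.jar")
// to override the name under which the file is staged.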
String packageName = null;
if (classpathElement.contains("=")) {
String[] components = classpathElement.split("=", 2);
packageName = components[0];
classpathElement = components[1];
}
File file = new File(classpathElement);
if (!file.exists()) {
LOG.warn("Skipping non-existent classpath element {} that was specified.",
classpathElement);
continue;
}
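// Compute the staged package metadata and the target location for this element.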
PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
DataflowPackage workflowPackage = attributes.getDataflowPackage();
packages.add(workflowPackage);
String target = workflowPackage.getLocation();
// TODO: Should we attempt to detect the Mime type rather than
// always using MimeTypes.BINARY?
try {
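// If a file of the same size is already staged at the target, assume it is current
// and skip the upload.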
try {
long remoteLength = IOChannelUtils.getSizeBytes(target);
if (remoteLength == attributes.getSize()) {
LOG.debug("Skipping classpath element already staged: {} at {}",
classpathElement, target);
numCached++;
continue;
}
} catch (FileNotFoundException expected) {
// If the file doesn't exist, it means we need to upload it.
}
// Upload file, retrying on failure.
AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
MAX_ATTEMPTS,
INITIAL_BACKOFF_INTERVAL_MS);
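// Attempt the upload, retrying transient failures until the attempt budget is exhausted.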
while (true) {
try {
LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
copyContent(classpathElement, writer);
}
numUploaded++;
break;
} catch (IOException e) {
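// Permission errors are not retried; other I/O errors are retried with backoff
// up to MAX_ATTEMPTS.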
if (ERROR_EXTRACTOR.accessDenied(e)) {
String errorMessage = String.format(
"Uploaded failed due to permissions error, will NOT retry staging "
+ "of classpath %s. Please verify credentials are valid and that you have "
+ "write access to %s. Stale credentials can be resolved by executing "
+ "'gcloud auth login'.", classpathElement, target);
LOG.error(errorMessage);
throw new IOException(errorMessage, e);
} else if (!backoff.atMaxAttempts()) {
LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
classpathElement, e);
BackOffUtils.next(retrySleeper, backoff);
} else {
// Rethrow last error, to be included as a cause in the catch below.
LOG.error("Upload failed, will NOT retry staging of classpath: {}",
classpathElement, e);
throw e;
}
}
}
} catch (Exception e) {
throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
}
}
LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
+ "{} files cached",
numUploaded, numCached);
return packages;
}