static List<DataflowPackage> stageClasspathElements()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java [139:242]


  /**
   * Stages the given classpath elements under {@code stagingPath}, uploading every element that
   * is not already present at its target location with a matching size.
   *
   * <p>An element may be written as {@code name=path}: the text before the first {@code '='} is
   * handed to {@code createPackageAttributes} as the package name and the remainder is used as
   * the local path. Elements whose local path does not exist are skipped with a warning and do
   * not appear in the result.
   *
   * <p>An upload that fails with an {@link IOException} is retried with backoff until the attempt
   * limit is reached. Access-denied failures are never retried.
   *
   * @param classpathElements local paths (optionally in {@code name=path} form) to stage
   * @param stagingPath location to stage under; must not be {@code null}
   * @param retrySleeper sleeper used to wait between upload attempts
   * @return one package per existing classpath element, in iteration order, including elements
   *     that were found to be staged already
   * @throws IllegalArgumentException if {@code stagingPath} is {@code null}
   * @throws RuntimeException if an element cannot be staged; the underlying failure is attached
   *     as the cause. If the failure was an interruption, the calling thread's interrupt status
   *     is restored before this exception is thrown.
   */
  static List<DataflowPackage> stageClasspathElements(
      Collection<String> classpathElements, String stagingPath,
      Sleeper retrySleeper) {
    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
        + "prepare for execution.", classpathElements.size());

    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
          + "copies to all workers. Having this many entries on your classpath may be indicative "
          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
          + "necessary dependencies only, using --filesToStage pipeline option to override "
          + "what files are being staged, or bundling several dependencies into one.",
          classpathElements.size());
    }

    if (stagingPath == null) {
      throw new IllegalArgumentException(
          "Can't stage classpath elements on because no staging location has been provided");
    }

    List<DataflowPackage> packages = new ArrayList<>();

    int numUploaded = 0;
    int numCached = 0;
    for (String classpathElement : classpathElements) {
      // Optional "name=path" form: split on the first '=' only, so the path itself may
      // contain further '=' characters.
      String packageName = null;
      if (classpathElement.contains("=")) {
        String[] components = classpathElement.split("=", 2);
        packageName = components[0];
        classpathElement = components[1];
      }

      File file = new File(classpathElement);
      if (!file.exists()) {
        LOG.warn("Skipping non-existent classpath element {} that was specified.",
            classpathElement);
        continue;
      }

      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);

      // The package is recorded before any upload is attempted, so elements that turn out to
      // be staged already are still part of the returned list.
      DataflowPackage workflowPackage = attributes.getDataflowPackage();
      packages.add(workflowPackage);
      String target = workflowPackage.getLocation();

      // TODO: Should we attempt to detect the Mime type rather than
      // always using MimeTypes.BINARY?
      try {
        try {
          // A remote object of the same size is treated as already staged.
          long remoteLength = IOChannelUtils.getSizeBytes(target);
          if (remoteLength == attributes.getSize()) {
            LOG.debug("Skipping classpath element already staged: {} at {}",
                classpathElement, target);
            numCached++;
            continue;
          }
        } catch (FileNotFoundException expected) {
          // If the file doesn't exist, it means we need to upload it.
        }

        // Upload file, retrying on failure.
        AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
            MAX_ATTEMPTS,
            INITIAL_BACKOFF_INTERVAL_MS);
        while (true) {
          try {
            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
              copyContent(classpathElement, writer);
            }
            numUploaded++;
            break;
          } catch (IOException e) {
            if (ERROR_EXTRACTOR.accessDenied(e)) {
              // Retrying cannot fix a permissions problem; fail fast with guidance.
              String errorMessage = String.format(
                  "Uploaded failed due to permissions error, will NOT retry staging "
                  + "of classpath %s. Please verify credentials are valid and that you have "
                  + "write access to %s. Stale credentials can be resolved by executing "
                  + "'gcloud auth login'.", classpathElement, target);
              LOG.error(errorMessage);
              throw new IOException(errorMessage, e);
            } else if (!backoff.atMaxAttempts()) {
              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
                  classpathElement, e);
              BackOffUtils.next(retrySleeper, backoff);
            } else {
              // Rethrow last error, to be included as a cause in the catch below.
              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
                  classpathElement, e);
              throw e;
            }
          }
        }
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Sleeping between retries may be interrupted. Wrapping the exception alone would
          // swallow the interruption, so re-assert the flag for code further up the stack.
          Thread.currentThread().interrupt();
        }
        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
      }
    }

    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
        + "{} files cached",
        numUploaded, numCached);

    return packages;
  }