public DataflowPipelineJob run()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java [94:161]


  /**
   * Submits the given pipeline for execution and blocks until it reaches a terminal state.
   *
   * <p>Returns the job only when it finishes in state {@code DONE}. Any other outcome is
   * surfaced as an exception so callers cannot mistake a failed/cancelled/updated run for
   * success:
   * <ul>
   *   <li>{@code DataflowServiceException} — status polling failed or timed out;</li>
   *   <li>{@code DataflowJobUpdatedException} — the job was replaced by an updated job;</li>
   *   <li>{@code DataflowJobCancelledException} — the job was cancelled by the user;</li>
   *   <li>{@code DataflowJobExecutionException} — the job finished in any other
   *       non-{@code DONE} terminal state (e.g. FAILED).</li>
   * </ul>
   *
   * @param p the pipeline to run
   * @return the completed job, only if it finished in state {@code DONE}
   */
  public DataflowPipelineJob run(Pipeline p) {
    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);

    // We ignore the potential race condition here (Ctrl-C after job submission but before the
    // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
    // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
    // etc. If the user wants to verify the job was cancelled they should look at the job status.
    Thread shutdownHook = new Thread() {
      @Override
      public void run() {
        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
            + "To cancel the job in the cloud, run:\n> {}",
            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
      }
    };

    try {
      Runtime.getRuntime().addShutdownHook(shutdownHook);

      // waitToFinish returns null on timeout rather than throwing, hence @Nullable.
      @Nullable
      State result;
      try {
        result = job.waitToFinish(
            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
      } catch (InterruptedException ex) {
        // Restore the interrupt status so code further up the stack can still observe it;
        // converting the exception alone would silently swallow the interrupt signal.
        Thread.currentThread().interrupt();
        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
        throw new DataflowServiceException(
            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
      } catch (IOException ex) {
        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
        throw new DataflowServiceException(
            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
      }

      if (result == null) {
        throw new DataflowServiceException(
            job, "Timed out while retrieving status for job " + job.getJobId());
      }

      LOG.info("Job finished with status {}", result);
      if (!result.isTerminal()) {
        // waitToFinish's contract is to return only a terminal state (or null on timeout);
        // anything else indicates a bug, not a service error.
        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
            + ", got " + result);
      }

      if (result == State.DONE) {
        return job;
      } else if (result == State.UPDATED) {
        DataflowPipelineJob newJob = job.getReplacedByJob();
        LOG.info("Job {} has been updated and is running as the new job with id {}. "
            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
            job.getJobId(),
            newJob.getJobId(),
            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
        throw new DataflowJobUpdatedException(
            job,
            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
            newJob);
      } else if (result == State.CANCELLED) {
        String message = String.format("Job %s cancelled by user", job.getJobId());
        LOG.info(message);
        throw new DataflowJobCancelledException(job, message);
      } else {
        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
            + " failed with status " + result);
      }
    } finally {
      // NOTE(review): removeShutdownHook throws IllegalStateException if the JVM is already
      // shutting down (i.e. the hook is running) — in that case the process is exiting anyway.
      Runtime.getRuntime().removeShutdownHook(shutdownHook);
    }
  }