DataflowPipelineJob run()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java [81:155]


  /**
   * Runs {@code pipeline} with {@code runner}, waits for the resulting job, and verifies the
   * outcome reported by {@code checkForSuccess}.
   *
   * <p>When the options indicate a streaming pipeline, {@code checkForSuccess} is polled every
   * 10 seconds on the executor service from the options while this thread waits up to 10 minutes
   * on the job. The job is cancelled when a {@code JOB_MESSAGE_ERROR} message is seen, when the
   * wait returns a {@code null} or {@code RUNNING} state, and when the polling task exits.
   * Otherwise the job is waited on with a duration of {@code -1} (presumably "no timeout" --
   * confirm against {@code DataflowPipelineJob#waitToFinish}) and checked once afterwards.
   *
   * @param pipeline the pipeline to execute
   * @param runner the runner used to launch {@code pipeline}
   * @return the job that was launched, once it has been verified as successful
   * @throws IllegalStateException if the runner reports a job execution failure, if no
   *     success/failure result was produced, or if the result indicates failure
   */
  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {

    final JobMessagesHandler messageHandler =
        new MonitoringUtil.PrintHandler(options.getJobMessageOutput());
    final DataflowPipelineJob job;
    try {
      job = runner.run(pipeline);
    } catch (DataflowJobExecutionException ex) {
      // Keep the original exception as the cause so the underlying failure is not lost.
      throw new IllegalStateException("The dataflow failed.", ex);
    }

    LOG.info("Running Dataflow job {} with {} expected assertions.",
        job.getJobId(), expectedNumberOfAssertions);

    try {
      final Optional<Boolean> result;
      if (options.isStreaming()) {
        // Poll for a verdict in the background; the job is cancelled once the poller exits,
        // whether it found a verdict or failed.
        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
            new Callable<Optional<Boolean>>() {
          @Override
          public Optional<Boolean> call() throws Exception {
            try {
              for (;;) {
                Optional<Boolean> outcome = checkForSuccess(job);
                if (outcome.isPresent()) {
                  return outcome;
                }
                Thread.sleep(10000L);
              }
            } finally {
              LOG.info("Cancelling Dataflow job {}", job.getJobId());
              job.cancel();
            }
          }
        });
        // Meanwhile, forward job messages to the print handler and cancel on the first error.
        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
            @Override
            public void process(List<JobMessage> messages) {
              messageHandler.process(messages);
              for (JobMessage message : messages) {
                // Constant-first equals is null-safe for a missing importance value.
                if ("JOB_MESSAGE_ERROR".equals(message.getMessageImportance())) {
                  LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
                      job.getJobId(), message.getMessageText());
                  try {
                    job.cancel();
                  } catch (Exception e) {
                    throw Throwables.propagate(e);
                  }
                }
              }
            }
          });
        if (finalState == null || finalState == State.RUNNING) {
          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
              job.getJobId());
          job.cancel();
        }
        result = resultFuture.get();
      } else {
        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
        result = checkForSuccess(job);
      }
      if (!result.isPresent()) {
        throw new IllegalStateException(
            "The dataflow did not output a success or failure metric.");
      } else if (!result.get()) {
        throw new IllegalStateException("The dataflow failed.");
      }
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Restore the interrupt flag before wrapping so callers can still observe it.
        Thread.currentThread().interrupt();
      }
      Throwables.propagateIfPossible(e);
      throw Throwables.propagate(e);
    }
    return job;
  }