State waitToFinish()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java [190:245]


  State waitToFinish(
      long timeToWait,
      TimeUnit timeUnit,
      MonitoringUtil.JobMessagesHandler messageHandler,
      Sleeper sleeper,
      NanoClock nanoClock)
          throws IOException, InterruptedException {
    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);

    long lastTimestamp = 0;
    BackOff backoff =
        timeUnit.toMillis(timeToWait) > 0
            ? new AttemptAndTimeBoundedExponentialBackOff(
                MESSAGES_POLLING_ATTEMPTS,
                MESSAGES_POLLING_INTERVAL,
                timeUnit.toMillis(timeToWait),
                AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
                nanoClock)
            : new AttemptBoundedExponentialBackOff(
                MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
    State state;
    do {
      // Get the state of the job before listing messages. This ensures we always fetch job
      // messages after the job finishes to ensure we have all them.
      state = getStateWithRetries(1, sleeper);
      boolean hasError = state == State.UNKNOWN;

      if (messageHandler != null && !hasError) {
        // Process all the job messages that have accumulated so far.
        try {
          List<JobMessage> allMessages = monitor.getJobMessages(
              jobId, lastTimestamp);

          if (!allMessages.isEmpty()) {
            lastTimestamp =
                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
            messageHandler.process(allMessages);
          }
        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
          hasError = true;
          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
          LOG.debug("Exception information:", e);
        }
      }

      if (!hasError) {
        backoff.reset();
        // Check if the job is done.
        if (state.isTerminal()) {
          return state;
        }
      }
    } while(BackOffUtils.next(sleeper, backoff));
    LOG.warn("No terminal state was returned.  State value {}", state);
    return null;  // Timed out.
  }