in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java [190:245]
/**
 * Polls the job until it reports a terminal state or the polling back-off is exhausted.
 *
 * <p>Each round first reads the job state and then, if a handler was supplied, fetches the job
 * messages newer than the last one seen and passes them to the handler. A round counts as failed
 * when the state comes back as {@code State.UNKNOWN} or when fetching messages raises one of the
 * exceptions caught below; failed rounds do not reset the back-off.
 *
 * @param timeToWait the wait budget, expressed in {@code timeUnit}; when it converts to zero or
 *     fewer milliseconds the back-off is bounded by attempts only
 * @param timeUnit the unit of {@code timeToWait}
 * @param messageHandler receives each newly fetched batch of job messages; may be {@code null},
 *     in which case no messages are fetched
 * @param sleeper used for the state lookup and for sleeping between polling rounds
 * @param nanoClock clock handed to the time-bounded back-off
 * @return the terminal state of the job, or {@code null} if the back-off ran out first
 * @throws IOException propagated from the state lookup, the message fetch or the back-off
 * @throws InterruptedException propagated from the state lookup or the back-off sleep
 */
State waitToFinish(
    long timeToWait,
    TimeUnit timeUnit,
    MonitoringUtil.JobMessagesHandler messageHandler,
    Sleeper sleeper,
    NanoClock nanoClock)
    throws IOException, InterruptedException {
  MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);

  // Choose the back-off policy: time- and attempt-bounded when a positive budget was given,
  // attempt-bounded only otherwise.
  long maxWaitMillis = timeUnit.toMillis(timeToWait);
  BackOff backoff;
  if (maxWaitMillis > 0) {
    backoff =
        new AttemptAndTimeBoundedExponentialBackOff(
            MESSAGES_POLLING_ATTEMPTS,
            MESSAGES_POLLING_INTERVAL,
            maxWaitMillis,
            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
            nanoClock);
  } else {
    backoff =
        new AttemptBoundedExponentialBackOff(
            MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
  }

  // Timestamp (millis) of the newest message already handed to the handler.
  long lastTimestamp = 0;
  State state;
  while (true) {
    // Read the state before listing messages, so that when a terminal state is observed the
    // message fetch that follows it still runs before we return.
    state = getStateWithRetries(1, sleeper);
    boolean pollSucceeded = state != State.UNKNOWN;

    if (pollSucceeded && messageHandler != null) {
      try {
        // Drain whatever messages have accumulated since the previous round.
        List<JobMessage> newMessages = monitor.getJobMessages(jobId, lastTimestamp);
        if (!newMessages.isEmpty()) {
          JobMessage newest = newMessages.get(newMessages.size() - 1);
          lastTimestamp = fromCloudTime(newest.getTime()).getMillis();
          messageHandler.process(newMessages);
        }
      } catch (GoogleJsonResponseException | SocketTimeoutException e) {
        // Treat the round as failed and let the back-off decide whether to retry.
        pollSucceeded = false;
        LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
        LOG.debug("Exception information:", e);
      }
    }

    if (pollSucceeded) {
      // A clean round resets the back-off before checking whether the job is done.
      backoff.reset();
      if (state.isTerminal()) {
        return state;
      }
    }

    if (!BackOffUtils.next(sleeper, backoff)) {
      break;
    }
  }

  LOG.warn("No terminal state was returned. State value {}", state);
  return null; // Timed out.
}