in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java [94:161]
public DataflowPipelineJob run(Pipeline p) {
  // Submits the pipeline via dataflowPipelineRunner, then blocks until job.waitToFinish reports
  // a state or gives up after BUILTIN_JOB_TIMEOUT_SEC seconds.
  //
  // Returns the submitted job only when it finished in State.DONE. Otherwise throws:
  //   - DataflowServiceException if waiting fails with an I/O error, is interrupted, or
  //     yields no state (reported as a timeout);
  //   - IllegalStateException if the reported state is not terminal;
  //   - DataflowJobUpdatedException if the job ended in State.UPDATED;
  //   - DataflowJobCancelledException if the job ended in State.CANCELLED;
  //   - DataflowJobExecutionException for any other terminal state.
  final DataflowPipelineJob job = dataflowPipelineRunner.run(p);

  // We ignore the potential race condition here (Ctrl-C after job submission but before the
  // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
  // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
  // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
  // etc. If the user wants to verify the job was cancelled they should look at the job status.
  Thread shutdownHook = new Thread() {
    @Override
    public void run() {
      LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
          + "To cancel the job in the cloud, run:\n> {}",
          MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
    }
  };

  try {
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    @Nullable
    State result;
    try {
      result = job.waitToFinish(
          BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
          new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
    } catch (IOException | InterruptedException ex) {
      if (ex instanceof InterruptedException) {
        // Restore the interrupt flag so code further up the stack can still observe that this
        // thread was interrupted; wrapping the exception alone would silently clear it.
        Thread.currentThread().interrupt();
      }
      LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
      throw new DataflowServiceException(
          job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
    }

    // A null state is treated as "no answer within the timeout".
    if (result == null) {
      throw new DataflowServiceException(
          job, "Timed out while retrieving status for job " + job.getJobId());
    }

    LOG.info("Job finished with status {}", result);
    if (!result.isTerminal()) {
      throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
          + ", got " + result);
    }

    // Only DONE is a successful outcome; every other terminal state is surfaced as an exception
    // that carries the job so callers can inspect it.
    if (result == State.DONE) {
      return job;
    } else if (result == State.UPDATED) {
      DataflowPipelineJob newJob = job.getReplacedByJob();
      LOG.info("Job {} has been updated and is running as the new job with id {}. "
          + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
          job.getJobId(),
          newJob.getJobId(),
          MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
      throw new DataflowJobUpdatedException(
          job,
          String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
          newJob);
    } else if (result == State.CANCELLED) {
      String message = String.format("Job %s cancelled by user", job.getJobId());
      LOG.info(message);
      throw new DataflowJobCancelledException(job, message);
    } else {
      throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
          + " failed with status " + result);
    }
  } finally {
    // The hook is only useful while we are blocked on the job; unregister it on every exit path.
    Runtime.getRuntime().removeShutdownHook(shutdownHook);
  }
}