in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java [81:155]
DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
final JobMessagesHandler messageHandler =
new MonitoringUtil.PrintHandler(options.getJobMessageOutput());
final DataflowPipelineJob job;
try {
job = runner.run(pipeline);
} catch (DataflowJobExecutionException ex) {
throw new IllegalStateException("The dataflow failed.");
}
LOG.info("Running Dataflow job {} with {} expected assertions.",
job.getJobId(), expectedNumberOfAssertions);
try {
final Optional<Boolean> result;
if (options.isStreaming()) {
Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
new Callable<Optional<Boolean>>() {
@Override
public Optional<Boolean> call() throws Exception {
try {
for (;;) {
Optional<Boolean> result = checkForSuccess(job);
if (result.isPresent()) {
return result;
}
Thread.sleep(10000L);
}
} finally {
LOG.info("Cancelling Dataflow job {}", job.getJobId());
job.cancel();
}
}
});
State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
@Override
public void process(List<JobMessage> messages) {
messageHandler.process(messages);
for (JobMessage message : messages) {
if (message.getMessageImportance() != null
&& message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
job.getJobId(), message.getMessageText());
try {
job.cancel();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
}
});
if (finalState == null || finalState == State.RUNNING) {
LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
job.getJobId());
job.cancel();
}
result = resultFuture.get();
} else {
job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
result = checkForSuccess(job);
}
if (!result.isPresent()) {
throw new IllegalStateException(
"The dataflow did not output a success or failure metric.");
} else if (!result.get()) {
throw new IllegalStateException("The dataflow failed.");
}
} catch (Exception e) {
Throwables.propagateIfPossible(e);
throw Throwables.propagate(e);
}
return job;
}