in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java [168:214]
Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
throws IOException {
State state = job.getState();
if (state == State.FAILED || state == State.CANCELLED) {
LOG.info("The pipeline failed");
return Optional.of(false);
}
JobMetrics metrics = job.getDataflowClient().projects().jobs()
.getMetrics(job.getProjectId(), job.getJobId()).execute();
if (metrics == null || metrics.getMetrics() == null) {
LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
} else {
int successes = 0;
int failures = 0;
for (MetricUpdate metric : metrics.getMetrics()) {
if (metric.getName() == null || metric.getName().getContext() == null
|| !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
// Don't double count using the non-tentative version of the metric.
continue;
}
if (DataflowAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
successes += ((BigDecimal) metric.getScalar()).intValue();
} else if (DataflowAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
failures += ((BigDecimal) metric.getScalar()).intValue();
}
}
if (failures > 0) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(false);
} else if (successes >= expectedNumberOfAssertions) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(true);
}
LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
+ "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
}
return Optional.<Boolean>absent();
}