Optional checkForSuccess()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java [168:214]


  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
      throws IOException {
    State state = job.getState();
    if (state == State.FAILED || state == State.CANCELLED) {
      LOG.info("The pipeline failed");
      return Optional.of(false);
    }

    JobMetrics metrics = job.getDataflowClient().projects().jobs()
        .getMetrics(job.getProjectId(), job.getJobId()).execute();

    if (metrics == null || metrics.getMetrics() == null) {
      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
    } else {
      int successes = 0;
      int failures = 0;
      for (MetricUpdate metric : metrics.getMetrics()) {
        if (metric.getName() == null || metric.getName().getContext() == null
            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
          // Don't double count using the non-tentative version of the metric.
          continue;
        }
        if (DataflowAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
          successes += ((BigDecimal) metric.getScalar()).intValue();
        } else if (DataflowAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
          failures += ((BigDecimal) metric.getScalar()).intValue();
        }
      }

      if (failures > 0) {
        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
            + "{} expected assertions.", job.getJobId(), successes, failures,
            expectedNumberOfAssertions);
        return Optional.of(false);
      } else if (successes >= expectedNumberOfAssertions) {
        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
            + "{} expected assertions.", job.getJobId(), successes, failures,
            expectedNumberOfAssertions);
        return Optional.of(true);
      }

      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
    }

    return Optional.<Boolean>absent();
  }