public static void assertSplitAtFractionExhaustive()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java [511:567]


  public static <T> void assertSplitAtFractionExhaustive(
      BoundedSource<T> source, PipelineOptions options) throws Exception {
    List<T> expectedItems = readFromSource(source, options);
    assertFalse("Empty source", expectedItems.isEmpty());
    assertFalse("Source reads a single item", expectedItems.size() == 1);
    List<List<Double>> allNonTrivialFractions = new ArrayList<>();
    {
      boolean anySuccessfulFractions = false;
      boolean anyNonTrivialFractions = false;
      for (int i = 0; i < expectedItems.size(); i++) {
        SplitFractionStatistics stats = new SplitFractionStatistics();
        assertSplitAtFractionBinary(source, expectedItems, i,
            0.0, null, 1.0, null, options, stats);
        if (!stats.successfulFractions.isEmpty()) {
          anySuccessfulFractions = true;
        }
        if (!stats.nonTrivialFractions.isEmpty()) {
          anyNonTrivialFractions = true;
        }
        allNonTrivialFractions.add(stats.nonTrivialFractions);
      }
      assertTrue(
          "splitAtFraction test completed vacuously: no successful split fractions found",
          anySuccessfulFractions);
      assertTrue(
          "splitAtFraction test completed vacuously: no non-trivial split fractions found",
          anyNonTrivialFractions);
    }
    {
      // Perform a stress test of "racy" concurrent splitting:
      // for every position (number of items read), try to split at the minimum nontrivial
      // split fraction for that position concurrently with reading the record at that position.
      // To ensure that the test is non-vacuous, make sure that the splitting succeeds
      // at least once and fails at least once.
      ExecutorService executor = Executors.newFixedThreadPool(2);
      for (int i = 0; i < expectedItems.size(); i++) {
        double minNonTrivialFraction = 2.0;  // Greater than any possible fraction.
        for (double fraction : allNonTrivialFractions.get(i)) {
          minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
        }
        if (minNonTrivialFraction == 2.0) {
          // This will not happen all the time because otherwise the test above would
          // detect vacuousness.
          continue;
        }
        boolean haveSuccess = false, haveFailure = false;
        while (!haveSuccess || !haveFailure) {
          if (assertSplitAtFractionConcurrent(
              executor, source, expectedItems, i, minNonTrivialFraction, options)) {
            haveSuccess = true;
          } else {
            haveFailure = true;
          }
        }
      }
    }
  }