in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java [511:567]
public static <T> void assertSplitAtFractionExhaustive(
    BoundedSource<T> source, PipelineOptions options) throws Exception {
  // Overview:
  //   1. Read every item of the source up front; these are the expected items that the
  //      per-position split checks below are compared against.
  //   2. Phase one: for every position i (number of items read), run
  //      assertSplitAtFractionBinary over the fraction range [0.0, 1.0] and record which
  //      fractions it reported as successful / non-trivial.
  //   3. Phase two: for every position that has at least one non-trivial fraction, repeatedly
  //      run assertSplitAtFractionConcurrent at the smallest such fraction until both a
  //      success and a failure have been observed.
  List<T> expectedItems = readFromSource(source, options);

  // The checks below need at least two items to be meaningful.
  assertFalse("Empty source", expectedItems.isEmpty());
  assertFalse("Source reads a single item", expectedItems.size() == 1);

  // allNonTrivialFractions.get(i) holds the non-trivial fractions recorded for position i.
  List<List<Double>> allNonTrivialFractions = new ArrayList<>();
  {
    boolean anySuccessfulFractions = false;
    boolean anyNonTrivialFractions = false;
    for (int i = 0; i < expectedItems.size(); i++) {
      SplitFractionStatistics stats = new SplitFractionStatistics();
      assertSplitAtFractionBinary(source, expectedItems, i,
          0.0, null, 1.0, null, options, stats);
      if (!stats.successfulFractions.isEmpty()) {
        anySuccessfulFractions = true;
      }
      if (!stats.nonTrivialFractions.isEmpty()) {
        anyNonTrivialFractions = true;
      }
      allNonTrivialFractions.add(stats.nonTrivialFractions);
    }
    // Guard against a vacuous pass: at least one position must have produced a successful
    // fraction, and at least one must have produced a non-trivial fraction.
    assertTrue(
        "splitAtFraction test completed vacuously: no successful split fractions found",
        anySuccessfulFractions);
    assertTrue(
        "splitAtFraction test completed vacuously: no non-trivial split fractions found",
        anyNonTrivialFractions);
  }
  {
    // Perform a stress test of "racy" concurrent splitting:
    // for every position (number of items read), try to split at the minimum nontrivial
    // split fraction for that position concurrently with reading the record at that position.
    // To ensure that the test is non-vacuous, make sure that the splitting succeeds
    // at least once and fails at least once.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      for (int i = 0; i < expectedItems.size(); i++) {
        double minNonTrivialFraction = 2.0; // Greater than any possible fraction.
        for (double fraction : allNonTrivialFractions.get(i)) {
          minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
        }
        if (minNonTrivialFraction == 2.0) {
          // This will not happen all the time because otherwise the test above would
          // detect vacuousness.
          continue;
        }
        // NOTE(review): this retry loop has no upper bound on the number of trials. If one of
        // the two outcomes never occurs for this position, the loop does not terminate.
        // Consider capping the number of trials.
        boolean haveSuccess = false;
        boolean haveFailure = false;
        while (!haveSuccess || !haveFailure) {
          if (assertSplitAtFractionConcurrent(
              executor, source, expectedItems, i, minNonTrivialFraction, options)) {
            haveSuccess = true;
          } else {
            haveFailure = true;
          }
        }
      }
    } finally {
      // Always release the pool's worker threads, including when an assertion or exception
      // escapes the loop above; otherwise every invocation leaks two threads.
      executor.shutdown();
    }
  }
}