in sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java [451:503]
/**
 * Recursively bisects the fraction interval between {@code leftFraction} and
 * {@code rightFraction}, checking split-at-fraction behavior at the midpoint of every
 * interval it visits.
 *
 * <p>Each probe is made through {@code assertSplitAtFractionBehaviorImpl} with
 * {@code ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS}. Endpoint results handed in
 * by the caller are reused; an endpoint is probed here only when its result is
 * {@code null}.
 *
 * @param source the source under test
 * @param expectedItems the items forwarded to every probe
 * @param numItemsToBeReadBeforeSplit the item count forwarded to every probe
 * @param leftFraction lower end of the interval
 * @param leftResult result already obtained at {@code leftFraction}, or {@code null} to
 *     compute it here
 * @param rightFraction upper end of the interval
 * @param rightResult result already obtained at {@code rightFraction}, or {@code null} to
 *     compute it here
 * @param options pipeline options forwarded to every probe
 * @param stats accumulator that receives the midpoint fractions classified below
 * @throws Exception if any probe throws
 */
private static <T> void assertSplitAtFractionBinary(
    BoundedSource<T> source,
    List<T> expectedItems,
    int numItemsToBeReadBeforeSplit,
    double leftFraction,
    SplitAtFractionResult leftResult,
    double rightFraction,
    SplitAtFractionResult rightResult,
    PipelineOptions options,
    SplitFractionStatistics stats)
    throws Exception {
  // Bottom out once the interval is narrower than 0.001: hunting for the exact boundary
  // fraction between two outcomes would otherwise never terminate.
  if (rightFraction - leftFraction < 0.001) {
    return;
  }

  final double midFraction = (leftFraction + rightFraction) / 2.0;

  // Probe whichever endpoints the caller did not supply: left first, then right.
  final SplitAtFractionResult lowEnd =
      (leftResult != null)
          ? leftResult
          : assertSplitAtFractionBehaviorImpl(
              source,
              expectedItems,
              numItemsToBeReadBeforeSplit,
              leftFraction,
              ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,
              options);
  final SplitAtFractionResult highEnd =
      (rightResult != null)
          ? rightResult
          : assertSplitAtFractionBehaviorImpl(
              source,
              expectedItems,
              numItemsToBeReadBeforeSplit,
              rightFraction,
              ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,
              options);
  final SplitAtFractionResult midResult =
      assertSplitAtFractionBehaviorImpl(
          source,
          expectedItems,
          numItemsToBeReadBeforeSplit,
          midFraction,
          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,
          options);

  // A residual count other than -1 is recorded as a successful fraction; a strictly
  // positive residual count is additionally recorded as a non-trivial one.
  if (midResult.numResidualItems != -1) {
    stats.successfulFractions.add(midFraction);
    if (midResult.numResidualItems > 0) {
      stats.nonTrivialFractions.add(midFraction);
    }
  }

  // Fractions are treated as equivalent when they produce the same number of primary
  // items. Only a half-interval whose two ends are not equivalent is explored further:
  // first [left, middle], then [middle, right].
  if (midResult.numPrimaryItems != lowEnd.numPrimaryItems) {
    assertSplitAtFractionBinary(
        source,
        expectedItems,
        numItemsToBeReadBeforeSplit,
        leftFraction,
        lowEnd,
        midFraction,
        midResult,
        options,
        stats);
  }
  if (midResult.numPrimaryItems != highEnd.numPrimaryItems) {
    assertSplitAtFractionBinary(
        source,
        expectedItems,
        numItemsToBeReadBeforeSplit,
        midFraction,
        midResult,
        rightFraction,
        highEnd,
        options,
        stats);
  }
}