in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleRangeTracker.java [70:118]
public synchronized boolean tryReturnRecordAt(
boolean isAtSplitPoint, ByteArrayShufflePosition groupStart) {
if (lastGroupStart == null && !isAtSplitPoint) {
throw new IllegalStateException(
String.format("The first group [at %s] must be at a split point",
groupStart.encodeBase64()));
}
if (this.startPosition != null && groupStart.compareTo(this.startPosition) < 0) {
throw new IllegalStateException(
String.format(
"Trying to return record at %s which is before the starting position at %s",
groupStart,
this.startPosition));
}
int comparedToLast = (lastGroupStart == null) ? 1 : groupStart.compareTo(this.lastGroupStart);
if (comparedToLast < 0) {
throw new IllegalStateException(
String.format(
"Trying to return group at %s which is before the last-returned group at %s",
groupStart,
this.lastGroupStart));
}
if (isAtSplitPoint) {
if (comparedToLast == 0) {
throw new IllegalStateException(
String.format(
"Trying to return a group at a split point with same position as the "
+ "previous group: both at %s, last group was %s",
groupStart,
lastGroupWasAtSplitPoint ? "at a split point." : "not at a split point."));
}
if (stopPosition != null && groupStart.compareTo(stopPosition) >= 0) {
return false;
}
} else if (comparedToLast != 0) {
// This case is not a violation of general RangeTracker semantics, but it is
// contrary to how GroupingShuffleReader in particular works. Hitting it would
// mean it's behaving unexpectedly.
throw new IllegalStateException(
String.format(
"Trying to return a group not at a split point, but with a different position "
+ "than the previous group: last group was %s at %s, current at %s",
lastGroupWasAtSplitPoint ? "a split point" : "a non-split point",
lastGroupStart, groupStart));
}
this.lastGroupStart = groupStart;
this.lastGroupWasAtSplitPoint = isAtSplitPoint;
return true;
}