in scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/MultiSourceKeyGroupReader.java [113:242]
/**
 * Moves this reader to the next key group that should be emitted and stores it in {@code head}.
 *
 * <p>Each pass of the loop picks the smallest current key among the sources that still have data
 * and consumes that key's values from every source positioned on it. The first source holding
 * the key decides whether the group is accepted or rejected, and every later source holding the
 * same key follows that decision. An accepted group becomes the new {@code head} unless every
 * output slot is known to be empty, in which case the loop moves on to the next key. When no
 * source has data left, {@code head} is set to {@code null}.
 *
 * @throws RuntimeException if producing the new {@code head} from the minimum key fails
 */
private void advance() {
  // A previous call already found every source drained; there is nothing more to read.
  if (initialized && head == null) {
    return;
  }
  initialized = true;

  for (; ; ) {
    // Flush the running value count into the key group size metric, then reset it.
    if (runningKeyGroupSize != 0) {
      keyGroupSize.update(runningKeyGroupSize);
      runningKeyGroupSize = 0;
    }

    // Step forward every iterator whose current values have already been used.
    for (BucketIterator<?> input : bucketedInputs) {
      if (input.shouldAdvance()) {
        input.advance();
      }
    }

    // Only sources that still have data take part in this round.
    final List<BucketIterator<?>> liveSources = new ArrayList<>();
    for (BucketIterator<?> input : bucketedInputs) {
      if (input.notExhausted()) {
        liveSources.add(input);
      }
    }
    if (liveSources.isEmpty()) {
      // Everything is drained: clear head and stop.
      head = null;
      return;
    }

    // Not every source has every key, so process the smallest key currently available.
    // liveSources is non-empty here, so a minimum always exists.
    final ComparableKeyBytes minKey =
        liveSources.stream().map(BucketIterator::currentKey).min(keyComparator).get();
    final boolean minKeyPassesFilter = keyGroupFilter.apply(minKey.primary);

    // One output accumulator and one size marker per slot of the result schema. A slot stays
    // EMPTY unless a source contributes at least one value that survives its predicate.
    final int outputCount = resultSchema.size();
    final List<Iterable<?>> outputs = new ArrayList<>(outputCount);
    final List<KeyGroupOutputSize> outputSizes = new ArrayList<>(outputCount);
    for (int i = 0; i < outputCount; i++) {
      outputs.add(new ArrayList<>());
      outputSizes.add(KeyGroupOutputSize.EMPTY);
    }

    // UNSET until the first source holding minKey decides; later sources reuse the decision.
    AcceptKeyGroup decision = AcceptKeyGroup.UNSET;
    for (BucketIterator<?> src : liveSources) {
      // Skip sources that are not positioned on the minimum key.
      if (!src.notExhausted() || keyComparator.compare(minKey, src.currentKey()) != 0) {
        continue;
      }

      // Already accepted -> emit; already rejected -> don't; undecided -> emit if either the
      // source's own setting or the min-key filter says so.
      final boolean emit;
      if (decision == AcceptKeyGroup.ACCEPT) {
        emit = true;
      } else if (decision == AcceptKeyGroup.UNSET) {
        emit = src.emitByDefault || minKeyPassesFilter;
      } else {
        emit = false;
      }

      final Iterator<Object> groupValues = (Iterator<Object>) src.currentValue();
      if (!emit) {
        decision = AcceptKeyGroup.REJECT;
        // The key is skipped, but its values still have to be drained from the iterator.
        groupValues.forEachRemaining(value -> {});
        continue;
      }

      decision = AcceptKeyGroup.ACCEPT;
      final int slot = resultSchema.getIndex(src.tupleTag);
      // Values stay lazy only when materialization was not requested and no predicate exists.
      final boolean lazy = !materializeKeyGroup && (src.predicate == null);

      if (lazy) {
        // The source holds minKey, so it is known to contribute at least one value.
        outputSizes.set(slot, KeyGroupOutputSize.NONEMPTY);
        // Count each value at the moment the consumer pulls it.
        final Iterator<Object> counting =
            Iterators.transform(
                groupValues,
                value -> {
                  runningKeyGroupSize++;
                  return value;
                });
        outputs.set(slot, new SortedBucketSource.TraversableOnceIterable<>(counting));
        continue;
      }

      // Eager path: the predicate receives the values kept so far, so filtering has to happen
      // while the list is being built.
      final List<Object> kept = (List<Object>) outputs.get(slot);
      final SortedBucketSource.Predicate<Object> keep =
          (src.predicate != null)
              ? (SortedBucketSource.Predicate<Object>) src.predicate
              : ((xs, x) -> true);
      groupValues.forEachRemaining(
          value -> {
            if (!keep.apply(kept, value)) {
              predicateFilteredRecordsCounts.get(src.tupleTag).inc();
            } else {
              kept.add(value);
              runningKeyGroupSize++;
            }
          });
      outputSizes.set(
          slot, kept.isEmpty() ? KeyGroupOutputSize.EMPTY : KeyGroupOutputSize.NONEMPTY);
    }

    // Rejected (or never decided) key groups are dropped; try the next key.
    if (decision != AcceptKeyGroup.ACCEPT) {
      continue;
    }
    // An accepted key group whose every output is known-empty is omitted as well.
    final boolean anyOutput =
        outputSizes.stream().anyMatch(size -> size != KeyGroupOutputSize.EMPTY);
    if (!anyOutput) {
      continue;
    }

    final CoGbkResult result = CoGbkResultUtil.newCoGbkResult(resultSchema, outputs);
    try {
      head = KV.of(keyFn.apply(minKey), result);
    } catch (Exception e) {
      throw new RuntimeException("Failed to decode key group", e);
    }
    // New head found, we're done.
    return;
  }
}