private void advance()

in scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/MultiSourceKeyGroupReader.java [113:242]


  /**
   * Moves {@code head} to the next key group that should be emitted, or sets it to {@code null}
   * once every bucketed input is exhausted.
   *
   * <p>Each pass of the loop:
   *
   * <ol>
   *   <li>flushes {@code runningKeyGroupSize} (values counted for the previous key group) into
   *       the {@code keyGroupSize} metric and resets it;
   *   <li>advances every input whose {@code shouldAdvance()} is true;
   *   <li>picks the minimum current key (per {@code keyComparator}) among non-exhausted inputs;
   *   <li>consumes that key from every input positioned on it, building one output per tuple tag
   *       of {@code resultSchema}.
   * </ol>
   *
   * <p>The first input holding the minimum key decides whether the key group is emitted, using
   * its {@code emitByDefault} flag or {@code keyGroupFilter} applied to the key's primary bytes.
   * Every later input holding the same key follows that decision. An accepted key group is
   * still dropped when every output is known to be empty after predicate filtering. Otherwise
   * it becomes the new {@code head}, with its key mapped through {@code keyFn}, and the method
   * returns.
   *
   * <p>After {@code head} has been set to {@code null} on an initialized reader, further calls
   * return immediately.
   *
   * @throws RuntimeException with message "Failed to decode key group", wrapping any exception
   *     thrown while applying {@code keyFn} to the key or building the new head
   */
  private void advance() {
    // once all sources are exhausted, head is empty, so short circuit return
    // (`initialized` distinguishes "head not computed yet" on the first call from exhaustion)
    if (initialized && head == null) return;

    initialized = true;
    while (true) {
      // Record the size of the previous key group. For lazily-emitted outputs this only counts
      // values the consumer actually iterated before calling advance() again.
      if (runningKeyGroupSize != 0) {
        keyGroupSize.update(runningKeyGroupSize);
        runningKeyGroupSize = 0;
      }

      // advance iterators whose values have already been used.
      bucketedInputs.stream()
          .filter(BucketIterator::shouldAdvance)
          .forEach(BucketIterator::advance);

      // only operate on the non-exhausted sources
      List<BucketIterator<?>> activeSources =
          bucketedInputs.stream().filter(BucketIterator::notExhausted).collect(Collectors.toList());

      // once all sources are exhausted, set head to empty and return
      if (activeSources.isEmpty()) {
        head = null;
        break;
      }

      // process keys in order, but since not all sources
      // have all keys, find the minimum available key
      final List<ComparableKeyBytes> consideredKeys =
          activeSources.stream().map(BucketIterator::currentKey).collect(Collectors.toList());
      // because we have checked that there are active sources, there will always be a min key
      ComparableKeyBytes minKey = consideredKeys.stream().min(keyComparator).get();
      final boolean emitBasedOnMinKeyBucketing = keyGroupFilter.apply(minKey.primary);

      // output accumulator: one mutable list per tuple tag in resultSchema, indexed by
      // resultSchema.getIndex(tag). Entries are either appended to (materialized path) or
      // replaced with a lazy iterable (non-materialized path).
      final List<Iterable<?>> valueMap =
          IntStream.range(0, resultSchema.size())
              .mapToObj(i -> new ArrayList<>())
              .collect(Collectors.toList());

      // When a predicate is applied, a source containing a key may have no values after filtering.
      // Sources containing minKey are by default known to be NONEMPTY. Once all sources are
      // consumed, if all are known to be empty, the key group can be dropped.
      List<KeyGroupOutputSize> valueOutputSizes =
          IntStream.range(0, resultSchema.size())
              .mapToObj(i -> KeyGroupOutputSize.EMPTY)
              .collect(Collectors.toList());

      // minKey will be accepted or rejected by the first source which has it.
      // acceptKeyGroup short-circuits the 'emit' logic below once a decision is made on minKey.
      // Transitions: UNSET -> ACCEPT or UNSET -> REJECT; once set, later sources follow it.
      AcceptKeyGroup acceptKeyGroup = AcceptKeyGroup.UNSET;
      for (BucketIterator<?> src : activeSources) {
        // for each source, if the current key is equal to the minimum, consume it
        // NOTE(review): activeSources was already filtered by notExhausted(), and consuming one
        // source does not touch another here, so the notExhausted() re-check looks redundant.
        if (src.notExhausted() && keyComparator.compare(minKey, src.currentKey()) == 0) {
          // If a preceding source already accepted this key group, emit it.
          // If a preceding source already rejected this key group, don't emit it.
          // If this is the first source holding the key, emit when either the source's
          // emitByDefault setting or the min-key bucketing filter says so.
          boolean emitKeyGroup =
              (acceptKeyGroup == AcceptKeyGroup.ACCEPT)
                  || ((acceptKeyGroup == AcceptKeyGroup.UNSET)
                      && (src.emitByDefault || emitBasedOnMinKeyBucketing));

          // NOTE(review): unchecked cast; assumes currentValue() yields an iterator over this
          // source's values for the current key.
          final Iterator<Object> keyGroupIterator = (Iterator<Object>) src.currentValue();
          if (emitKeyGroup) {
            acceptKeyGroup = AcceptKeyGroup.ACCEPT;
            // data must be eagerly materialized if requested or if there is a predicate
            boolean materialize = materializeKeyGroup || (src.predicate != null);
            int outputIndex = resultSchema.getIndex(src.tupleTag);

            if (!materialize) {
              // this source contains minKey, so is known to contain at least one value
              valueOutputSizes.set(outputIndex, KeyGroupOutputSize.NONEMPTY);
              // lazy data iterator: values are counted into runningKeyGroupSize only as the
              // downstream consumer pulls them through the transform
              valueMap.set(
                  outputIndex,
                  new SortedBucketSource.TraversableOnceIterable<>(
                      Iterators.transform(
                          keyGroupIterator,
                          (value) -> {
                            runningKeyGroupSize++;
                            return value;
                          })));

            } else {
              // eagerly materialize this iterator and apply the predicate to each value
              // this must be eager because the predicate can operate on the entire collection
              // (it receives the values accepted so far plus the candidate value)
              final List<Object> values = (List<Object>) valueMap.get(outputIndex);
              // with materializeKeyGroup but no predicate, every value is kept
              final SortedBucketSource.Predicate<Object> predicate =
                  (src.predicate == null)
                      ? ((xs, x) -> true)
                      : (SortedBucketSource.Predicate<Object>) src.predicate;
              keyGroupIterator.forEachRemaining(
                  v -> {
                    if ((predicate).apply(values, v)) {
                      values.add(v);
                      runningKeyGroupSize++;
                    } else {
                      predicateFilteredRecordsCounts.get(src.tupleTag).inc();
                    }
                  });
              // after filtering, this output's emptiness is known exactly
              KeyGroupOutputSize sz =
                  values.isEmpty() ? KeyGroupOutputSize.EMPTY : KeyGroupOutputSize.NONEMPTY;
              valueOutputSizes.set(outputIndex, sz);
            }
          } else {
            acceptKeyGroup = AcceptKeyGroup.REJECT;
            // skip key but still have to exhaust iterator
            keyGroupIterator.forEachRemaining(value -> {});
          }
        }
      }

      // A rejected key group, or an accepted one whose outputs are all empty, falls through and
      // the loop moves on to the next minimum key.
      if (acceptKeyGroup == AcceptKeyGroup.ACCEPT) {
        // if all outputs are known-empty, omit this key group
        boolean allEmpty = valueOutputSizes.stream().allMatch(s -> s == KeyGroupOutputSize.EMPTY);
        if (!allEmpty) {
          final KV<ComparableKeyBytes, CoGbkResult> next =
              KV.of(minKey, CoGbkResultUtil.newCoGbkResult(resultSchema, valueMap));
          try {
            // new head found, we're done
            head = KV.of(keyFn.apply(next.getKey()), next.getValue());
            break;
          } catch (Exception e) {
            throw new RuntimeException("Failed to decode key group", e);
          }
        }
      }
    }
  }