public void processElement()

in scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketTransform.java [448:506]


    public void processElement(
        @Element BucketItem e,
        OutputReceiver<MergedBucket> out,
        ProcessContext context,
        BoundedWindow window) {
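      // Each BucketItem names one bucket offset to merge, along with the effective
      // parallelism (bucket count) it was computed under.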
      int bucketId = e.bucketOffsetId;
      int effectiveParallelism = e.effectiveParallelism;

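      // Resolve the destination file for this bucket; the merged output is written as a
      // single shard (shard id 0) per bucket.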
      ResourceId dst =
          fileAssignment.forBucket(BucketShardId.of(bucketId, 0), effectiveParallelism, 1);
      OutputCollector<FinalValueT> outputCollector;
      final Counter recordsWritten =
          Metrics.counter(metricsPrefix, metricsPrefix + "-RecordsWritten");
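      // Open a writer for the destination file, failing fast if it cannot be created.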
      try {
        outputCollector = new OutputCollector<>(fileOperations.createWriter(dst), recordsWritten);
      } catch (IOException err) {
        throw new RuntimeException("Failed to create file writer for transformed output", err);
      }

      // any one source's bucket metadata will do here; it is only needed to
      // rehash keys into buckets
      BucketMetadata<?, ?, ?> someArbitraryBucketMetadata =
          sources.get(0).getSourceMetadata().mapping.values().stream().findAny().get().metadata;
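      // Merge-read the sorted key groups of this bucket across all sources.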
      final MultiSourceKeyGroupReader<FinalKeyT> iter =
          new MultiSourceKeyGroupReader<FinalKeyT>(
              sources,
              keyFn,
              coGbkResultSchema(),
              someArbitraryBucketMetadata,
              keyComparator,
              metricsPrefix,
              false,
              bucketId,
              effectiveParallelism,
              context.getPipelineOptions());
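      // Drain the reader: transform each merged key group and write it to the output file.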
      while (true) {
        try {
          KV<FinalKeyT, CoGbkResult> mergedKeyGroup = iter.readNext();
          if (mergedKeyGroup == null) break;
          outputTransform(mergedKeyGroup, context, outputCollector, window);

          // exhaust any partially consumed iterators before moving on to the next key group,
          // e.g. when the transformFn did not need every element
          sources.forEach(
              source -> {
                final Iterable<?> maybeUnfinishedIt =
                    mergedKeyGroup.getValue().getAll(source.getTupleTag());
                if (SortedBucketSource.TraversableOnceIterable.class.isAssignableFrom(
                    maybeUnfinishedIt.getClass())) {
                  ((SortedBucketSource.TraversableOnceIterable<?>) maybeUnfinishedIt)
                      .ensureExhausted();
                }
              });
        } catch (Exception ex) {
          throw new RuntimeException("Failed to write merged key group", ex);
        }
      }
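      // Finish the output file and report the merged bucket downstream.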
      outputCollector.onComplete();
      out.output(new MergedBucket(bucketId, dst, effectiveParallelism));
    }