in scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketTransform.java [448:506]
/**
 * Merges the key groups of a single bucket across all sources, applies the output transform to
 * each merged key group, and writes the results to that bucket's destination file.
 *
 * <p>Once every key group has been handled, a {@link MergedBucket} describing the bucket id,
 * destination resource and effective parallelism is emitted to {@code out}.
 *
 * @param e bucket descriptor supplying {@code bucketOffsetId} and {@code effectiveParallelism}
 * @param out receiver for the {@link MergedBucket} emitted after the bucket is fully processed
 * @param context process context; supplies pipeline options to the reader and is forwarded to
 *     {@code outputTransform}
 * @param window window forwarded to {@code outputTransform}
 * @throws IllegalStateException if the first source exposes no bucket metadata entries
 * @throws RuntimeException if the file writer cannot be created, or if reading, transforming or
 *     draining a merged key group fails (the original exception is kept as the cause)
 */
public void processElement(
    @Element BucketItem e,
    OutputReceiver<MergedBucket> out,
    ProcessContext context,
    BoundedWindow window) {
  int bucketId = e.bucketOffsetId;
  int effectiveParallelism = e.effectiveParallelism;
  ResourceId dst =
      fileAssignment.forBucket(BucketShardId.of(bucketId, 0), effectiveParallelism, 1);

  final Counter recordsWritten =
      Metrics.counter(metricsPrefix, metricsPrefix + "-RecordsWritten");
  OutputCollector<FinalValueT> outputCollector;
  try {
    outputCollector = new OutputCollector<>(fileOperations.createWriter(dst), recordsWritten);
  } catch (IOException err) {
    throw new RuntimeException("Failed to create file writer for transformed output", err);
  }

  // Any metadata instance will do: it is only needed to rehash keys into buckets.
  // Fail with a descriptive message instead of a bare NoSuchElementException when none exists.
  BucketMetadata<?, ?, ?> someArbitraryBucketMetadata =
      sources.get(0).getSourceMetadata().mapping.values().stream()
          .findAny()
          .orElseThrow(
              () ->
                  new IllegalStateException(
                      "No bucket metadata available for the first source; cannot rehash keys"))
          .metadata;

  final MultiSourceKeyGroupReader<FinalKeyT> iter =
      new MultiSourceKeyGroupReader<FinalKeyT>(
          sources,
          keyFn,
          coGbkResultSchema(),
          someArbitraryBucketMetadata,
          keyComparator,
          metricsPrefix,
          false,
          bucketId,
          effectiveParallelism,
          context.getPipelineOptions());

  // NOTE(review): if anything in this loop throws, onComplete() below is never reached;
  // confirm whether the writer behind outputCollector needs explicit cleanup on failure.
  while (true) {
    try {
      KV<FinalKeyT, CoGbkResult> mergedKeyGroup = iter.readNext();
      if (mergedKeyGroup == null) {
        // A null key group signals that the reader has no more data for this bucket.
        break;
      }
      outputTransform(mergedKeyGroup, context, outputCollector, window);

      // Exhaust iterators if necessary before moving on to the next key group:
      // for example, if not every element was needed in the transformFn.
      sources.forEach(
          source -> {
            final Iterable<?> maybeUnfinishedIt =
                mergedKeyGroup.getValue().getAll(source.getTupleTag());
            if (maybeUnfinishedIt instanceof SortedBucketSource.TraversableOnceIterable<?>) {
              ((SortedBucketSource.TraversableOnceIterable<?>) maybeUnfinishedIt)
                  .ensureExhausted();
            }
          });
    } catch (Exception ex) {
      throw new RuntimeException("Failed to write merged key group", ex);
    }
  }

  outputCollector.onComplete();
  out.output(new MergedBucket(bucketId, dst, effectiveParallelism));
}