in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java [770:1013]
// Registers a translator for every PTransform subclass this runner knows how to
// convert into Dataflow job-specification steps. Each registration pairs a
// transform class with an anonymous TransformTranslator.
//
// Pattern note: each anonymous translator's translate(...) receives the RAW
// transform type (type parameters are erased at registration time) and
// immediately delegates to a private generic helper, which recovers the type
// parameters so the body can be written type-safely.
static {
  registerTransformTranslator(
      View.CreatePCollectionView.class,
      new TransformTranslator<View.CreatePCollectionView>() {
        @Override
        public void translate(
            View.CreatePCollectionView transform,
            TranslationContext context) {
          translateTyped(transform, context);
        }

        // Emits a CollectionToSingleton step that materializes the input
        // PCollection as a singleton side-input view.
        private <ElemT, ViewT> void translateTyped(
            View.CreatePCollectionView<ElemT, ViewT> transform,
            TranslationContext context) {
          context.addStep(transform, "CollectionToSingleton");
          context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
          context.addCollectionToSingletonOutput(
              PropertyNames.OUTPUT,
              context.getInput(transform),
              context.getOutput(transform));
        }
      });

  // Unqualified call for consistency with the other registrations in this
  // block; registerTransformTranslator is a member of this class, so the
  // DataflowPipelineTranslator. qualifier was redundant.
  registerTransformTranslator(
      Combine.GroupedValues.class,
      new TransformTranslator<Combine.GroupedValues>() {
        @Override
        public void translate(
            Combine.GroupedValues transform,
            TranslationContext context) {
          translateHelper(transform, context);
        }

        // Emits a CombineValues step carrying the serialized AppliedCombineFn
        // and the accumulator coder (needed by the service to shuffle partial
        // accumulators when combiner lifting applies).
        private <K, InputT, OutputT> void translateHelper(
            final Combine.GroupedValues<K, InputT, OutputT> transform,
            TranslationContext context) {
          context.addStep(transform, "CombineValues");
          context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));

          AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
              transform.getAppliedFn(
                  context.getInput(transform).getPipeline().getCoderRegistry(),
                  context.getInput(transform).getCoder());

          context.addEncodingInput(fn.getAccumulatorCoder());
          context.addInput(
              PropertyNames.SERIALIZED_FN,
              byteArrayToJsonString(serializeToByteArray(fn)));
          context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
        }
      });

  registerTransformTranslator(
      Create.Values.class,
      new TransformTranslator<Create.Values>() {
        @Override
        public void translate(
            Create.Values transform,
            TranslationContext context) {
          createHelper(transform, context);
        }

        // Emits a CreateCollection step whose elements are encoded with the
        // output coder and embedded in the job spec as JSON strings.
        private <T> void createHelper(
            Create.Values<T> transform,
            TranslationContext context) {
          context.addStep(transform, "CreateCollection");

          Coder<T> coder = context.getOutput(transform).getCoder();
          List<CloudObject> elements = new LinkedList<>();
          for (T elem : transform.getElements()) {
            byte[] encodedBytes;
            try {
              encodedBytes = encodeToByteArray(coder, elem);
            } catch (CoderException exn) {
              // TODO: Put in better element printing:
              // truncate if too long.
              throw new IllegalArgumentException(
                  "Unable to encode element '" + elem + "' of transform '" + transform
                  + "' using coder '" + coder + "'.",
                  exn);
            }
            String encodedJson = byteArrayToJsonString(encodedBytes);
            // Sanity-check the JSON round trip; only active when run with -ea.
            assert Arrays.equals(encodedBytes,
                                 jsonStringToByteArray(encodedJson));
            elements.add(CloudObject.forString(encodedJson));
          }
          context.addInput(PropertyNames.ELEMENT, elements);
          context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
        }
      });

  registerTransformTranslator(
      Flatten.FlattenPCollectionList.class,
      new TransformTranslator<Flatten.FlattenPCollectionList>() {
        @Override
        public void translate(
            Flatten.FlattenPCollectionList transform,
            TranslationContext context) {
          flattenHelper(transform, context);
        }

        // Emits a Flatten step referencing each input PCollection by its
        // producing step's output.
        private <T> void flattenHelper(
            Flatten.FlattenPCollectionList<T> transform,
            TranslationContext context) {
          context.addStep(transform, "Flatten");

          List<OutputReference> inputs = new LinkedList<>();
          for (PCollection<T> input : context.getInput(transform).getAll()) {
            inputs.add(context.asOutputReference(input));
          }
          context.addInput(PropertyNames.INPUTS, inputs);
          context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
        }
      });

  registerTransformTranslator(
      GroupByKey.class,
      new TransformTranslator<GroupByKey>() {
        @Override
        public void translate(
            GroupByKey transform,
            TranslationContext context) {
          groupByKeyHelper(transform, context);
        }

        // Emits a GroupByKey step, including the serialized windowing strategy
        // and a flag telling the service whether combiner lifting must be
        // disabled for this grouping.
        private <K, V> void groupByKeyHelper(
            GroupByKey<K, V> transform,
            TranslationContext context) {
          context.addStep(transform, "GroupByKey");
          context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
          context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

          WindowingStrategy<?, ?> windowingStrategy =
              context.getInput(transform).getWindowingStrategy();
          boolean isStreaming =
              context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
          // Combiner lifting is only safe with a non-merging WindowFn, the
          // default trigger, and (in streaming) a fewKeys() hint.
          boolean disallowCombinerLifting =
              !windowingStrategy.getWindowFn().isNonMerging()
              || (isStreaming && !transform.fewKeys())
              // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
              || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
          context.addInput(
              PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
          context.addInput(
              PropertyNames.SERIALIZED_FN,
              byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
        }
      });

  registerTransformTranslator(
      ParDo.BoundMulti.class,
      new TransformTranslator<ParDo.BoundMulti>() {
        @Override
        public void translate(
            ParDo.BoundMulti transform,
            TranslationContext context) {
          translateMultiHelper(transform, context);
        }

        // Emits a ParallelDo step with multiple outputs; inputs, fn, and
        // outputs are handled by the shared translate* helpers.
        private <InputT, OutputT> void translateMultiHelper(
            ParDo.BoundMulti<InputT, OutputT> transform,
            TranslationContext context) {
          context.addStep(transform, "ParallelDo");
          translateInputs(context.getInput(transform), transform.getSideInputs(), context);
          translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(),
              transform.getSideInputs(), context.getInput(transform).getCoder(), context);
          translateOutputs(context.getOutput(transform), context);
        }
      });

  registerTransformTranslator(
      ParDo.Bound.class,
      new TransformTranslator<ParDo.Bound>() {
        @Override
        public void translate(
            ParDo.Bound transform,
            TranslationContext context) {
          translateSingleHelper(transform, context);
        }

        // Emits a ParallelDo step with a single output.
        private <InputT, OutputT> void translateSingleHelper(
            ParDo.Bound<InputT, OutputT> transform,
            TranslationContext context) {
          context.addStep(transform, "ParallelDo");
          translateInputs(context.getInput(transform), transform.getSideInputs(), context);
          translateFn(
              transform.getFn(),
              context.getInput(transform).getWindowingStrategy(),
              transform.getSideInputs(), context.getInput(transform).getCoder(), context);
          context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
        }
      });

  // Unqualified TransformTranslator for consistency with the other
  // registrations; the nested type is a member of this class.
  registerTransformTranslator(
      Window.Bound.class,
      new TransformTranslator<Window.Bound>() {
        @Override
        public void translate(
            Window.Bound transform, TranslationContext context) {
          translateHelper(transform, context);
        }

        // Emits a Bucket step carrying the output's serialized
        // WindowingStrategy.
        private <T> void translateHelper(
            Window.Bound<T> transform, TranslationContext context) {
          context.addStep(transform, "Bucket");
          context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
          context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

          WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
          byte[] serializedBytes = serializeToByteArray(strategy);
          String serializedJson = byteArrayToJsonString(serializedBytes);
          // Sanity-check the JSON round trip; only active when run with -ea.
          assert Arrays.equals(serializedBytes,
                               jsonStringToByteArray(serializedJson));
          context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
        }
      });

  ///////////////////////////////////////////////////////////////////////////
  // IO Translation.

  registerTransformTranslator(
      AvroIO.Read.Bound.class, new AvroIOTranslator.ReadTranslator());
  registerTransformTranslator(
      AvroIO.Write.Bound.class, new AvroIOTranslator.WriteTranslator());
  registerTransformTranslator(
      BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
  registerTransformTranslator(
      BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());
  registerTransformTranslator(
      PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
  registerTransformTranslator(
      DataflowPipelineRunner.StreamingPubsubIOWrite.class,
      new PubsubIOTranslator.WriteTranslator());
  registerTransformTranslator(
      TextIO.Read.Bound.class, new TextIOTranslator.ReadTranslator());
  registerTransformTranslator(
      TextIO.Write.Bound.class, new TextIOTranslator.WriteTranslator());

  registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}