static

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java [770:1013]


  static {
    registerTransformTranslator(
        View.CreatePCollectionView.class,
        new TransformTranslator<View.CreatePCollectionView>() {
          @Override
          public void translate(
              View.CreatePCollectionView transform,
              TranslationContext context) {
            translateTyped(transform, context);
          }

          private <ElemT, ViewT> void translateTyped(
              View.CreatePCollectionView<ElemT, ViewT> transform,
              TranslationContext context) {
            context.addStep(transform, "CollectionToSingleton");
            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
            context.addCollectionToSingletonOutput(
                PropertyNames.OUTPUT,
                context.getInput(transform),
                context.getOutput(transform));
          }
        });

    DataflowPipelineTranslator.registerTransformTranslator(
        Combine.GroupedValues.class,
        new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
          @Override
          public void translate(
              Combine.GroupedValues transform,
              DataflowPipelineTranslator.TranslationContext context) {
            translateHelper(transform, context);
          }

          private <K, InputT, OutputT> void translateHelper(
              final Combine.GroupedValues<K, InputT, OutputT> transform,
              DataflowPipelineTranslator.TranslationContext context) {
            context.addStep(transform, "CombineValues");
            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));

            AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
                transform.getAppliedFn(
                    context.getInput(transform).getPipeline().getCoderRegistry(),
                    context.getInput(transform).getCoder());

            context.addEncodingInput(fn.getAccumulatorCoder());
            context.addInput(
                PropertyNames.SERIALIZED_FN,
                byteArrayToJsonString(serializeToByteArray(fn)));
            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
          }
        });

    registerTransformTranslator(
        Create.Values.class,
        new TransformTranslator<Create.Values>() {
          @Override
          public void translate(
              Create.Values transform,
              TranslationContext context) {
            createHelper(transform, context);
          }

          private <T> void createHelper(
              Create.Values<T> transform,
              TranslationContext context) {
            context.addStep(transform, "CreateCollection");

            Coder<T> coder = context.getOutput(transform).getCoder();
            List<CloudObject> elements = new LinkedList<>();
            for (T elem : transform.getElements()) {
              byte[] encodedBytes;
              try {
                encodedBytes = encodeToByteArray(coder, elem);
              } catch (CoderException exn) {
                // TODO: Put in better element printing:
                // truncate if too long.
                throw new IllegalArgumentException(
                    "Unable to encode element '" + elem + "' of transform '" + transform
                    + "' using coder '" + coder + "'.",
                    exn);
              }
              String encodedJson = byteArrayToJsonString(encodedBytes);
              assert Arrays.equals(encodedBytes,
                                   jsonStringToByteArray(encodedJson));
              elements.add(CloudObject.forString(encodedJson));
            }
            context.addInput(PropertyNames.ELEMENT, elements);
            context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
          }
        });

    registerTransformTranslator(
        Flatten.FlattenPCollectionList.class,
        new TransformTranslator<Flatten.FlattenPCollectionList>() {
          @Override
          public void translate(
              Flatten.FlattenPCollectionList transform,
              TranslationContext context) {
            flattenHelper(transform, context);
          }

          private <T> void flattenHelper(
              Flatten.FlattenPCollectionList<T> transform,
              TranslationContext context) {
            context.addStep(transform, "Flatten");

            List<OutputReference> inputs = new LinkedList<>();
            for (PCollection<T> input : context.getInput(transform).getAll()) {
              inputs.add(context.asOutputReference(input));
            }
            context.addInput(PropertyNames.INPUTS, inputs);
            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
          }
        });

    registerTransformTranslator(
        GroupByKey.class,
        new TransformTranslator<GroupByKey>() {
          @Override
          public void translate(
              GroupByKey transform,
              TranslationContext context) {
            groupByKeyHelper(transform, context);
          }

          private <K, V> void groupByKeyHelper(
              GroupByKey<K, V> transform,
              TranslationContext context) {
            context.addStep(transform, "GroupByKey");
            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

            WindowingStrategy<?, ?> windowingStrategy =
                context.getInput(transform).getWindowingStrategy();
            boolean isStreaming =
                context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
            boolean disallowCombinerLifting =
                !windowingStrategy.getWindowFn().isNonMerging()
                || (isStreaming && !transform.fewKeys())
                // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
                || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
            context.addInput(
                PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
            context.addInput(
                PropertyNames.SERIALIZED_FN,
                byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
          }
        });

    registerTransformTranslator(
        ParDo.BoundMulti.class,
        new TransformTranslator<ParDo.BoundMulti>() {
          @Override
          public void translate(
              ParDo.BoundMulti transform,
              TranslationContext context) {
            translateMultiHelper(transform, context);
          }

          private <InputT, OutputT> void translateMultiHelper(
              ParDo.BoundMulti<InputT, OutputT> transform,
              TranslationContext context) {
            context.addStep(transform, "ParallelDo");
            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
            translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(),
                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
            translateOutputs(context.getOutput(transform), context);
          }
        });

    registerTransformTranslator(
        ParDo.Bound.class,
        new TransformTranslator<ParDo.Bound>() {
          @Override
          public void translate(
              ParDo.Bound transform,
              TranslationContext context) {
            translateSingleHelper(transform, context);
          }

          private <InputT, OutputT> void translateSingleHelper(
              ParDo.Bound<InputT, OutputT> transform,
              TranslationContext context) {
            context.addStep(transform, "ParallelDo");
            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
            translateFn(
                transform.getFn(),
                context.getInput(transform).getWindowingStrategy(),
                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
          }
        });


    registerTransformTranslator(
        Window.Bound.class,
        new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
          @Override
          public void translate(
              Window.Bound transform, TranslationContext context) {
            translateHelper(transform, context);
          }

          private <T> void translateHelper(
              Window.Bound<T> transform, TranslationContext context) {
            context.addStep(transform, "Bucket");
            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
            byte[] serializedBytes = serializeToByteArray(strategy);
            String serializedJson = byteArrayToJsonString(serializedBytes);
            assert Arrays.equals(serializedBytes,
                                 jsonStringToByteArray(serializedJson));
            context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
          }
        });

    ///////////////////////////////////////////////////////////////////////////
    // IO Translation.

    registerTransformTranslator(
        AvroIO.Read.Bound.class, new AvroIOTranslator.ReadTranslator());
    registerTransformTranslator(
        AvroIO.Write.Bound.class, new AvroIOTranslator.WriteTranslator());

    registerTransformTranslator(
        BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
    registerTransformTranslator(
        BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());

    registerTransformTranslator(
        PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
    registerTransformTranslator(
        DataflowPipelineRunner.StreamingPubsubIOWrite.class,
        new PubsubIOTranslator.WriteTranslator());

    registerTransformTranslator(
        TextIO.Read.Bound.class, new TextIOTranslator.ReadTranslator());
    registerTransformTranslator(
        TextIO.Write.Bound.class, new TextIOTranslator.WriteTranslator());

    registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
  }