private List getOverrides()

in runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java [480:639]


  /**
   * Assembles the ordered list of {@link PTransformOverride}s applied by this runner.
   *
   * <p>Registration order is significant (see the inline notes): several entries are documented
   * as having to precede others, so the sequence below must not be rearranged.
   *
   * @param streaming {@code true} to register the streaming-only overrides, {@code false} to
   *     register the batch-only overrides; the leading and trailing overrides are always added
   * @return the overrides, in registration order
   */
  private List<PTransformOverride> getOverrides(boolean streaming) {
    ImmutableList.Builder<PTransformOverride> overrides = ImmutableList.builder();

    // Create is implemented in terms of a Read, so these have to be registered ahead of the
    // streaming Read overrides.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.flattenWithDuplicateInputs(),
            DeduplicatedFlattenFactory.create()));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance()));

    // Single-output ParDo is replaced by default with the ParDoSingle override (registered at the
    // end of this method). Single-output splittable ParDo needs a different expansion.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.splittableParDoSingle(),
            new ReflectiveOneToOneOverrideFactory(
                SplittableParDoOverrides.ParDoSingleViaMulti.class, this)));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.splittableParDoMulti(),
            new SplittableParDoOverrides.SplittableParDoOverrideFactory()));

    if (streaming) {
      addStreamingOverrides(overrides);
    } else {
      addBatchOverrides(overrides);
    }

    /* TODO(Beam-4684): Support @RequiresStableInput on Dataflow in a more intelligent way.
    Using Reshuffle might cause an extra and unnecessary shuffle to be inserted. To enable this, we
    should make sure that we do not add extra shuffles for transforms whose input is already stable.
    // Uses Reshuffle, so has to be before the Reshuffle override
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.requiresStableInputParDoSingle(),
            RequiresStableInputParDoOverrides.singleOutputOverrideFactory()));
    // Uses Reshuffle, so has to be before the Reshuffle override
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.requiresStableInputParDoMulti(),
            RequiresStableInputParDoOverrides.multiOutputOverrideFactory()));
    */

    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(Reshuffle.class), new ReshuffleOverrideFactory()));
    // Order is important from here on: streaming views almost all use Combine internally.
    overrides.add(
        PTransformOverride.of(
            new DataflowPTransformMatchers.CombineValuesWithoutSideInputsPTransformMatcher(),
            new PrimitiveCombineGroupedValuesOverrideFactory()));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(ParDo.SingleOutput.class),
            new PrimitiveParDoSingleFactory()));

    return overrides.build();
  }

  /** Appends the streaming-only overrides to {@code overrides}, in their required order. */
  private void addStreamingOverrides(ImmutableList.Builder<PTransformOverride> overrides) {
    // Each Pub/Sub replacement is skipped when the matching "enable_custom_pubsub_*" experiment
    // is present on the pipeline options.
    if (!hasExperiment(options, "enable_custom_pubsub_source")) {
      overrides.add(
          PTransformOverride.of(
              PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
              new StreamingPubsubIOReadOverrideFactory()));
    }
    if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
      overrides.add(
          PTransformOverride.of(
              PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
              new StreamingPubsubIOWriteOverrideFactory(this)));
    }

    overrides
        .add(KafkaIO.Read.KAFKA_READ_OVERRIDE)
        .add(
            PTransformOverride.of(
                PTransformMatchers.writeWithRunnerDeterminedSharding(),
                new StreamingShardedWriteFactory(options)))
        .add(
            PTransformOverride.of(
                PTransformMatchers.groupWithShardableStates(),
                new GroupIntoBatchesOverride
                    .StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(this)));

    // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and must
    // precede it.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(Read.Bounded.class),
            new StreamingBoundedReadOverrideFactory()));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(Read.Unbounded.class),
            new StreamingUnboundedReadOverrideFactory()));

    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
            new StreamingCreatePCollectionViewFactory()));

    // No SPLITTABLE_PROCESS_KEYED override here: the Dataflow Streaming runner overrides that
    // transform natively in the Dataflow service.
  }

  /** Appends the batch-only overrides to {@code overrides}, in their required order. */
  private void addBatchOverrides(ImmutableList.Builder<PTransformOverride> overrides) {
    overrides.add(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE);

    // GroupIntoBatches internally uses a stateful DoFn, so it has to be replaced before the
    // state/timer replacements that follow.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(GroupIntoBatches.class),
            new GroupIntoBatchesOverride.BatchGroupIntoBatchesOverrideFactory<>()));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(GroupIntoBatches.WithShardedKey.class),
            new GroupIntoBatchesOverride.BatchGroupIntoBatchesWithShardedKeyOverrideFactory<>()));

    // State and timer ParDos are implemented by expansion to GBK-then-ParDo.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.stateOrTimerParDoMulti(),
            BatchStatefulParDoOverrides.multiOutputOverrideFactory(options)));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.stateOrTimerParDoSingle(),
            BatchStatefulParDoOverrides.singleOutputOverrideFactory()));

    // Dataflow Batch runner uses the naive override of the SPLITTABLE_PROCESS_KEYED transform
    // for now, but eventually (when liquid sharding is implemented) will also override it
    // natively in the Dataflow service.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.splittableProcessKeyedBounded(),
            new SplittableParDoNaiveBounded.OverrideFactory()));

    // Batch side-input view replacements.
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(View.AsMap.class),
            new ReflectiveViewOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this)));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(View.AsMultimap.class),
            new ReflectiveViewOverrideFactory(
                BatchViewOverrides.BatchViewAsMultimap.class, this)));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
            new CombineGloballyAsSingletonViewOverrideFactory(this)));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(View.AsList.class),
            new ReflectiveViewOverrideFactory(BatchViewOverrides.BatchViewAsList.class, this)));
    overrides.add(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(View.AsIterable.class),
            new ReflectiveViewOverrideFactory(
                BatchViewOverrides.BatchViewAsIterable.class, this)));
  }