def run_pipeline()

in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py


  def run_pipeline(self, pipeline, options):
    """Remotely executes entire pipeline or parts reachable from node."""
    # Label goog-dataflow-notebook if job is started from notebook.
    if is_in_notebook():
      notebook_version = (
          'goog-dataflow-notebook=' +
          beam.version.__version__.replace('.', '_'))
      if options.view_as(GoogleCloudOptions).labels:
        options.view_as(GoogleCloudOptions).labels.append(notebook_version)
      else:
        options.view_as(GoogleCloudOptions).labels = [notebook_version]

    # Import here to avoid adding the dependency for local running scenarios.
    try:
      # pylint: disable=wrong-import-order, wrong-import-position
      from apache_beam.runners.dataflow.internal import apiclient
    except ImportError:
      raise ImportError(
          'Google Cloud Dataflow runner not available, '
          'please install apache_beam[gcp]')

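    # Fill in any options or experiments implied by the unified worker that
    # the user did not set explicitly.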
    self._maybe_add_unified_worker_missing_options(options)

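    # Determine whether the job targets the portable FnAPI worker; the legacy
    # (non-FnAPI) worker rejects some features up front.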
    use_fnapi = apiclient._use_fnapi(options)

    if not use_fnapi:
      self._check_for_unsupported_features_on_non_portable_worker(pipeline)

    # Convert all side inputs into a form acceptable to Dataflow.
    pipeline.visit(
        self.side_input_visitor(
            apiclient._use_unified_worker(options),
            apiclient._use_fnapi(options),
            deterministic_key_coders=not options.view_as(
                TypeOptions).allow_non_deterministic_key_coders))

    # Performing configured PTransform overrides.  Note that this is currently
    # done before Runner API serialization, since the new proto needs to contain
    # any added PTransforms.
    pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)

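    # These overrides need the constructed pipeline or the runner instance,
    # so they are built here rather than kept in the static override lists.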
    from apache_beam.runners.dataflow.ptransform_overrides import WriteToBigQueryPTransformOverride
    from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride
    pipeline.replace_all([
        WriteToBigQueryPTransformOverride(pipeline, options),
        GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)
    ])

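    # FnAPI jobs that do not use the unified worker need the JRH-specific
    # overrides as well.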
    if use_fnapi and not apiclient._use_unified_worker(options):
      pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)

    from apache_beam.transforms import environments
    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
      # If prebuild_sdk_container_engine is specified, build a new SDK
      # container image with dependencies pre-installed and use that image
      # instead of the inferred default container image.
      self._default_environment = (
          environments.DockerEnvironment.from_options(options))
      options.view_as(WorkerOptions).sdk_container_image = (
          self._default_environment.container_image)
    else:
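      # Otherwise, use the container image inferred from the options and stage
      # the pipeline's Python dependencies as artifacts.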
      self._default_environment = (
          environments.DockerEnvironment.from_container_image(
              apiclient.get_container_image_from_options(options),
              artifacts=environments.python_sdk_dependencies(options),
              resource_hints=environments.resource_hints_from_options(options)))

    # This has to be performed before pipeline proto is constructed to make sure
    # that the changes are reflected in the portable job submission path.
    self._adjust_pipeline_for_dataflow_v2(pipeline)

    # Snapshot the pipeline in a portable proto.
    self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
        return_context=True, default_environment=self._default_environment)

    # Optimize the pipeline if it is not streaming and the pre_optimize
    # experiment is set.
    if not options.view_as(StandardOptions).streaming:
      pre_optimize = options.view_as(DebugOptions).lookup_experiment(
          'pre_optimize', 'default').lower()
      from apache_beam.runners.portability.fn_api_runner import translations
      if pre_optimize == 'none':
        phases = []
      elif pre_optimize == 'default' or pre_optimize == 'all':
        phases = [translations.pack_combiners, translations.sort_stages]
      else:
        phases = []
        for phase_name in pre_optimize.split(','):
          # For now, these are all we allow.
          if phase_name in ('pack_combiners', ):
            phases.append(getattr(translations, phase_name))
          else:
            raise ValueError(
                'Unknown or inapplicable phase for pre_optimize: %s' %
                phase_name)
        phases.append(translations.sort_stages)

      if phases:
        self.proto_pipeline = translations.optimize_pipeline(
            self.proto_pipeline,
            phases=phases,
            known_runner_urns=frozenset(),
            partial=True)

    if not use_fnapi:
      # Performing configured PTransform overrides which should not be reflected
      # in the proto representation of the graph.
      pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)

    # Add setup_options for all the BeamPlugin imports
    setup_options = options.view_as(SetupOptions)
    plugins = BeamPlugin.get_all_plugin_paths()
    if setup_options.beam_plugins is not None:
      plugins = list(set(plugins + setup_options.beam_plugins))
    setup_options.beam_plugins = plugins

    # Elevate "min_cpu_platform" to a pipeline option, but pass it through via
    # the existing experiment.
    debug_options = options.view_as(DebugOptions)
    worker_options = options.view_as(WorkerOptions)
    if worker_options.min_cpu_platform:
      debug_options.add_experiment(
          'min_cpu_platform=' + worker_options.min_cpu_platform)

    if (apiclient._use_unified_worker(options) and
        pipeline.contains_external_transforms):
      # All Dataflow multi-language pipelines (supported by Runner v2 only) use
      # portable job submission by default.
      debug_options.add_experiment("use_portable_job_submission")

    # Elevate "enable_streaming_engine" to a pipeline option, but pass it
    # through via the existing experiments.
    google_cloud_options = options.view_as(GoogleCloudOptions)
    if google_cloud_options.enable_streaming_engine:
      debug_options.add_experiment("enable_windmill_service")
      debug_options.add_experiment("enable_streaming_engine")
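    # Streaming jobs on the unified (Runner v2) worker enable Streaming Engine
    # automatically.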
    elif (apiclient._use_fnapi(options) and
          apiclient._use_unified_worker(options) and
          options.view_as(StandardOptions).streaming):
      debug_options.add_experiment("enable_windmill_service")
      debug_options.add_experiment("enable_streaming_engine")
    else:
      if (debug_options.lookup_experiment("enable_windmill_service") or
          debug_options.lookup_experiment("enable_streaming_engine")):
        raise ValueError(