in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py [0:0]
def run_pipeline(self, pipeline, options):
"""Remotely executes entire pipeline or parts reachable from node."""
# Label goog-dataflow-notebook if job is started from notebook.
if is_in_notebook():
notebook_version = (
'goog-dataflow-notebook=' +
beam.version.__version__.replace('.', '_'))
if options.view_as(GoogleCloudOptions).labels:
options.view_as(GoogleCloudOptions).labels.append(notebook_version)
else:
options.view_as(GoogleCloudOptions).labels = [notebook_version]
# Import here to avoid adding the dependency for local running scenarios.
try:
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.runners.dataflow.internal import apiclient
except ImportError:
raise ImportError(
'Google Cloud Dataflow runner not available, '
'please install apache_beam[gcp]')
self._maybe_add_unified_worker_missing_options(options)
use_fnapi = apiclient._use_fnapi(options)
if not use_fnapi:
self._check_for_unsupported_features_on_non_portable_worker(pipeline)
# Convert all side inputs into a form acceptable to Dataflow.
pipeline.visit(
self.side_input_visitor(
apiclient._use_unified_worker(options),
apiclient._use_fnapi(options),
deterministic_key_coders=not options.view_as(
TypeOptions).allow_non_deterministic_key_coders))
# Performing configured PTransform overrides. Note that this is currently
# done before Runner API serialization, since the new proto needs to contain
# any added PTransforms.
pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
from apache_beam.runners.dataflow.ptransform_overrides import WriteToBigQueryPTransformOverride
from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride
pipeline.replace_all([
WriteToBigQueryPTransformOverride(pipeline, options),
GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)
])
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
from apache_beam.transforms import environments
if options.view_as(SetupOptions).prebuild_sdk_container_engine:
# if prebuild_sdk_container_engine is specified we will build a new sdk
# container image with dependencies pre-installed and use that image,
# instead of using the inferred default container image.
self._default_environment = (
environments.DockerEnvironment.from_options(options))
options.view_as(WorkerOptions).sdk_container_image = (
self._default_environment.container_image)
else:
self._default_environment = (
environments.DockerEnvironment.from_container_image(
apiclient.get_container_image_from_options(options),
artifacts=environments.python_sdk_dependencies(options),
resource_hints=environments.resource_hints_from_options(options)))
# This has to be performed before pipeline proto is constructed to make sure
# that the changes are reflected in the portable job submission path.
self._adjust_pipeline_for_dataflow_v2(pipeline)
# Snapshot the pipeline in a portable proto.
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True, default_environment=self._default_environment)
# Optimize the pipeline if it is not streaming and the pre_optimize
# experiment is set.
if not options.view_as(StandardOptions).streaming:
pre_optimize = options.view_as(DebugOptions).lookup_experiment(
'pre_optimize', 'default').lower()
from apache_beam.runners.portability.fn_api_runner import translations
if pre_optimize == 'none':
phases = []
elif pre_optimize == 'default' or pre_optimize == 'all':
phases = [translations.pack_combiners, translations.sort_stages]
else:
phases = []
for phase_name in pre_optimize.split(','):
# For now, these are all we allow.
if phase_name in ('pack_combiners', ):
phases.append(getattr(translations, phase_name))
else:
raise ValueError(
'Unknown or inapplicable phase for pre_optimize: %s' %
phase_name)
phases.append(translations.sort_stages)
if phases:
self.proto_pipeline = translations.optimize_pipeline(
self.proto_pipeline,
phases=phases,
known_runner_urns=frozenset(),
partial=True)
if not use_fnapi:
# Performing configured PTransform overrides which should not be reflected
# in the proto representation of the graph.
pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)
# Add setup_options for all the BeamPlugin imports
setup_options = options.view_as(SetupOptions)
plugins = BeamPlugin.get_all_plugin_paths()
if setup_options.beam_plugins is not None:
plugins = list(set(plugins + setup_options.beam_plugins))
setup_options.beam_plugins = plugins
# Elevate "min_cpu_platform" to pipeline option, but using the existing
# experiment.
debug_options = options.view_as(DebugOptions)
worker_options = options.view_as(WorkerOptions)
if worker_options.min_cpu_platform:
debug_options.add_experiment(
'min_cpu_platform=' + worker_options.min_cpu_platform)
if (apiclient._use_unified_worker(options) and
pipeline.contains_external_transforms):
# All Dataflow multi-language pipelines (supported by Runner v2 only) use
# portable job submission by default.
debug_options.add_experiment("use_portable_job_submission")
# Elevate "enable_streaming_engine" to pipeline option, but using the
# existing experiment.
google_cloud_options = options.view_as(GoogleCloudOptions)
if google_cloud_options.enable_streaming_engine:
debug_options.add_experiment("enable_windmill_service")
debug_options.add_experiment("enable_streaming_engine")
elif (apiclient._use_fnapi(options) and
apiclient._use_unified_worker(options) and
options.view_as(StandardOptions).streaming):
debug_options.add_experiment("enable_windmill_service")
debug_options.add_experiment("enable_streaming_engine")
else:
if (debug_options.lookup_experiment("enable_windmill_service") or
debug_options.lookup_experiment("enable_streaming_engine")):
raise ValueError(