#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
# mypy: check-untyped-defs
import atexit
import functools
import itertools
import logging
import threading
import time
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Iterator
from typing import Optional
from typing import Tuple
import grpc
from apache_beam.metrics import metric
from apache_beam.metrics.execution import MetricResult
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import ValueProvider
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.runners import runner
from apache_beam.runners.job import utils as job_utils
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability import job_server
from apache_beam.runners.portability import portable_metrics
from apache_beam.runners.portability.fn_api_runner.fn_runner import translations
from apache_beam.runners.worker import sdk_worker_main
from apache_beam.runners.worker import worker_pool_main
from apache_beam.transforms import environments
if TYPE_CHECKING:
from google.protobuf import struct_pb2 # pylint: disable=ungrouped-imports
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.portability.api import beam_runner_api_pb2
__all__ = ['PortableRunner']
MESSAGE_LOG_LEVELS = {
beam_job_api_pb2.JobMessage.MESSAGE_IMPORTANCE_UNSPECIFIED: logging.INFO,
beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG: logging.DEBUG,
beam_job_api_pb2.JobMessage.JOB_MESSAGE_DETAILED: logging.DEBUG,
beam_job_api_pb2.JobMessage.JOB_MESSAGE_BASIC: logging.INFO,
beam_job_api_pb2.JobMessage.JOB_MESSAGE_WARNING: logging.WARNING,
beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR: logging.ERROR,
}
TERMINAL_STATES = [
beam_job_api_pb2.JobState.DONE,
beam_job_api_pb2.JobState.DRAINED,
beam_job_api_pb2.JobState.FAILED,
beam_job_api_pb2.JobState.CANCELLED,
]
ENV_TYPE_ALIASES = {'LOOPBACK': 'EXTERNAL'}
_LOGGER = logging.getLogger(__name__)
class JobServiceHandle(object):
"""
Encapsulates the interactions necessary to submit a pipeline to a job service.
The base set of interactions consists of 3 steps:
- prepare
- stage
- run
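
  A minimal usage sketch (the job service stub, options, and pipeline proto
  are assumed to have been created elsewhere; the names are illustrative):

    handle = JobServiceHandle(job_service_stub, pipeline_options)
    job_id, message_stream, state_stream = handle.submit(proto_pipeline)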
"""
def __init__(self, job_service, options, retain_unknown_options=False):
self.job_service = job_service
self.options = options
self.timeout = options.view_as(PortableOptions).job_server_timeout
self.artifact_endpoint = options.view_as(PortableOptions).artifact_endpoint
self._retain_unknown_options = retain_unknown_options
def submit(self, proto_pipeline):
# type: (beam_runner_api_pb2.Pipeline) -> Tuple[str, Iterator[beam_job_api_pb2.JobStateEvent], Iterator[beam_job_api_pb2.JobMessagesResponse]]
"""
Submit and run the pipeline defined by `proto_pipeline`.
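
    Returns a tuple of (job_id, message_stream, state_stream).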
"""
prepare_response = self.prepare(proto_pipeline)
artifact_endpoint = (
self.artifact_endpoint or
prepare_response.artifact_staging_endpoint.url)
self.stage(
proto_pipeline,
artifact_endpoint,
prepare_response.staging_session_token)
return self.run(prepare_response.preparation_id)
def get_pipeline_options(self):
# type: () -> struct_pb2.Struct
"""
Get `self.options` as a protobuf Struct
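
    Runner-specific options reported by the job service via
    DescribePipelineOptions are merged in as extra arguments.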
"""
    # Fetch runner options from the job service, retrying in case the
    # channel is not yet ready.
def send_options_request(max_retries=5):
num_retries = 0
while True:
try:
          # The channel may report READY even though connections can still
          # fail; this seems to be an issue only on Mac with port forwarding.
return self.job_service.DescribePipelineOptions(
beam_job_api_pb2.DescribePipelineOptionsRequest(),
timeout=self.timeout)
except grpc.FutureTimeoutError:
# no retry for timeout errors
raise
except grpc.RpcError as e:
num_retries += 1
if num_retries > max_retries:
raise e
time.sleep(1)
options_response = send_options_request()
def add_runner_options(parser):
for option in options_response.options:
try:
# no default values - we don't want runner options
# added unless they were specified by the user
add_arg_args = {'action': 'store', 'help': option.description}
if option.type == beam_job_api_pb2.PipelineOptionType.BOOLEAN:
add_arg_args['action'] = 'store_true' \
if option.default_value != 'true' else 'store_false'
elif option.type == beam_job_api_pb2.PipelineOptionType.INTEGER:
add_arg_args['type'] = int
elif option.type == beam_job_api_pb2.PipelineOptionType.ARRAY:
add_arg_args['action'] = 'append'
parser.add_argument("--%s" % option.name, **add_arg_args)
        except Exception as e:
          # Ignore runner options that have already been added; this is the
          # only case in which a duplicate is not treated as an error.
          if 'conflicting option string' not in str(e):
            raise
          _LOGGER.debug("Runner option '%s' was already added", option.name)
all_options = self.options.get_all_options(
add_extra_args_fn=add_runner_options,
retain_unknown_options=self._retain_unknown_options)
return self.encode_pipeline_options(all_options)
@staticmethod
def encode_pipeline_options(
all_options: Dict[str, Any]) -> 'struct_pb2.Struct':
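    """Convert a dict of pipeline options into a protobuf Struct whose keys
    are ``beam:option:<name>:v1`` URNs.

    A sketch of the expected shape (the option name and value are
    illustrative):

      encode_pipeline_options({'parallelism': 2})
      # -> Struct with field 'beam:option:parallelism:v1' set to '2'
    """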
def convert_pipeline_option_value(v):
      # Convert int values to strings (BEAM-5509); type() rather than
      # isinstance() is used so that bools are not converted.
      if type(v) == int:
        return str(v)
elif isinstance(v, ValueProvider):
return convert_pipeline_option_value(
v.get()) if v.is_accessible() else None
return v
# TODO: Define URNs for options.
    p_options = {
        'beam:option:' + k + ':v1': convert_pipeline_option_value(v)
        for k, v in all_options.items() if v is not None
    }
return job_utils.dict_to_struct(p_options)
def prepare(self, proto_pipeline):
# type: (beam_runner_api_pb2.Pipeline) -> beam_job_api_pb2.PrepareJobResponse
"""Prepare the job on the job service"""
return self.job_service.Prepare(
beam_job_api_pb2.PrepareJobRequest(
job_name='job',
pipeline=proto_pipeline,
pipeline_options=self.get_pipeline_options()),
timeout=self.timeout)
def stage(self,
proto_pipeline, # type: beam_runner_api_pb2.Pipeline
artifact_staging_endpoint,
staging_session_token
):
# type: (...) -> None
"""Stage artifacts"""
if artifact_staging_endpoint:
artifact_service.offer_artifacts(
beam_artifact_api_pb2_grpc.ArtifactStagingServiceStub(
channel=grpc.insecure_channel(artifact_staging_endpoint)),
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
staging_session_token)
def run(self, preparation_id):
# type: (str) -> Tuple[str, Iterator[beam_job_api_pb2.JobStateEvent], Iterator[beam_job_api_pb2.JobMessagesResponse]]
"""Run the job"""
try:
state_stream = self.job_service.GetStateStream(
beam_job_api_pb2.GetJobStateRequest(job_id=preparation_id),
timeout=self.timeout)
# If there's an error, we don't always get it until we try to read.
# Fortunately, there's always an immediate current state published.
state_stream = itertools.chain([next(state_stream)], state_stream)
message_stream = self.job_service.GetMessageStream(
beam_job_api_pb2.JobMessagesRequest(job_id=preparation_id),
timeout=self.timeout)
except Exception:
# TODO(BEAM-6442): Unify preparation_id and job_id for all runners.
state_stream = message_stream = None
    # Run the job and wait for a result. We don't set a timeout here because
    # it may take a long time for a job to complete, and streaming jobs
    # currently never return a response.
run_response = self.job_service.Run(
beam_job_api_pb2.RunJobRequest(preparation_id=preparation_id))
if state_stream is None:
state_stream = self.job_service.GetStateStream(
beam_job_api_pb2.GetJobStateRequest(job_id=run_response.job_id))
message_stream = self.job_service.GetMessageStream(
beam_job_api_pb2.JobMessagesRequest(job_id=run_response.job_id))
return run_response.job_id, message_stream, state_stream
class PortableRunner(runner.PipelineRunner):
"""
Experimental: No backward compatibility guaranteed.
  A BeamRunner that executes Python pipelines via the Beam Job API.

  This runner is a stub that does not run the job itself: it submits the
  pipeline to a job service, which is then responsible for running and
  managing the job.
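
  A minimal usage sketch, assuming ``import apache_beam as beam`` (the
  endpoint and transform below are illustrative, not tied to any real
  service):

    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',
        '--environment_type=LOOPBACK',
    ])
    with beam.Pipeline(options=options) as p:
      p | beam.Create([1, 2, 3])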
"""
def __init__(self):
self._dockerized_job_server = None # type: Optional[job_server.JobServer]
@staticmethod
def _create_environment(options):
# type: (PipelineOptions) -> environments.Environment
portable_options = options.view_as(PortableOptions)
    # Do not set a Runner. Otherwise this can cause problems in Java's
    # PipelineOptions, e.g. a ClassNotFoundException if the corresponding
    # runner does not exist in the Java SDK. In portability, the entry point
    # is clearly defined via the JobService.
portable_options.view_as(StandardOptions).runner = None
environment_type = portable_options.environment_type
if not environment_type:
environment_urn = common_urns.environments.DOCKER.urn
elif environment_type.startswith('beam:env:'):
environment_urn = environment_type
else:
# e.g. handle LOOPBACK -> EXTERNAL
environment_type = ENV_TYPE_ALIASES.get(
environment_type, environment_type)
try:
environment_urn = getattr(
common_urns.environments, environment_type).urn
except AttributeError:
raise ValueError('Unknown environment type: %s' % environment_type)
env_class = environments.Environment.get_env_cls_from_urn(environment_urn)
return env_class.from_options(portable_options)
def default_job_server(self, options):
raise NotImplementedError(
'You must specify a --job_endpoint when using --runner=PortableRunner. '
'Alternatively, you may specify which portable runner you intend to '
'use, such as --runner=FlinkRunner or --runner=SparkRunner.')
def create_job_service_handle(self, job_service, options):
# type: (...) -> JobServiceHandle
return JobServiceHandle(job_service, options)
def create_job_service(self, options):
# type: (PipelineOptions) -> JobServiceHandle
"""
Start the job service and return a `JobServiceHandle`
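
    A job endpoint of ``embed`` (i.e. ``--job_endpoint=embed``) selects an
    in-process job server; any other value is treated as the ``host:port``
    address of an external job service.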
"""
job_endpoint = options.view_as(PortableOptions).job_endpoint
if job_endpoint:
if job_endpoint == 'embed':
server = job_server.EmbeddedJobServer() # type: job_server.JobServer
else:
job_server_timeout = options.view_as(PortableOptions).job_server_timeout
server = job_server.ExternalJobServer(job_endpoint, job_server_timeout)
else:
server = self.default_job_server(options)
return self.create_job_service_handle(server.start(), options)
@staticmethod
def get_proto_pipeline(pipeline, options):
# type: (Pipeline, PipelineOptions) -> beam_runner_api_pb2.Pipeline
portable_options = options.view_as(PortableOptions)
proto_pipeline = pipeline.to_runner_api(
default_environment=PortableRunner._create_environment(
portable_options))
# TODO: https://issues.apache.org/jira/browse/BEAM-7199
    # Eventually remove the 'pre_optimize' option altogether and only perform
# the equivalent of the 'default' case below (minus the 'lift_combiners'
# part).
pre_optimize = options.view_as(DebugOptions).lookup_experiment(
'pre_optimize', 'default').lower()
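    # For example, a (hypothetical) invocation with
    # --experiments=pre_optimize=pack_combiners,lift_combiners selects
    # individual phases via the comma-separated branch below.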
if (not options.view_as(StandardOptions).streaming and
pre_optimize != 'none'):
if pre_optimize == 'default':
phases = [
# TODO: https://issues.apache.org/jira/browse/BEAM-4678
# https://issues.apache.org/jira/browse/BEAM-11478
# Eventually remove the 'lift_combiners' phase from 'default'.
translations.lift_combiners,
translations.sort_stages
]
partial = True
elif pre_optimize == 'all':
phases = [
translations.annotate_downstream_side_inputs,
translations.annotate_stateful_dofns_as_roots,
translations.fix_side_input_pcoll_coders,
translations.pack_combiners,
translations.lift_combiners,
translations.expand_sdf,
translations.fix_flatten_coders,
# translations.sink_flattens,
translations.greedily_fuse,
translations.read_to_impulse,
translations.extract_impulse_stages,
translations.remove_data_plane_ops,
translations.sort_stages
]
partial = False
elif pre_optimize == 'all_except_fusion':
# TODO(BEAM-7248): Delete this branch after PortableRunner supports
# beam:runner:executable_stage:v1.
phases = [
translations.annotate_downstream_side_inputs,
translations.annotate_stateful_dofns_as_roots,
translations.fix_side_input_pcoll_coders,
translations.pack_combiners,
translations.lift_combiners,
translations.expand_sdf,
translations.fix_flatten_coders,
# translations.sink_flattens,
# translations.greedily_fuse,
translations.read_to_impulse,
translations.extract_impulse_stages,
translations.remove_data_plane_ops,
translations.sort_stages
]
partial = True
else:
phases = []
for phase_name in pre_optimize.split(','):
# For now, these are all we allow.
if phase_name in ('pack_combiners', 'lift_combiners'):
phases.append(getattr(translations, phase_name))
else:
raise ValueError(
'Unknown or inapplicable phase for pre_optimize: %s' %
phase_name)
phases.append(translations.sort_stages)
partial = True
      # All (known) portable runners (i.e. Flink and Spark) support these
      # URNs.
known_urns = frozenset([
common_urns.composites.RESHUFFLE.urn,
common_urns.primitives.IMPULSE.urn,
common_urns.primitives.FLATTEN.urn,
common_urns.primitives.GROUP_BY_KEY.urn
])
proto_pipeline = translations.optimize_pipeline(
proto_pipeline,
phases=phases,
known_runner_urns=known_urns,
partial=partial)
return proto_pipeline
def run_pipeline(self, pipeline, options):
# type: (Pipeline, PipelineOptions) -> PipelineResult
portable_options = options.view_as(PortableOptions)
# TODO: https://issues.apache.org/jira/browse/BEAM-5525
# portable runner specific default
if options.view_as(SetupOptions).sdk_location == 'default':
options.view_as(SetupOptions).sdk_location = 'container'
experiments = options.view_as(DebugOptions).experiments or []
    # This is needed because we start a worker server here if one is
    # requested but none was provided.
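    # In LOOPBACK mode the SDK harness runs inside this process: the
    # environment config is set to the address of the external worker pool
    # server started below, which the runner calls back into.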
if portable_options.environment_type == 'LOOPBACK':
use_loopback_process_worker = options.view_as(
DebugOptions).lookup_experiment('use_loopback_process_worker', False)
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
cleanup_callbacks = [functools.partial(server.stop, 1)]
else:
cleanup_callbacks = []
proto_pipeline = self.get_proto_pipeline(pipeline, options)
job_service_handle = self.create_job_service(options)
    job_id, message_stream, state_stream = (
        job_service_handle.submit(proto_pipeline))
result = PipelineResult(
job_service_handle.job_service,
job_id,
message_stream,
state_stream,
cleanup_callbacks)
if cleanup_callbacks:
# Register an exit handler to ensure cleanup on exit.
atexit.register(functools.partial(result._cleanup, on_exit=True))
_LOGGER.info(
'Environment "%s" has started a component necessary for the '
'execution. Be sure to run the pipeline using\n'
' with Pipeline() as p:\n'
' p.apply(..)\n'
'This ensures that the pipeline finishes before this program exits.',
portable_options.environment_type)
return result
class PortableMetrics(metric.MetricResults):
def __init__(self, job_metrics_response):
metrics = job_metrics_response.metrics
self.attempted = portable_metrics.from_monitoring_infos(metrics.attempted)
self.committed = portable_metrics.from_monitoring_infos(metrics.committed)
@staticmethod
def _combine(committed, attempted, filter):
all_keys = set(committed.keys()) | set(attempted.keys())
return [
MetricResult(key, committed.get(key), attempted.get(key))
for key in all_keys if metric.MetricResults.matches(filter, key)
]
def query(self, filter=None):
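    """Return matching metrics, grouped into counters, distributions and
    gauges.

    A sketch of typical use (the metric name is illustrative):

      from apache_beam.metrics.metric import MetricsFilter
      results = pipeline_result.metrics().query(
          MetricsFilter().with_name('my_counter'))
      counters = results[metric.MetricResults.COUNTERS]
    """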
counters, distributions, gauges = [
self._combine(x, y, filter)
for x, y in zip(self.committed, self.attempted)
]
return {
self.COUNTERS: counters,
self.DISTRIBUTIONS: distributions,
self.GAUGES: gauges
}
class PipelineResult(runner.PipelineResult):
def __init__(
self,
job_service,
job_id,
message_stream,
state_stream,
cleanup_callbacks=()):
    super().__init__(beam_job_api_pb2.JobState.UNSPECIFIED)
self._job_service = job_service
self._job_id = job_id
self._messages = []
self._message_stream = message_stream
self._state_stream = state_stream
self._cleanup_callbacks = cleanup_callbacks
self._metrics = None
self._runtime_exception = None
def cancel(self):
# type: () -> None
try:
self._job_service.Cancel(
beam_job_api_pb2.CancelJobRequest(job_id=self._job_id))
finally:
self._cleanup()
@property
def state(self):
runner_api_state = self._job_service.GetState(
beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)).state
self._state = self._runner_api_state_to_pipeline_state(runner_api_state)
return self._state
@staticmethod
def _runner_api_state_to_pipeline_state(runner_api_state):
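    # e.g. beam_job_api_pb2.JobState.DONE maps to runner.PipelineState.DONE.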
return getattr(
runner.PipelineState,
beam_job_api_pb2.JobState.Enum.Name(runner_api_state))
@staticmethod
def _pipeline_state_to_runner_api_state(pipeline_state):
return beam_job_api_pb2.JobState.Enum.Value(pipeline_state)
def metrics(self):
if not self._metrics:
job_metrics_response = self._job_service.GetJobMetrics(
beam_job_api_pb2.GetJobMetricsRequest(job_id=self._job_id))
self._metrics = PortableMetrics(job_metrics_response)
return self._metrics
def _last_error_message(self):
# type: () -> str
    # Keep only messages that carry a message_response, then filter those
    # down to error messages.
messages = [
m.message_response for m in self._messages
if m.HasField('message_response')
]
error_messages = [
m for m in messages
if m.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR
]
if error_messages:
return error_messages[-1].message_text
else:
return 'unknown error'
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
    :return: The final state of the pipeline (which may be a non-terminal
      state if the wait timed out).
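
    A sketch of typical use (the duration value is illustrative):

      result = p.run()
      result.wait_until_finish(duration=60 * 1000)  # wait up to 60 seconds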
"""
def read_messages():
# type: () -> None
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
          _LOGGER.log(
MESSAGE_LOG_LEVELS[message.message_response.importance],
"%s",
message.message_response.message_text)
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self._runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
raise self._runtime_exception
return self._state
def _observe_state(self, message_thread):
try:
for state_response in self._state_stream:
self._state = self._runner_api_state_to_pipeline_state(
state_response.state)
if state_response.state in TERMINAL_STATES:
# Wait for any last messages.
message_thread.join(10)
break
if self._state != runner.PipelineState.DONE:
self._runtime_exception = RuntimeError(
'Pipeline %s failed in state %s: %s' %
(self._job_id, self._state, self._last_error_message()))
except Exception as e:
self._runtime_exception = e
finally:
self._cleanup()
def _cleanup(self, on_exit=False):
# type: (bool) -> None
if on_exit and self._cleanup_callbacks:
_LOGGER.info(
'Running cleanup on exit. If your pipeline should continue running, '
'be sure to use the following syntax:\n'
' with Pipeline() as p:\n'
' p.apply(..)\n'
'This ensures that the pipeline finishes before this program exits.')
    captured_exception = None
    for callback in self._cleanup_callbacks:
      try:
        callback()
      except Exception as e:
        # Remember the failure but keep running the remaining callbacks.
        captured_exception = e
    self._cleanup_callbacks = ()
    if captured_exception:
      # A bare `raise` would fail here (we are no longer inside an except
      # block), so re-raise the captured exception explicitly.
      raise captured_exception