tfx/orchestration/kubeflow/kubeflow_dag_runner.py (224 lines of code) (raw):
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFX runner for Kubeflow."""
import collections
import copy
import os
from typing import Any, Callable, Dict, List, Optional, Type, cast, MutableMapping
from absl import logging
from kfp import compiler
from kfp import dsl
from kfp import gcp
from kubernetes import client as k8s_client
from tfx import version
from tfx.dsl.compiler import compiler as tfx_compiler
from tfx.dsl.components.base import base_component as tfx_base_component
from tfx.dsl.components.base import base_node
from tfx.orchestration import data_types
from tfx.orchestration import pipeline as tfx_pipeline
from tfx.orchestration import tfx_runner
from tfx.orchestration.config import pipeline_config
from tfx.orchestration.kubeflow import base_component
from tfx.orchestration.kubeflow import utils
from tfx.orchestration.kubeflow.proto import kubeflow_pb2
from tfx.orchestration.launcher import base_component_launcher
from tfx.orchestration.launcher import in_process_component_launcher
from tfx.orchestration.launcher import kubernetes_component_launcher
from tfx.proto.orchestration import pipeline_pb2
from tfx.utils import telemetry_utils
# OpFunc represents the type of a function that takes as input a
# dsl.ContainerOp and returns the same object. Common operations such as adding
# k8s secrets, mounting volumes, specifying the use of TPUs and so on can be
# specified as an OpFunc.
# See example usage here:
# https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/gcp.py
OpFunc = Callable[[dsl.ContainerOp], dsl.ContainerOp]
# Default secret name for GCP credentials. This secret is installed as part of
# a typical Kubeflow installation when the component is GKE.
_KUBEFLOW_GCP_SECRET_NAME = 'user-gcp-sa'
# Default TFX container image to use in KubeflowDagRunner.
DEFAULT_KUBEFLOW_TFX_IMAGE = 'tensorflow/tfx:%s' % (version.__version__,)
def _mount_config_map_op(config_map_name: str) -> OpFunc:
"""Mounts all key-value pairs found in the named Kubernetes ConfigMap.
All key-value pairs in the ConfigMap are mounted as environment variables.
Args:
config_map_name: The name of the ConfigMap resource.
Returns:
An OpFunc for mounting the ConfigMap.
"""
def mount_config_map(container_op: dsl.ContainerOp):
config_map_ref = k8s_client.V1ConfigMapEnvSource(
name=config_map_name, optional=True)
container_op.container.add_env_from(
k8s_client.V1EnvFromSource(config_map_ref=config_map_ref))
return mount_config_map
def _mount_secret_op(secret_name: str) -> OpFunc:
"""Mounts all key-value pairs found in the named Kubernetes Secret.
All key-value pairs in the Secret are mounted as environment variables.
Args:
secret_name: The name of the Secret resource.
Returns:
An OpFunc for mounting the Secret.
"""
def mount_secret(container_op: dsl.ContainerOp):
secret_ref = k8s_client.V1ConfigMapEnvSource(
name=secret_name, optional=True)
container_op.container.add_env_from(
k8s_client.V1EnvFromSource(secret_ref=secret_ref))
return mount_secret
def get_default_pipeline_operator_funcs(
use_gcp_sa: bool = False) -> List[OpFunc]:
"""Returns a default list of pipeline operator functions.
Args:
use_gcp_sa: If true, mount a GCP service account secret to each pod, with
the name _KUBEFLOW_GCP_SECRET_NAME.
Returns:
A list of functions with type OpFunc.
"""
# Enables authentication for GCP services if needed.
gcp_secret_op = gcp.use_gcp_secret(_KUBEFLOW_GCP_SECRET_NAME)
# Mounts configmap containing Metadata gRPC server configuration.
mount_config_map_op = _mount_config_map_op('metadata-grpc-configmap')
if use_gcp_sa:
return [gcp_secret_op, mount_config_map_op]
else:
return [mount_config_map_op]
def get_default_kubeflow_metadata_config(
) -> kubeflow_pb2.KubeflowMetadataConfig:
"""Returns the default metadata connection config for Kubeflow.
Returns:
A config proto that will be serialized as JSON and passed to the running
container so the TFX component driver is able to communicate with MLMD in
a Kubeflow cluster.
"""
# The default metadata configuration for a Kubeflow Pipelines cluster is
# codified as a Kubernetes ConfigMap
# https://github.com/kubeflow/pipelines/blob/master/manifests/kustomize/base/metadata/metadata-grpc-configmap.yaml
config = kubeflow_pb2.KubeflowMetadataConfig()
# The environment variable to use to obtain the Metadata gRPC service host in
# the cluster that is backing Kubeflow Metadata. Note that the key in the
# config map and therefore environment variable used, are lower-cased.
config.grpc_config.grpc_service_host.environment_variable = 'METADATA_GRPC_SERVICE_HOST'
# The environment variable to use to obtain the Metadata grpc service port in
# the cluster that is backing Kubeflow Metadata.
config.grpc_config.grpc_service_port.environment_variable = 'METADATA_GRPC_SERVICE_PORT'
return config
def get_default_pod_labels() -> Dict[str, str]:
"""Returns the default pod label dict for Kubeflow."""
# KFP default transformers add pod env:
# https://github.com/kubeflow/pipelines/blob/0.1.32/sdk/python/kfp/compiler/_default_transformers.py
result = {
'add-pod-env': 'true',
telemetry_utils.LABEL_KFP_SDK_ENV: 'tfx'
}
return result
def get_default_output_filename(pipeline_name: str) -> str:
return pipeline_name + '.tar.gz'
class KubeflowDagRunnerConfig(pipeline_config.PipelineConfig):
"""Runtime configuration parameters specific to execution on Kubeflow."""
def __init__(
self,
pipeline_operator_funcs: Optional[List[OpFunc]] = None,
tfx_image: Optional[str] = None,
kubeflow_metadata_config: Optional[
kubeflow_pb2.KubeflowMetadataConfig] = None,
# TODO(b/143883035): Figure out the best practice to put the
# SUPPORTED_LAUNCHER_CLASSES
supported_launcher_classes: Optional[List[Type[
base_component_launcher.BaseComponentLauncher]]] = None,
metadata_ui_path: str = '/mlpipeline-ui-metadata.json',
**kwargs):
"""Creates a KubeflowDagRunnerConfig object.
The user can use pipeline_operator_funcs to apply modifications to
ContainerOps used in the pipeline. For example, to ensure the pipeline
steps mount a GCP secret, and a Persistent Volume, one can create config
object like so:
from kfp import gcp, onprem
mount_secret_op = gcp.use_secret('my-secret-name)
mount_volume_op = onprem.mount_pvc(
"my-persistent-volume-claim",
"my-volume-name",
"/mnt/volume-mount-path")
config = KubeflowDagRunnerConfig(
pipeline_operator_funcs=[mount_secret_op, mount_volume_op]
)
Args:
pipeline_operator_funcs: A list of ContainerOp modifying functions that
will be applied to every container step in the pipeline.
tfx_image: The TFX container image to use in the pipeline.
kubeflow_metadata_config: Runtime configuration to use to connect to
Kubeflow metadata.
supported_launcher_classes: A list of component launcher classes that are
supported by the current pipeline. List sequence determines the order in
which launchers are chosen for each component being run.
metadata_ui_path: File location for metadata-ui-metadata.json file.
**kwargs: keyword args for PipelineConfig.
"""
supported_launcher_classes = supported_launcher_classes or [
in_process_component_launcher.InProcessComponentLauncher,
kubernetes_component_launcher.KubernetesComponentLauncher,
]
super().__init__(
supported_launcher_classes=supported_launcher_classes, **kwargs)
self.pipeline_operator_funcs = (
pipeline_operator_funcs or get_default_pipeline_operator_funcs())
self.tfx_image = tfx_image or DEFAULT_KUBEFLOW_TFX_IMAGE
self.kubeflow_metadata_config = (
kubeflow_metadata_config or get_default_kubeflow_metadata_config())
self.metadata_ui_path = metadata_ui_path
class KubeflowDagRunner(tfx_runner.TfxRunner):
"""Kubeflow Pipelines runner.
Constructs a pipeline definition YAML file based on the TFX logical pipeline.
"""
def __init__(self,
output_dir: Optional[str] = None,
output_filename: Optional[str] = None,
config: Optional[KubeflowDagRunnerConfig] = None,
pod_labels_to_attach: Optional[Dict[str, str]] = None):
"""Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline.
Args:
output_dir: An optional output directory into which to output the pipeline
definition files. Defaults to the current working directory.
output_filename: An optional output file name for the pipeline definition
file. Defaults to pipeline_name.tar.gz when compiling a TFX pipeline.
Currently supports .tar.gz, .tgz, .zip, .yaml, .yml formats. See
https://github.com/kubeflow/pipelines/blob/181de66cf9fa87bcd0fe9291926790c400140783/sdk/python/kfp/compiler/compiler.py#L851
for format restriction.
config: An optional KubeflowDagRunnerConfig object to specify runtime
configuration when running the pipeline under Kubeflow.
pod_labels_to_attach: Optional set of pod labels to attach to GKE pod
spinned up for this pipeline. Default to the 3 labels:
1. add-pod-env: true,
2. pipeline SDK type,
3. pipeline unique ID,
where 2 and 3 are instrumentation of usage tracking.
"""
if config and not isinstance(config, KubeflowDagRunnerConfig):
raise TypeError('config must be type of KubeflowDagRunnerConfig.')
super().__init__(config or KubeflowDagRunnerConfig())
self._config = cast(KubeflowDagRunnerConfig, self._config)
self._output_dir = output_dir or os.getcwd()
self._output_filename = output_filename
self._compiler = compiler.Compiler()
self._tfx_compiler = tfx_compiler.Compiler()
self._params = [] # List of dsl.PipelineParam used in this pipeline.
self._params_by_component_id = collections.defaultdict(list)
self._deduped_parameter_names = set() # Set of unique param names used.
self._exit_handler = None
if pod_labels_to_attach is None:
self._pod_labels_to_attach = get_default_pod_labels()
else:
self._pod_labels_to_attach = pod_labels_to_attach
def _parse_parameter_from_component(
self, component: tfx_base_component.BaseComponent) -> None:
"""Extract embedded RuntimeParameter placeholders from a component.
Extract embedded RuntimeParameter placeholders from a component, then append
the corresponding dsl.PipelineParam to KubeflowDagRunner.
Args:
component: a TFX component.
"""
deduped_parameter_names_for_component = set()
for parameter in component.exec_properties.values():
if not isinstance(parameter, data_types.RuntimeParameter):
continue
# Ignore pipeline root because it will be added later.
if parameter.name == tfx_pipeline.ROOT_PARAMETER.name:
continue
if parameter.name in deduped_parameter_names_for_component:
continue
deduped_parameter_names_for_component.add(parameter.name)
self._params_by_component_id[component.id].append(parameter)
if parameter.name not in self._deduped_parameter_names:
self._deduped_parameter_names.add(parameter.name)
# TODO(b/178436919): Create a test to cover default value rendering
# and move the external code reference over there.
# The default needs to be serialized then passed to dsl.PipelineParam.
# See
# https://github.com/kubeflow/pipelines/blob/f65391309650fdc967586529e79af178241b4c2c/sdk/python/kfp/dsl/_pipeline_param.py#L154
dsl_parameter = dsl.PipelineParam(
name=parameter.name, value=str(parameter.default))
self._params.append(dsl_parameter)
def _parse_parameter_from_pipeline(self,
pipeline: tfx_pipeline.Pipeline) -> None:
"""Extract all the RuntimeParameter placeholders from the pipeline."""
for component in pipeline.components:
self._parse_parameter_from_component(component)
def _construct_pipeline_graph(self, pipeline: tfx_pipeline.Pipeline,
pipeline_root: dsl.PipelineParam):
"""Constructs a Kubeflow Pipeline graph.
Args:
pipeline: The logical TFX pipeline to base the construction on.
pipeline_root: dsl.PipelineParam representing the pipeline root.
"""
component_to_kfp_op = {}
for component in pipeline.components:
utils.replace_exec_properties(component)
tfx_ir = self._generate_tfx_ir(pipeline)
# Assumption: There is a partial ordering of components in the list, i.e.,
# if component A depends on component B and C, then A appears after B and C
# in the list.
for component in pipeline.components:
# Keep track of the set of upstream dsl.ContainerOps for this component.
depends_on = set()
for upstream_component in component.upstream_nodes:
depends_on.add(component_to_kfp_op[upstream_component])
# remove the extra pipeline node information
tfx_node_ir = self._dehydrate_tfx_ir(tfx_ir, component.id)
# Disable cache for exit_handler
if self._exit_handler and component.id == self._exit_handler.id:
tfx_node_ir.nodes[
0].pipeline_node.execution_options.caching_options.enable_cache = False
kfp_component = base_component.BaseComponent(
component=component,
depends_on=depends_on,
pipeline=pipeline,
pipeline_root=pipeline_root,
tfx_image=self._config.tfx_image,
kubeflow_metadata_config=self._config.kubeflow_metadata_config,
pod_labels_to_attach=self._pod_labels_to_attach,
tfx_ir=tfx_node_ir,
metadata_ui_path=self._config.metadata_ui_path,
runtime_parameters=(self._params_by_component_id[component.id] +
[tfx_pipeline.ROOT_PARAMETER]))
for operator in self._config.pipeline_operator_funcs:
kfp_component.container_op.apply(operator)
component_to_kfp_op[component] = kfp_component.container_op
# If exit handler defined create an exit handler and add all ops to it.
if self._exit_handler:
exit_op = component_to_kfp_op[self._exit_handler]
with dsl.ExitHandler(exit_op) as exit_handler_group:
exit_handler_group.name = utils.TFX_DAG_NAME
# KFP get_default_pipeline should have the pipeline object when invoked
# while compiling. This allows us to retrieve all ops from pipeline
# group (should be the only group in the pipeline).
pipeline_group = dsl.Pipeline.get_default_pipeline().groups[0]
# Transfer all ops to exit_handler_group which will now contain all ops.
exit_handler_group.ops = pipeline_group.ops
# remove all ops from pipeline_group. Otherwise compiler fails in
# https://github.com/kubeflow/pipelines/blob/8aee62142aa13ae42b2dd18257d7e034861b7e5e/sdk/python/kfp/compiler/compiler.py#L893
pipeline_group.ops = []
def _del_unused_field(self, node_id: str, message_dict: MutableMapping[str,
Any]):
for item in list(message_dict.keys()):
if item != node_id:
del message_dict[item]
def _dehydrate_tfx_ir(self, original_pipeline: pipeline_pb2.Pipeline,
node_id: str) -> pipeline_pb2.Pipeline:
pipeline = copy.deepcopy(original_pipeline)
for node in pipeline.nodes:
if (node.WhichOneof('node') == 'pipeline_node' and
node.pipeline_node.node_info.id == node_id):
del pipeline.nodes[:]
pipeline.nodes.extend([node])
break
deployment_config = pipeline_pb2.IntermediateDeploymentConfig()
pipeline.deployment_config.Unpack(deployment_config)
self._del_unused_field(node_id, deployment_config.executor_specs)
self._del_unused_field(node_id, deployment_config.custom_driver_specs)
self._del_unused_field(node_id,
deployment_config.node_level_platform_configs)
pipeline.deployment_config.Pack(deployment_config)
return pipeline
def _generate_tfx_ir(
self, pipeline: tfx_pipeline.Pipeline) -> Optional[pipeline_pb2.Pipeline]:
result = self._tfx_compiler.compile(pipeline)
return result
def run(self, pipeline: tfx_pipeline.Pipeline):
"""Compiles and outputs a Kubeflow Pipeline YAML definition file.
Args:
pipeline: The logical TFX pipeline to use when building the Kubeflow
pipeline.
"""
# If exit handler is defined, append to existing pipeline components.
if self._exit_handler:
original_pipeline = pipeline
pipeline = copy.copy(original_pipeline)
pipeline.components.append(self._exit_handler)
for component in pipeline.components:
# TODO(b/187122662): Pass through pip dependencies as a first-class
# component flag.
if isinstance(component, tfx_base_component.BaseComponent):
component._resolve_pip_dependencies( # pylint: disable=protected-access
pipeline.pipeline_info.pipeline_root)
# KFP DSL representation of pipeline root parameter.
dsl_pipeline_root = dsl.PipelineParam(
name=tfx_pipeline.ROOT_PARAMETER.name,
value=pipeline.pipeline_info.pipeline_root)
self._params.append(dsl_pipeline_root)
def _construct_pipeline():
"""Constructs a Kubeflow pipeline.
Creates Kubeflow ContainerOps for each TFX component encountered in the
logical pipeline definition.
"""
self._construct_pipeline_graph(pipeline, dsl_pipeline_root)
# Need to run this first to get self._params populated. Then KFP compiler
# can correctly match default value with PipelineParam.
self._parse_parameter_from_pipeline(pipeline)
file_name = self._output_filename or get_default_output_filename(
pipeline.pipeline_info.pipeline_name)
# Create workflow spec and write out to package.
self._compiler._create_and_write_workflow( # pylint: disable=protected-access
pipeline_func=_construct_pipeline,
pipeline_name=pipeline.pipeline_info.pipeline_name,
params_list=self._params,
package_path=os.path.join(self._output_dir, file_name))
def set_exit_handler(self, exit_handler: base_node.BaseNode):
"""Set exit handler components for the Kubeflow dag runner.
This feature is currently experimental without backward compatibility
gaurantee.
Args:
exit_handler: exit handler component.
"""
if not exit_handler:
logging.error('Setting empty exit handler is not allowed.')
return
assert not exit_handler.downstream_nodes, ('Exit handler should not depend '
'on any other node.')
assert not exit_handler.upstream_nodes, ('Exit handler should not depend on'
' any other node.')
self._exit_handler = exit_handler