in tfx/orchestration/kubeflow/kubeflow_dag_runner.py [0:0]
def _construct_pipeline_graph(self, pipeline: tfx_pipeline.Pipeline,
                              pipeline_root: dsl.PipelineParam):
  """Builds the Kubeflow Pipelines graph for the given logical pipeline.

  Args:
    pipeline: The logical TFX pipeline to base the construction on.
    pipeline_root: dsl.PipelineParam representing the pipeline root.
  """
  for node in pipeline.components:
    utils.replace_exec_properties(node)
  tfx_ir = self._generate_tfx_ir(pipeline)

  # Maps each logical TFX component to the dsl.ContainerOp built for it.
  kfp_ops_by_component = {}
  # Assumption: components are listed in a partial order — if A depends on
  # B and C, then A appears after B and C — so every upstream node has
  # already been processed by the time a component is reached.
  for node in pipeline.components:
    # Collect the upstream dsl.ContainerOps this component must run after.
    upstream_ops = {
        kfp_ops_by_component[parent] for parent in node.upstream_nodes
    }
    # Strip the pipeline-level IR down to just this node's information.
    node_ir = self._dehydrate_tfx_ir(tfx_ir, node.id)
    # The exit handler must always execute, so never serve it from cache.
    if self._exit_handler and node.id == self._exit_handler.id:
      caching_options = (
          node_ir.nodes[0].pipeline_node.execution_options.caching_options)
      caching_options.enable_cache = False
    wrapped = base_component.BaseComponent(
        component=node,
        depends_on=upstream_ops,
        pipeline=pipeline,
        pipeline_root=pipeline_root,
        tfx_image=self._config.tfx_image,
        kubeflow_metadata_config=self._config.kubeflow_metadata_config,
        pod_labels_to_attach=self._pod_labels_to_attach,
        tfx_ir=node_ir,
        metadata_ui_path=self._config.metadata_ui_path,
        runtime_parameters=(self._params_by_component_id[node.id] +
                            [tfx_pipeline.ROOT_PARAMETER]))
    for op_transform in self._config.pipeline_operator_funcs:
      wrapped.container_op.apply(op_transform)
    kfp_ops_by_component[node] = wrapped.container_op

  # When an exit handler is configured, wrap every op inside a
  # dsl.ExitHandler so the handler runs regardless of pipeline outcome.
  if self._exit_handler:
    exit_op = kfp_ops_by_component[self._exit_handler]
    with dsl.ExitHandler(exit_op) as exit_handler_group:
      exit_handler_group.name = utils.TFX_DAG_NAME
      # While compiling, KFP's get_default_pipeline returns the pipeline
      # under construction; its first (and only) group holds all ops
      # created so far.
      pipeline_group = dsl.Pipeline.get_default_pipeline().groups[0]
      # Move every op into the exit-handler group ...
      exit_handler_group.ops = pipeline_group.ops
      # ... and empty the original group; otherwise the KFP compiler fails
      # when an op appears in more than one group:
      # https://github.com/kubeflow/pipelines/blob/8aee62142aa13ae42b2dd18257d7e034861b7e5e/sdk/python/kfp/compiler/compiler.py#L893
      pipeline_group.ops = []