def _construct_pipeline_graph()

in tfx/orchestration/kubeflow/kubeflow_dag_runner.py [0:0]


  def _construct_pipeline_graph(self, pipeline: tfx_pipeline.Pipeline,
                                pipeline_root: dsl.PipelineParam):
    """Builds the Kubeflow Pipelines DAG for the given logical TFX pipeline.

    Args:
      pipeline: The logical TFX pipeline to base the construction on.
      pipeline_root: dsl.PipelineParam representing the pipeline root.
    """
    container_op_by_component = {}

    for node in pipeline.components:
      utils.replace_exec_properties(node)
    full_ir = self._generate_tfx_ir(pipeline)

    # The component list is assumed to be partially ordered: every node
    # appears after all of the nodes it depends on.
    for node in pipeline.components:
      # Collect the upstream dsl.ContainerOps this node must wait on.
      upstream_ops = {
          container_op_by_component[parent] for parent in node.upstream_nodes
      }

      # Strip the full IR down to just this node's information.
      node_ir = self._dehydrate_tfx_ir(full_ir, node.id)

      # The exit handler must always execute, so never serve it from cache.
      if self._exit_handler and node.id == self._exit_handler.id:
        node_ir.nodes[
            0].pipeline_node.execution_options.caching_options.enable_cache = False

      wrapped = base_component.BaseComponent(
          component=node,
          depends_on=upstream_ops,
          pipeline=pipeline,
          pipeline_root=pipeline_root,
          tfx_image=self._config.tfx_image,
          kubeflow_metadata_config=self._config.kubeflow_metadata_config,
          pod_labels_to_attach=self._pod_labels_to_attach,
          tfx_ir=node_ir,
          metadata_ui_path=self._config.metadata_ui_path,
          runtime_parameters=(self._params_by_component_id[node.id] +
                              [tfx_pipeline.ROOT_PARAMETER]))

      for op_func in self._config.pipeline_operator_funcs:
        wrapped.container_op.apply(op_func)

      container_op_by_component[node] = wrapped.container_op

    # When an exit handler is configured, wrap all created ops in a
    # dsl.ExitHandler group so the handler runs regardless of outcome.
    if self._exit_handler:
      handler_op = container_op_by_component[self._exit_handler]
      with dsl.ExitHandler(handler_op) as exit_handler_group:
        exit_handler_group.name = utils.TFX_DAG_NAME
        # While compiling, KFP's get_default_pipeline returns the pipeline
        # object under construction; its first group (should be the only
        # group in the pipeline) holds every op created above.
        pipeline_group = dsl.Pipeline.get_default_pipeline().groups[0]

        # Move every op into exit_handler_group, which now owns all ops.
        exit_handler_group.ops = pipeline_group.ops
        # Empty out pipeline_group; otherwise the KFP compiler fails in
        # https://github.com/kubeflow/pipelines/blob/8aee62142aa13ae42b2dd18257d7e034861b7e5e/sdk/python/kfp/compiler/compiler.py#L893
        pipeline_group.ops = []