def build()

in tfx/orchestration/kubeflow/v2/pipeline_builder.py [0:0]


  def build(self) -> pipeline_pb2.PipelineSpec:
    """Build a pipeline PipelineSpec."""

    _check_name(self._pipeline_info.pipeline_name)

    deployment_config = pipeline_pb2.PipelineDeploymentConfig()
    pipeline_info = pipeline_pb2.PipelineInfo(
        name=self._pipeline_info.pipeline_name)

    self._pipeline.finalize()

    # Map from (upstream_node_id, output_key) to output_type (ValueArtifact)
    dynamic_exec_properties = {}
    for component in self._pipeline.components:
      for name, value in component.exec_properties.items():

        if isinstance(value, placeholder.ChannelWrappedPlaceholder):
          node_id = value.channel.producer_component_id
          dynamic_exec_properties[(
              node_id, value.channel.output_key)] = value.channel.type.TYPE_NAME
    tfx_tasks = {}
    component_defs = {}
    # Map from (producer component id, output key) to (new producer component
    # id, output key)
    channel_redirect_map = {}
    with parameter_utils.ParameterContext() as pc:
      for component in self._pipeline.components:
        if self._exit_handler and component.id == utils.TFX_DAG_NAME:
          component.with_id(component.id + _generate_component_name_suffix())
          logging.warning(
              '_tfx_dag is system reserved name for pipeline with'
              'exit handler, added suffix to your component name: %s',
              component.id)
        # Here the topological order of components is required.
        # If a channel redirection is needed, redirect mapping is expected to be
        # available because the upstream node (which is the cause for
        # redirecting) is processed before the downstream consumer nodes.
        built_tasks = step_builder.StepBuilder(
            node=component,
            deployment_config=deployment_config,
            component_defs=component_defs,
            dynamic_exec_properties=dynamic_exec_properties,
            dsl_context_reg=self._pipeline.dsl_context_registry,
            image=self._default_image,
            image_cmds=self._default_commands,
            beam_pipeline_args=self._pipeline.beam_pipeline_args,
            enable_cache=self._pipeline.enable_cache,
            pipeline_info=self._pipeline_info,
            channel_redirect_map=channel_redirect_map).build()
        tfx_tasks.update(built_tasks)

    result = pipeline_pb2.PipelineSpec(pipeline_info=pipeline_info)

    # if exit handler is defined, put all the TFX tasks under tfx_dag,
    # exit handler is a separate component triggered by tfx_dag.
    if self._exit_handler:
      for name, task_spec in tfx_tasks.items():
        result.components[utils.TFX_DAG_NAME].dag.tasks[name].CopyFrom(
            task_spec)
      # construct root with exit handler
      exit_handler_task = step_builder.StepBuilder(
          node=self._exit_handler,
          deployment_config=deployment_config,
          component_defs=component_defs,
          dsl_context_reg=self._pipeline.dsl_context_registry,
          dynamic_exec_properties=dynamic_exec_properties,
          image=self._default_image,
          image_cmds=self._default_commands,
          beam_pipeline_args=self._pipeline.beam_pipeline_args,
          enable_cache=False,
          pipeline_info=self._pipeline_info,
          channel_redirect_map=channel_redirect_map,
          is_exit_handler=True).build()
      result.root.dag.tasks[
          utils.TFX_DAG_NAME].component_ref.name = utils.TFX_DAG_NAME
      result.root.dag.tasks[
          utils.TFX_DAG_NAME].task_info.name = utils.TFX_DAG_NAME
      result.root.dag.tasks[self._exit_handler.id].CopyFrom(
          exit_handler_task[self._exit_handler.id])
    else:
      for name, task_spec in tfx_tasks.items():
        result.root.dag.tasks[name].CopyFrom(task_spec)

    result.deployment_spec.update(json_format.MessageToDict(deployment_config))
    for name, component_def in component_defs.items():
      result.components[name].CopyFrom(component_def)

    # Attach runtime parameter to root's input parameter
    for param in pc.parameters:
      result.root.input_definitions.parameters[param.name].CopyFrom(
          compiler_utils.build_parameter_type_spec(param))

    return result