in tfx/orchestration/kubeflow/v2/pipeline_builder.py
def build(self) -> pipeline_pb2.PipelineSpec:
  """Build a pipeline PipelineSpec."""
  _check_name(self._pipeline_info.pipeline_name)
  deployment_config = pipeline_pb2.PipelineDeploymentConfig()
  pipeline_info = pipeline_pb2.PipelineInfo(
      name=self._pipeline_info.pipeline_name)

  self._pipeline.finalize()

  # Map from (upstream_node_id, output_key) to output_type (ValueArtifact).
  dynamic_exec_properties = {}
  for component in self._pipeline.components:
    for value in component.exec_properties.values():
      if isinstance(value, placeholder.ChannelWrappedPlaceholder):
        node_id = value.channel.producer_component_id
        dynamic_exec_properties[(
            node_id, value.channel.output_key)] = value.channel.type.TYPE_NAME
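
  # Illustrative (hypothetical) entry: if a component 'range_gen' produces an
  # Integer ValueArtifact output 'range' that a downstream node consumes as an
  # exec property, this map would contain
  #   ('range_gen', 'range') -> 'Integer'
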
  tfx_tasks = {}
  component_defs = {}
  # Map from (producer component id, output key) to (new producer component
  # id, output key).
  channel_redirect_map = {}
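  # Hypothetical shape of one entry, recorded by StepBuilder when it rewrites
  # a node under a new id (e.g. a resolver node):
  #   ('old_producer_id', 'examples') -> ('new_producer_id', 'examples')
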
  with parameter_utils.ParameterContext() as pc:
    for component in self._pipeline.components:
      if self._exit_handler and component.id == utils.TFX_DAG_NAME:
        component.with_id(component.id + _generate_component_name_suffix())
        logging.warning(
            '_tfx_dag is a system-reserved name for pipelines with an exit'
            ' handler; added a suffix to your component name: %s',
            component.id)
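        # Hypothetically, a component that was literally named '_tfx_dag'
        # would now be called something like '_tfx_dag<suffix>', where the
        # suffix format is whatever _generate_component_name_suffix() emits.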
      # The topological order of components is required here: if a channel
      # redirection is needed, the redirect mapping is expected to be
      # available already, because the upstream node (the cause of the
      # redirection) is processed before its downstream consumer nodes.
      built_tasks = step_builder.StepBuilder(
          node=component,
          deployment_config=deployment_config,
          component_defs=component_defs,
          dynamic_exec_properties=dynamic_exec_properties,
          dsl_context_reg=self._pipeline.dsl_context_registry,
          image=self._default_image,
          image_cmds=self._default_commands,
          beam_pipeline_args=self._pipeline.beam_pipeline_args,
          enable_cache=self._pipeline.enable_cache,
          pipeline_info=self._pipeline_info,
          channel_redirect_map=channel_redirect_map).build()
      tfx_tasks.update(built_tasks)
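
  # Note: StepBuilder.build() returns a dict mapping task name to task spec;
  # a single TFX node can expand into more than one task, which is why the
  # results are merged into tfx_tasks rather than assigned one per component.
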
  result = pipeline_pb2.PipelineSpec(pipeline_info=pipeline_info)

  # If an exit handler is defined, all TFX tasks are nested under tfx_dag;
  # the exit handler is a separate component triggered by tfx_dag.
  if self._exit_handler:
    for name, task_spec in tfx_tasks.items():
      result.components[utils.TFX_DAG_NAME].dag.tasks[name].CopyFrom(
          task_spec)
    # Construct the root DAG with the exit handler.
    exit_handler_task = step_builder.StepBuilder(
        node=self._exit_handler,
        deployment_config=deployment_config,
        component_defs=component_defs,
        dsl_context_reg=self._pipeline.dsl_context_registry,
        dynamic_exec_properties=dynamic_exec_properties,
        image=self._default_image,
        image_cmds=self._default_commands,
        beam_pipeline_args=self._pipeline.beam_pipeline_args,
        enable_cache=False,
        pipeline_info=self._pipeline_info,
        channel_redirect_map=channel_redirect_map,
        is_exit_handler=True).build()
    result.root.dag.tasks[
        utils.TFX_DAG_NAME].component_ref.name = utils.TFX_DAG_NAME
    result.root.dag.tasks[
        utils.TFX_DAG_NAME].task_info.name = utils.TFX_DAG_NAME
    result.root.dag.tasks[self._exit_handler.id].CopyFrom(
        exit_handler_task[self._exit_handler.id])
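    # Resulting layout: root.dag holds exactly two tasks (tfx_dag and the
    # exit handler), while the actual TFX tasks live one level down in
    # components[utils.TFX_DAG_NAME].dag.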
  else:
    for name, task_spec in tfx_tasks.items():
      result.root.dag.tasks[name].CopyFrom(task_spec)
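
  # PipelineSpec.deployment_spec is a google.protobuf.Struct, so the typed
  # PipelineDeploymentConfig message is converted to a plain dict before it
  # is merged in.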
  result.deployment_spec.update(json_format.MessageToDict(deployment_config))
  for name, component_def in component_defs.items():
    result.components[name].CopyFrom(component_def)

  # Attach runtime parameters to the root's input parameters.
  for param in pc.parameters:
    result.root.input_definitions.parameters[param.name].CopyFrom(
        compiler_utils.build_parameter_type_spec(param))
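  # (Hypothetical example: a RuntimeParameter named 'train_steps' of type int
  # collected by the ParameterContext above would surface as
  # root.input_definitions.parameters['train_steps'] with an integer type
  # spec.)
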
  return result
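
# Usage sketch (hypothetical variable names; the builder's constructor is
# defined elsewhere in this module and may differ):
#
#   builder = PipelineBuilder(
#       tfx_pipeline=my_tfx_pipeline,           # a tfx.dsl.Pipeline
#       default_image='gcr.io/my-project/tfx')  # container image for steps
#   pipeline_spec = builder.build()  # -> pipeline_pb2.PipelineSpec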