in tfx/dsl/compiler/compiler.py [0:0]
def _set_node_context(node: pipeline_pb2.PipelineNode,
                      pipeline_ctx: compiler_context.PipelineContext):
  """Appends every context a pipeline node belongs to onto `node.contexts`.

  Contexts are added in the following order:
    1. The pipeline context of the enclosing pipeline.
    2. The pipeline run context of the enclosing pipeline, only when
       `pipeline_ctx.is_sync_mode` is true.
    3. For each entry of `pipeline_ctx.parent_pipelines`, walked in reverse
       order: its pipeline context, followed by its pipeline run context when
       that parent's execution mode is SYNC.
    4. The node context.

  Args:
    node: The pipeline node proto whose `contexts` are populated in place.
    pipeline_ctx: Compiler context of the pipeline that contains `node`.
  """
  own_context_name = pipeline_ctx.pipeline_info.pipeline_context_name

  def add_named_context(type_name, context_name):
    """Appends a context whose name is a fixed string value."""
    context_pb = node.contexts.contexts.add()
    context_pb.type.name = type_name
    context_pb.name.field_value.string_value = context_name

  def add_run_context(owner_context_name, use_structural_parameter):
    """Appends a pipeline run context whose name is resolved at runtime."""
    run_context_pb = node.contexts.contexts.add()
    run_context_pb.type.name = constants.PIPELINE_RUN_CONTEXT_TYPE_NAME
    if use_structural_parameter:
      # Parts handed to the helper: the owning pipeline's context name with a
      # trailing "_", then the pipeline run id parameter.
      compiler_utils.set_structural_runtime_parameter_pb(
          run_context_pb.name.structural_runtime_parameter,
          [
              f"{owner_context_name}_",
              (constants.PIPELINE_RUN_ID_PARAMETER_NAME, str),
          ])
    else:
      compiler_utils.set_runtime_parameter_pb(
          run_context_pb.name.runtime_parameter,
          constants.PIPELINE_RUN_ID_PARAMETER_NAME, str)

  # Pipeline-level context, shared across pipeline runs.
  add_named_context(constants.PIPELINE_CONTEXT_TYPE_NAME, own_context_name)

  # Run-level context of the enclosing pipeline; only emitted in sync mode.
  # TODO(kennethyang): Migrate pipeline run id to structural_runtime_parameter.
  # To keep the existing IR textprotos used in tests unchanged, only
  # subpipelines use structural_runtime_parameter for now. Once subpipelines
  # are implemented, normal pipelines need to be migrated to
  # structural_runtime_parameter as well for consistency. The same applies to
  # the parent pipeline handling below.
  if pipeline_ctx.is_sync_mode:
    add_run_context(
        own_context_name,
        use_structural_parameter=bool(pipeline_ctx.is_subpipeline))

  # Contexts inherited from the parent pipelines, visited in reverse order so
  # that the root level pipeline is the last one processed.
  parents = pipeline_ctx.parent_pipelines
  root_position = len(parents) - 1
  for position, parent in enumerate(parents[::-1]):
    parent_context_name = parent.pipeline_info.pipeline_context_name
    add_named_context(constants.PIPELINE_CONTEXT_TYPE_NAME,
                      parent_context_name)

    if parent.execution_mode == pipeline.ExecutionMode.SYNC:
      # TODO(kennethyang): Migrate pipeline run id to structural runtime
      # parameter, for the same reason as mentioned above.
      # Every parent except the root level pipeline uses a structural runtime
      # parameter for pipeline_run_id; the root keeps the plain runtime
      # parameter for backward compatibility.
      is_root = position == root_position
      add_run_context(parent_context_name,
                      use_structural_parameter=not is_root)

  # Node-level context, shared across pipeline runs.
  add_named_context(
      constants.NODE_CONTEXT_TYPE_NAME,
      compiler_utils.node_context_name(own_context_name, node.node_info.id))