def _set_node_context()

in tfx/dsl/compiler/compiler.py [0:0]


def _set_node_context(node: pipeline_pb2.PipelineNode,
                      pipeline_ctx: compiler_context.PipelineContext):
  """Appends to `node.contexts` every context the node is associated with.

  Contexts are added in this order:
    1. The pipeline context of `pipeline_ctx`.
    2. The pipeline run context of `pipeline_ctx` (sync mode only).
    3. For each entry of `pipeline_ctx.parent_pipelines`, walked in reverse:
       its pipeline context, then its pipeline run context if that parent is
       executed in SYNC mode.
    4. The node context.

  Args:
    node: The pipeline node IR whose contexts are being populated.
    pipeline_ctx: The compiler context of the pipeline the node is compiled
      under.
  """
  own_context_name = pipeline_ctx.pipeline_info.pipeline_context_name

  def _append_context(type_name):
    """Adds a new context of `type_name` to the node and returns it."""
    context_pb = node.contexts.contexts.add()
    context_pb.type.name = type_name
    return context_pb

  def _append_run_context(owner_context_name, use_structural):
    """Adds a pipeline run context whose name is the pipeline run id."""
    run_context_pb = _append_context(constants.PIPELINE_RUN_CONTEXT_TYPE_NAME)
    if use_structural:
      # The run id is prefixed with "<pipeline context name>_".
      compiler_utils.set_structural_runtime_parameter_pb(
          run_context_pb.name.structural_runtime_parameter, [
              f"{owner_context_name}_",
              (constants.PIPELINE_RUN_ID_PARAMETER_NAME, str)
          ])
    else:
      compiler_utils.set_runtime_parameter_pb(
          run_context_pb.name.runtime_parameter,
          constants.PIPELINE_RUN_ID_PARAMETER_NAME, str)

  # Pipeline-level context, shared across pipeline runs.
  own_context_pb = _append_context(constants.PIPELINE_CONTEXT_TYPE_NAME)
  own_context_pb.name.field_value.string_value = own_context_name

  # Run-level context; only emitted in sync mode.
  # TODO(kennethyang): Migrate pipeline run id to structural_runtime_parameter.
  # Only subpipelines use structural_runtime_parameter for now so that the
  # existing IR textprotos used in tests stay unchanged. Once subpipelines are
  # implemented, normal pipelines need to be migrated to
  # structural_runtime_parameter as well for consistency. The same applies to
  # the parent pipeline handling below.
  if pipeline_ctx.is_sync_mode:
    _append_run_context(
        own_context_name, use_structural=pipeline_ctx.is_subpipeline)

  # Contexts inherited from the parent pipelines, walked in reverse order.
  ancestors = pipeline_ctx.parent_pipelines[::-1]
  root_position = len(ancestors) - 1
  for position, ancestor in enumerate(ancestors):
    ancestor_context_name = ancestor.pipeline_info.pipeline_context_name
    ancestor_context_pb = _append_context(constants.PIPELINE_CONTEXT_TYPE_NAME)
    ancestor_context_pb.name.field_value.string_value = ancestor_context_name

    if ancestor.execution_mode == pipeline.ExecutionMode.SYNC:
      # TODO(kennethyang): Migrate pipeline run id to structural runtime
      # parameter, for the same reason as mentioned above.
      # For backward compatibility the root level pipeline (the last one
      # visited) keeps the plain runtime parameter; every other parent uses a
      # structural runtime parameter.
      _append_run_context(
          ancestor_context_name, use_structural=position != root_position)

  # Node-level context, shared across pipeline runs.
  node_context_pb = _append_context(constants.NODE_CONTEXT_TYPE_NAME)
  node_context_pb.name.field_value.string_value = (
      compiler_utils.node_context_name(own_context_name, node.node_info.id))