def _dump_ui_metadata()

in tfx/orchestration/kubeflow/container_entrypoint.py


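The excerpt omits the module preamble. The imports below are the ones this
function relies on; the module paths are assumptions based on the TFX package
layout rather than copied from the file, and the helpers _sanitize_underscore
and _render_artifact_as_mdstr are private functions defined elsewhere in
container_entrypoint.py.

import json
import os
from typing import Dict, List, MutableMapping

from tfx.orchestration.portable import data_types
from tfx.proto.orchestration import pipeline_pb2
from tfx.types import artifact
from tfx.types import standard_artifacts
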
def _dump_ui_metadata(
    node: pipeline_pb2.PipelineNode,
    execution_info: data_types.ExecutionInfo,
    ui_metadata_path: str = '/mlpipeline-ui-metadata.json') -> None:
  """Dump KFP UI metadata json file for visualization purpose.

  For general components we just render a simple Markdown file for
    exec_properties/inputs/outputs.
  If the file already exists and is a valid format(have a list of
  dictionaries in outputs key), we append the existing UI metadata items
  to our output json file.

  Args:
    node: associated TFX node.
    execution_info: runtime execution info for this component, including
      materialized inputs/outputs/execution properties and id.
    ui_metadata_path: path to dump ui metadata.
  """
  exec_properties_list = [
      '**{}**: {}'.format(
          _sanitize_underscore(name), _sanitize_underscore(exec_property))
      for name, exec_property in execution_info.exec_properties.items()
  ]
  src_str_exec_properties = '# Execution properties:\n{}'.format(
      '\n\n'.join(exec_properties_list) or 'No execution property.')

  def _dump_input_populated_artifacts(
      node_inputs: MutableMapping[str, pipeline_pb2.InputSpec],
      name_to_artifacts: Dict[str, List[artifact.Artifact]]) -> List[str]:
    """Dump artifacts markdown string for inputs.

    Args:
      node_inputs: maps from input name to input sepc proto.
      name_to_artifacts: maps from input key to list of populated artifacts.

    Returns:
      A list of dumped markdown string, each of which represents a channel.
    """
    rendered_list = []
    for name, spec in node_inputs.items():
      # Need to look for materialized artifacts in the execution decision.
      rendered_artifacts = ''.join([
          _render_artifact_as_mdstr(single_artifact)
          for single_artifact in name_to_artifacts.get(name, [])
      ])
      # TODO(b/255869994): Use InputSpec.artifact_type field instead.
      if spec.channels:
        artifact_type = spec.channels[0].artifact_query.type.name
      else:
        artifact_type = '_Unknown_'
      rendered_list.append(
          '## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
              name=_sanitize_underscore(name),
              channel_type=_sanitize_underscore(artifact_type),
              artifacts=rendered_artifacts))

    return rendered_list

  def _dump_output_populated_artifacts(
      node_outputs: MutableMapping[str, pipeline_pb2.OutputSpec],
      name_to_artifacts: Dict[str, List[artifact.Artifact]]) -> List[str]:
    """Dump artifacts markdown string for outputs.

    Args:
      node_outputs: maps from output name to output sepc proto.
      name_to_artifacts: maps from output key to list of populated artifacts.

    Returns:
      A list of dumped markdown string, each of which represents a channel.
    """
    rendered_list = []
    for name, spec in node_outputs.items():
      # Need to look for materialized artifacts in the execution decision.
      rendered_artifacts = ''.join([
          _render_artifact_as_mdstr(single_artifact)
          for single_artifact in name_to_artifacts.get(name, [])
      ])
      # There must be at least one channel in an output, and all channels in
      # an output share the same artifact type.
      artifact_type = spec.artifact_spec.type.name
      rendered_list.append(
          '## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
              name=_sanitize_underscore(name),
              channel_type=_sanitize_underscore(artifact_type),
              artifacts=rendered_artifacts))

    return rendered_list

  src_str_inputs = '# Inputs:\n{}'.format(''.join(
      _dump_input_populated_artifacts(
          node_inputs=node.inputs.inputs,
          name_to_artifacts=execution_info.input_dict or {})) or 'No input.')

  src_str_outputs = '# Outputs:\n{}'.format(''.join(
      _dump_output_populated_artifacts(
          node_outputs=node.outputs.outputs,
          name_to_artifacts=execution_info.output_dict or {})) or 'No output.')

  outputs = [{
      'storage':
          'inline',
      'source':
          '{exec_properties}\n\n{inputs}\n\n{outputs}'.format(
              exec_properties=src_str_exec_properties,
              inputs=src_str_inputs,
              outputs=src_str_outputs),
      'type':
          'markdown',
  }]
  # Add a Tensorboard view for each ModelRun output.
  for name, spec in node.outputs.outputs.items():
    if spec.artifact_spec.type.name == standard_artifacts.ModelRun.TYPE_NAME:
      # Guard against outputs that were declared but never materialized,
      # mirroring the `or {}` fallback used for the markdown sections above.
      model_runs = (execution_info.output_dict or {}).get(name)
      if not model_runs:
        continue
      output_model = model_runs[0]

      # Add Tensorboard view.
      tensorboard_output = {'type': 'tensorboard', 'source': output_model.uri}
      outputs.append(tensorboard_output)

  # Append existing KFP UI metadata items if the file exists and is in a
  # valid format.
  if os.path.isfile(ui_metadata_path):

    def _read_validated_ui_metadata(
        ui_metadata_path: str) -> List[Dict[str, str]]:
      """Read validated existing KFP UI Metadata file.

      Args:
        ui_metadata_path: path for ui metadata

      Returns:
        A list of UI metadata if the file is valid. An empty list otherwise.
      """
      result = []
      try:
        with open(ui_metadata_path, 'r') as f:
          metadata_dict = json.load(f)

        if ('outputs' in metadata_dict and
            isinstance(metadata_dict['outputs'], list)):
          for ui_metadata in metadata_dict['outputs']:
            if isinstance(ui_metadata, dict):
              result.append(ui_metadata)
      except json.JSONDecodeError:
        pass
      return result

    outputs += _read_validated_ui_metadata(ui_metadata_path)

  metadata_dict = {'outputs': outputs}

  with open(ui_metadata_path, 'w') as f:
    json.dump(metadata_dict, f)
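
A successful run leaves a file like the following at ui_metadata_path
(illustrative sketch only: the markdown source is the three sections
assembled above, the tensorboard entry appears only when a ModelRun output
was materialized, and the URI shown is hypothetical):

{
  "outputs": [
    {
      "storage": "inline",
      "source": "# Execution properties:\n...\n\n# Inputs:\n...\n\n# Outputs:\n...",
      "type": "markdown"
    },
    {
      "type": "tensorboard",
      "source": "gs://some-bucket/Trainer/model_run/42"
    }
  ]
}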