in tfx/orchestration/kubeflow/container_entrypoint.py [0:0]
def _dump_ui_metadata(
node: pipeline_pb2.PipelineNode,
execution_info: data_types.ExecutionInfo,
ui_metadata_path: str = '/mlpipeline-ui-metadata.json') -> None:
"""Dump KFP UI metadata json file for visualization purpose.
For general components we just render a simple Markdown file for
exec_properties/inputs/outputs.
If the file already exists and is a valid format(have a list of
dictionaries in outputs key), we append the existing UI metadata items
to our output json file.
Args:
node: associated TFX node.
execution_info: runtime execution info for this component, including
materialized inputs/outputs/execution properties and id.
ui_metadata_path: path to dump ui metadata.
"""
exec_properties_list = [
'**{}**: {}'.format(
_sanitize_underscore(name), _sanitize_underscore(exec_property))
for name, exec_property in execution_info.exec_properties.items()
]
src_str_exec_properties = '# Execution properties:\n{}'.format(
'\n\n'.join(exec_properties_list) or 'No execution property.')
def _dump_input_populated_artifacts(
node_inputs: MutableMapping[str, pipeline_pb2.InputSpec],
name_to_artifacts: Dict[str, List[artifact.Artifact]]) -> List[str]:
"""Dump artifacts markdown string for inputs.
Args:
node_inputs: maps from input name to input sepc proto.
name_to_artifacts: maps from input key to list of populated artifacts.
Returns:
A list of dumped markdown string, each of which represents a channel.
"""
rendered_list = []
for name, spec in node_inputs.items():
# Need to look for materialized artifacts in the execution decision.
rendered_artifacts = ''.join([
_render_artifact_as_mdstr(single_artifact)
for single_artifact in name_to_artifacts.get(name, [])
])
# TODO(b/255869994): Use InputSpec.artifact_type field instead.
if spec.channels:
artifact_type = spec.channels[0].artifact_query.type.name
else:
artifact_type = '_Unknown_'
rendered_list.append(
'## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
name=_sanitize_underscore(name),
channel_type=_sanitize_underscore(artifact_type),
artifacts=rendered_artifacts))
return rendered_list
def _dump_output_populated_artifacts(
node_outputs: MutableMapping[str, pipeline_pb2.OutputSpec],
name_to_artifacts: Dict[str, List[artifact.Artifact]]) -> List[str]:
"""Dump artifacts markdown string for outputs.
Args:
node_outputs: maps from output name to output sepc proto.
name_to_artifacts: maps from output key to list of populated artifacts.
Returns:
A list of dumped markdown string, each of which represents a channel.
"""
rendered_list = []
for name, spec in node_outputs.items():
# Need to look for materialized artifacts in the execution decision.
rendered_artifacts = ''.join([
_render_artifact_as_mdstr(single_artifact)
for single_artifact in name_to_artifacts.get(name, [])
])
# There must be at least a channel in a input, and all channels in a input
# share the same artifact type.
artifact_type = spec.artifact_spec.type.name
rendered_list.append(
'## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
name=_sanitize_underscore(name),
channel_type=_sanitize_underscore(artifact_type),
artifacts=rendered_artifacts))
return rendered_list
src_str_inputs = '# Inputs:\n{}'.format(''.join(
_dump_input_populated_artifacts(
node_inputs=node.inputs.inputs,
name_to_artifacts=execution_info.input_dict or {})) or 'No input.')
src_str_outputs = '# Outputs:\n{}'.format(''.join(
_dump_output_populated_artifacts(
node_outputs=node.outputs.outputs,
name_to_artifacts=execution_info.output_dict or {})) or 'No output.')
outputs = [{
'storage':
'inline',
'source':
'{exec_properties}\n\n{inputs}\n\n{outputs}'.format(
exec_properties=src_str_exec_properties,
inputs=src_str_inputs,
outputs=src_str_outputs),
'type':
'markdown',
}]
# Add Tensorboard view for ModelRun outpus.
for name, spec in node.outputs.outputs.items():
if spec.artifact_spec.type.name == standard_artifacts.ModelRun.TYPE_NAME:
output_model = execution_info.output_dict[name][0]
# Add Tensorboard view.
tensorboard_output = {'type': 'tensorboard', 'source': output_model.uri}
outputs.append(tensorboard_output)
# Add existing KFP UI Metadata if the file exists and is a valid format
if os.path.isfile(ui_metadata_path):
def _read_validated_ui_metadata(
ui_metadata_path: str) -> List[Dict[str, str]]:
"""Read validated existing KFP UI Metadata file.
Args:
ui_metadata_path: path for ui metadata
Returns:
A list of UI metadata if the file is valid. An empty list otherwise.
"""
result = []
try:
with open(ui_metadata_path, 'r') as f:
metadata_dict = json.load(f)
if ('outputs' in metadata_dict and
isinstance(metadata_dict['outputs'], list)):
for ui_metadata in metadata_dict['outputs']:
if isinstance(ui_metadata, dict):
result.append(ui_metadata)
except json.JSONDecodeError:
pass
return result
outputs += _read_validated_ui_metadata(ui_metadata_path)
metadata_dict = {'outputs': outputs}
with open(ui_metadata_path, 'w') as f:
json.dump(metadata_dict, f)