in tfx/orchestration/kubeflow/v2/container/kubeflow_v2_run_executor.py
def _run_executor(args: argparse.Namespace, beam_args: List[str]) -> None:
"""Selects a particular executor and run it based on name.
Args:
args:
--executor_class_path: The import path of the executor class.
--json_serialized_invocation_args: Full JSON-serialized parameters for
this execution.
beam_args: Optional parameter that maps to the optional_pipeline_args
parameter in the pipeline, which provides additional configuration options
for apache-beam and tensorflow.logging.
For more about the beam arguments please refer to:
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
"""
logging.set_verbosity(logging.INFO)
# Rehydrate inputs/outputs/exec_properties from the serialized metadata.
executor_input = pipeline_spec_pb2.ExecutorInput()
json_format.Parse(
args.json_serialized_invocation_args,
executor_input,
ignore_unknown_fields=True)
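  # The artifact and parameter maps below are keyed by the component's input
  # and output names.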
inputs_dict = executor_input.inputs.artifacts
outputs_dict = executor_input.outputs.artifacts
inputs_parameter = executor_input.inputs.parameters
outputs_parameters = executor_input.outputs.parameters
# Format {pipelineJob.runtimeConfig.gcsOutputDirectory}/{project_number}
# /{pipeline_job_user_id}/{task_name}_{task_uuid}/executor_output.json
task_root = os.path.dirname(executor_input.outputs.output_file)
tmp_path = os.path.join(task_root, '.temp')
task_unique_id = os.path.basename(task_root)
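  # The basename, '{task_name}_{task_uuid}', uniquely identifies this task run.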
if fileio.exists(executor_input.outputs.output_file):
    # A driver ran before this executor and wrote updated exec_properties to
    # this file.
with fileio.open(executor_input.outputs.output_file,
'rb') as output_meta_json:
output_metadata = pipeline_spec_pb2.ExecutorOutput()
json_format.Parse(
output_meta_json.read(), output_metadata, ignore_unknown_fields=True)
    # Append/overwrite exec_properties with the values emitted by the driver.
for k, v in output_metadata.parameters.items():
inputs_parameter[k].CopyFrom(v)
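  # Maps MLMD artifact ids to resource names as artifacts are parsed; used
  # below to refactor the ModelBlessing artifact.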
name_from_id = {}
inputs = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
inputs_dict, name_from_id)
outputs = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
outputs_dict, name_from_id)
exec_properties = kubeflow_v2_entrypoint_utils.parse_execution_properties(
inputs_parameter)
  logging.info('Executor %s: inputs: %s, outputs: %s, exec_properties: %s',
               args.executor_class_path, inputs, outputs, exec_properties)
executor_cls = import_utils.import_class_by_path(args.executor_class_path)
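  # Beam-based executors receive the Beam pipeline args directly; all other
  # executors receive them as generic extra flags.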
if issubclass(executor_cls, base_beam_executor.BaseBeamExecutor):
executor_context = base_beam_executor.BaseBeamExecutor.Context(
beam_pipeline_args=beam_args,
unique_id=task_unique_id,
tmp_dir=tmp_path)
else:
executor_context = base_executor.BaseExecutor.Context(
extra_flags=beam_args, unique_id=task_unique_id, tmp_dir=tmp_path)
executor = executor_cls(executor_context)
logging.info('Starting executor')
executor.Do(inputs, outputs, exec_properties)
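  # Stamp each output artifact with the version of TFX that produced it.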
outputs_utils.tag_output_artifacts_with_version(outputs)
# TODO(b/169583143): Remove this workaround when TFX migrates to use str-typed
# id/name to identify artifacts.
# Convert ModelBlessing artifact to use managed MLMD resource name.
if (issubclass(executor_cls, evaluator_executor.Executor) and
standard_component_specs.BLESSING_KEY in outputs):
# Parse the parent prefix for managed MLMD resource name.
kubeflow_v2_entrypoint_utils.refactor_model_blessing(
artifact_utils.get_single_instance(
outputs[standard_component_specs.BLESSING_KEY]), name_from_id)
  # Log the output metadata to a file so that it can be picked up by MP.
metadata_uri = executor_input.outputs.output_file
executor_output = pipeline_spec_pb2.ExecutorOutput()
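  # Convert the output artifacts back into their IR representation so the
  # orchestrator can consume them.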
for k, v in kubeflow_v2_entrypoint_utils.translate_executor_output(
outputs, name_from_id).items():
executor_output.artifacts[k].CopyFrom(v)
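  # Materialize output parameter values from their backing ValueArtifacts.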
  for key in outputs_parameters:
    if key not in outputs:
raise ValueError(
'All OutputParameters must have corresponding OutputValueArtifacts.')
assert len(outputs[key]) == 1 and isinstance(
outputs[key][0], value_artifact.ValueArtifact), (
'Parameter should have one corresponding ValueArtifact.')
artifact = outputs[key][0]
if isinstance(artifact, standard_artifacts.String):
executor_output.parameter_values[key].string_value = artifact.read()
elif isinstance(artifact, standard_artifacts.Float) or isinstance(
artifact, standard_artifacts.Integer):
executor_output.parameter_values[key].number_value = artifact.read()
elif isinstance(artifact, standard_artifacts.Boolean):
executor_output.parameter_values[key].bool_value = artifact.read()
else:
raise ValueError(
        'Only String, Float, Integer, and Boolean ValueArtifacts are supported.'
)
fileio.makedirs(os.path.dirname(metadata_uri))
with fileio.open(metadata_uri, 'wb') as f:
f.write(json_format.MessageToJson(executor_output))
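

# A minimal invocation sketch (an assumption for illustration, not the file's
# actual entrypoint): parse the two flags documented in _run_executor's
# docstring and forward any remaining command-line arguments as Beam args.
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--executor_class_path', type=str, required=True)
  parser.add_argument(
      '--json_serialized_invocation_args', type=str, required=True)
  parsed_args, remaining_beam_args = parser.parse_known_args()
  _run_executor(parsed_args, remaining_beam_args)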