def _run_executor()

in tfx/orchestration/kubeflow/v2/container/kubeflow_v2_run_executor.py [0:0]


def _run_executor(args: argparse.Namespace, beam_args: List[str]) -> None:
  """Selects a particular executor and run it based on name.

  Args:
    args:
      --executor_class_path: The import path of the executor class.
      --json_serialized_invocation_args: Full JSON-serialized parameters for
        this execution.
    beam_args: Optional parameter that maps to the optional_pipeline_args
      parameter in the pipeline, which provides additional configuration options
      for apache-beam and tensorflow.logging.
    For more about the beam arguments please refer to:
    https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
  """
  logging.set_verbosity(logging.INFO)

  # Rehydrate inputs/outputs/exec_properties from the serialized metadata.
  executor_input = pipeline_spec_pb2.ExecutorInput()
  json_format.Parse(
      args.json_serialized_invocation_args,
      executor_input,
      ignore_unknown_fields=True)

  inputs_dict = executor_input.inputs.artifacts
  outputs_dict = executor_input.outputs.artifacts
  inputs_parameter = executor_input.inputs.parameters
  outputs_parameters = executor_input.outputs.parameters

  # Format {pipelineJob.runtimeConfig.gcsOutputDirectory}/{project_number}
  #       /{pipeline_job_user_id}/{task_name}_{task_uuid}/executor_output.json
  task_root = os.path.dirname(executor_input.outputs.output_file)
  tmp_path = os.path.join(task_root, '.temp')
  task_unique_id = os.path.basename(task_root)

  if fileio.exists(executor_input.outputs.output_file):
    # It has a driver that outputs the updated exec_properties in this file.
    with fileio.open(executor_input.outputs.output_file,
                     'rb') as output_meta_json:
      output_metadata = pipeline_spec_pb2.ExecutorOutput()
      json_format.Parse(
          output_meta_json.read(), output_metadata, ignore_unknown_fields=True)
      # Append/Overwrite exec_propertise.
      for k, v in output_metadata.parameters.items():
        inputs_parameter[k].CopyFrom(v)

  name_from_id = {}

  inputs = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
      inputs_dict, name_from_id)
  outputs = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
      outputs_dict, name_from_id)
  exec_properties = kubeflow_v2_entrypoint_utils.parse_execution_properties(
      inputs_parameter)
  logging.info('Executor %s do: inputs: %s, outputs: %s, exec_properties: %s',
               args.executor_class_path, inputs, outputs, exec_properties)
  executor_cls = import_utils.import_class_by_path(args.executor_class_path)
  if issubclass(executor_cls, base_beam_executor.BaseBeamExecutor):
    executor_context = base_beam_executor.BaseBeamExecutor.Context(
        beam_pipeline_args=beam_args,
        unique_id=task_unique_id,
        tmp_dir=tmp_path)
  else:
    executor_context = base_executor.BaseExecutor.Context(
        extra_flags=beam_args, unique_id=task_unique_id, tmp_dir=tmp_path)
  executor = executor_cls(executor_context)
  logging.info('Starting executor')
  executor.Do(inputs, outputs, exec_properties)

  outputs_utils.tag_output_artifacts_with_version(outputs)

  # TODO(b/169583143): Remove this workaround when TFX migrates to use str-typed
  # id/name to identify artifacts.
  # Convert ModelBlessing artifact to use managed MLMD resource name.
  if (issubclass(executor_cls, evaluator_executor.Executor) and
      standard_component_specs.BLESSING_KEY in outputs):
    # Parse the parent prefix for managed MLMD resource name.
    kubeflow_v2_entrypoint_utils.refactor_model_blessing(
        artifact_utils.get_single_instance(
            outputs[standard_component_specs.BLESSING_KEY]), name_from_id)

  # Log the output metadata to a file. So that it can be picked up by MP.
  metadata_uri = executor_input.outputs.output_file
  executor_output = pipeline_spec_pb2.ExecutorOutput()
  for k, v in kubeflow_v2_entrypoint_utils.translate_executor_output(
      outputs, name_from_id).items():
    executor_output.artifacts[k].CopyFrom(v)

  for key in outputs_parameters.keys():
    if key not in outputs.keys():
      raise ValueError(
          'All OutputParameters must have corresponding OutputValueArtifacts.')
    assert len(outputs[key]) == 1 and isinstance(
        outputs[key][0], value_artifact.ValueArtifact), (
            'Parameter should have one corresponding ValueArtifact.')
    artifact = outputs[key][0]
    if isinstance(artifact, standard_artifacts.String):
      executor_output.parameter_values[key].string_value = artifact.read()
    elif isinstance(artifact, standard_artifacts.Float) or isinstance(
        artifact, standard_artifacts.Integer):
      executor_output.parameter_values[key].number_value = artifact.read()
    elif isinstance(artifact, standard_artifacts.Boolean):
      executor_output.parameter_values[key].bool_value = artifact.read()
    else:
      raise ValueError(
          'Only String, Float, Int, and Boolean ValueArtifacts are supported.'
      )

  fileio.makedirs(os.path.dirname(metadata_uri))
  with fileio.open(metadata_uri, 'wb') as f:
    f.write(json_format.MessageToJson(executor_output))