def run_component()

in tfx/scripts/run_component.py [0:0]


def run_component(full_component_class_name: str,
                  temp_directory_path: Optional[str] = None,
                  beam_pipeline_args: Optional[List[str]] = None,
                  **arguments):
  r"""Loads a component, instantiates it with arguments and runs its executor.

  The component class is instantiated, so the component code is executed,
  not just the executor code.

  To pass artifact URI, use <input_name>_uri argument name.
  To pass artifact property, use <input_name>_<property> argument name.
  Protobuf property values can be passed as JSON-serialized protobufs.

  # pylint: disable=line-too-long

  Example::

    # When run as a script:
    python3 scripts/run_component.py \
      --full-component-class-name tfx.components.StatisticsGen \
      --examples-uri gs://my_bucket/chicago_taxi_simple/CsvExamplesGen/examples/1/ \
      --examples-split-names '["train", "eval"]' \
      --output-uri gs://my_bucket/chicago_taxi_simple/StatisticsGen/output/1/

    # When run as a function:
    run_component(
      full_component_class_name='tfx.components.StatisticsGen',
      examples_uri='gs://my_bucket/chicago_taxi_simple/CsvExamplesGen/sxamples/1/',
      examples_split_names='["train", "eval"]',
      output_uri='gs://my_bucket/chicago_taxi_simple/StatisticsGen/output/1/',
    )

  Args:
    full_component_class_name: The component class name including module name.
    temp_directory_path: Optional. Temporary directory path for the executor.
    beam_pipeline_args: Optional. Arguments to pass to the Beam pipeline.
    **arguments: Key-value pairs with component arguments.
  """
  component_class = import_utils.import_class_by_path(full_component_class_name)

  component_arguments = {}

  for name, execution_param in component_class.SPEC_CLASS.PARAMETERS.items():
    argument_value = arguments.get(name, None)
    if argument_value is None:
      continue
    param_type = execution_param.type
    if (isinstance(param_type, type) and
        issubclass(param_type, message.Message)):
      argument_value_obj = param_type()
      proto_utils.json_to_proto(argument_value, argument_value_obj)
    elif param_type is int:
      argument_value_obj = int(argument_value)
    elif param_type is float:
      argument_value_obj = float(argument_value)
    else:
      argument_value_obj = argument_value
    component_arguments[name] = argument_value_obj

  for input_name, channel_param in component_class.SPEC_CLASS.INPUTS.items():
    uri = (arguments.get(input_name + '_uri') or
           arguments.get(input_name + '_path'))
    if uri:
      artifact = channel_param.type()
      artifact.uri = uri
      # Setting the artifact properties
      for property_name, property_spec in (channel_param.type.PROPERTIES or
                                           {}).items():
        property_arg_name = input_name + '_' + property_name
        if property_arg_name in arguments:
          property_value = arguments[property_arg_name]
          if property_spec.type == PropertyType.INT:
            property_value = int(property_value)
          if property_spec.type == PropertyType.FLOAT:
            property_value = float(property_value)
          setattr(artifact, property_name, property_value)
      component_arguments[input_name] = channel_utils.as_channel([artifact])

  component_instance = component_class(**component_arguments)

  input_dict = channel_utils.unwrap_channel_dict(component_instance.inputs)
  output_dict = channel_utils.unwrap_channel_dict(component_instance.outputs)
  exec_properties = component_instance.exec_properties

  # Generating paths for output artifacts
  for output_name, channel_param in component_class.SPEC_CLASS.OUTPUTS.items():
    uri = (arguments.get('output_' + output_name + '_uri') or
           arguments.get(output_name + '_uri') or
           arguments.get(output_name + '_path'))
    if uri:
      artifacts = output_dict[output_name]
      if not artifacts:
        artifacts.append(channel_param.type())
      for artifact in artifacts:
        artifact.uri = uri

  if issubclass(component_instance.executor_spec.executor_class,
                base_beam_executor.BaseBeamExecutor):
    executor_context = base_beam_executor.BaseBeamExecutor.Context(
        beam_pipeline_args=beam_pipeline_args,
        tmp_dir=temp_directory_path,
        unique_id='',
    )
  else:
    executor_context = base_executor.BaseExecutor.Context(
        extra_flags=beam_pipeline_args,
        tmp_dir=temp_directory_path,
        unique_id='',
    )
  executor = component_instance.executor_spec.executor_class(executor_context)
  executor.Do(
      input_dict=input_dict,
      output_dict=output_dict,
      exec_properties=exec_properties,
  )

  # Writing out the output artifact properties
  for output_name, channel_param in component_class.SPEC_CLASS.OUTPUTS.items():
    for property_name in channel_param.type.PROPERTIES or []:
      property_path_arg_name = output_name + '_' + property_name + '_path'
      property_path = arguments.get(property_path_arg_name)
      if property_path:
        artifacts = output_dict[output_name]
        for artifact in artifacts:
          property_value = getattr(artifact, property_name)
          os.makedirs(os.path.dirname(property_path), exist_ok=True)
          with open(property_path, 'w') as f:
            f.write(str(property_value))