def Do()

in tfx/components/transform/executor.py [0:0]


  def Do(self, input_dict: Dict[str, List[types.Artifact]],
         output_dict: Dict[str, List[types.Artifact]],
         exec_properties: Dict[str, Any]) -> None:
    """Runs TensorFlow Transform over the input examples.

    This is the BaseExecutor.Do() implementation that orchestration systems
    invoke. It is not intended for manual usage or further customization; use
    the Transform() function instead, which takes an input format with no
    artifact dependency.

    Args:
      input_dict: Maps each input key to a list of artifacts, including:
        - examples: A list of type `standard_artifacts.Examples` which should
          contain the custom splits specified in splits_config. Without a
          custom split, it should contain the two splits 'train' and 'eval'.
        - schema: A list of type `standard_artifacts.Schema` which should
          contain a single schema artifact.
        - analyzer_cache: Cache input of 'tf.Transform', from which cached
          information for examples analyzed in previous runs will be read.
      output_dict: Maps each output key to a list of artifacts, including:
        - transform_graph: Output of 'tf.Transform', which includes an exported
          Tensorflow graph suitable for both training and serving.
        - transformed_examples: Materialized transformed examples, covering
          the transform splits specified in splits_config. Without a custom
          split, this should include both the 'train' and 'eval' splits.
        - updated_analyzer_cache: Cache output of 'tf.Transform', to which
          cached information for analyzed examples will be written.
      exec_properties: A dict of execution properties, including:
        - module_file: The file path to a python module file, from which the
          'preprocessing_fn' function will be loaded. Exactly one of
          'module_file', 'module_path' and 'preprocessing_fn' should be set.
        - module_path: The python module path, from which the
          'preprocessing_fn' function will be loaded. Exactly one of
          'module_file', 'module_path' and 'preprocessing_fn' should be set.
        - preprocessing_fn: The module path to a python function that
          implements 'preprocessing_fn'. Exactly one of 'module_file',
          'module_path' and 'preprocessing_fn' should be set.
        - 'stats_options_updater_fn': The module path to a python function that
          implements 'stats_options_updater_fn'. This cannot be specified
          together with 'module_file'.
        - splits_config: A transform_pb2.SplitsConfig instance naming the
          splits to analyze and the splits to transform; the two sets may
          overlap. When splits_config is not set, the 'train' split is
          analyzed and all splits are transformed. When it is set, analyze
          cannot be empty.
        - force_tf_compat_v1: Whether to use TF in compat.v1 mode
          irrespective of installed/enabled TF behaviors.
        - disable_statistics: Whether to disable computation of pre-transform
          and post-transform statistics.

    Returns:
      None
    """
    self._log_startup(input_dict, output_dict, exec_properties)

    executor_utils.MatchNumberOfTransformedExamplesArtifacts(
        input_dict, output_dict)

    # Every step below works on the same list of input Examples artifacts.
    examples = input_dict[standard_component_specs.EXAMPLES_KEY]

    splits_config = executor_utils.ResolveSplitsConfig(
        exec_properties.get(standard_component_specs.SPLITS_CONFIG_KEY),
        examples)

    payload_format, data_view_uri = (
        tfxio_utils.resolve_payload_format_and_data_view_uri(examples))
    # One file format per Examples artifact, in artifact order.
    file_formats = list(map(examples_utils.get_file_format, examples))

    schema_dir = artifact_utils.get_single_uri(
        input_dict[standard_component_specs.SCHEMA_KEY])
    schema_file = io_utils.get_only_uri_in_dir(schema_dir)
    transform_graph_uri = artifact_utils.get_single_uri(
        output_dict[standard_component_specs.TRANSFORM_GRAPH_KEY])

    disable_statistics = bool(
        exec_properties.get(standard_component_specs.DISABLE_STATISTICS_KEY, 0))
    stats_output_paths = executor_utils.GetStatsOutputPathEntries(
        disable_statistics, output_dict)

    temp_path = os.path.join(transform_graph_uri,
                             _TEMP_DIR_IN_TRANSFORM_OUTPUT)
    logging.debug('Using temp path %s for tft.beam', temp_path)

    def _paths_and_formats_for(split_names):
      """Returns parallel lists of file patterns and file formats."""
      patterns = []
      formats = []
      for split_name in split_names:
        split_uris = artifact_utils.get_split_uris(examples, split_name)
        # Each split URI is paired with the format of the artifact it came
        # from, so the two sequences must line up one-to-one.
        assert len(split_uris) == len(
            file_formats), 'Length of file formats is different'
        for split_uri, split_format in zip(split_uris, file_formats):
          patterns.append(io_utils.all_files_pattern(split_uri))
          formats.append(split_format)
      return patterns, formats

    analyze_paths, analyze_formats = _paths_and_formats_for(
        splits_config.analyze)
    transform_paths, transform_formats = _paths_and_formats_for(
        splits_config.transform)

    transformed_examples = output_dict.get(
        standard_component_specs.TRANSFORMED_EXAMPLES_KEY)
    executor_utils.SetSplitNames(splits_config.transform, transformed_examples)
    materialize_output_paths = executor_utils.GetSplitPaths(
        transformed_examples)

    force_tf_compat_v1 = bool(
        exec_properties.get(standard_component_specs.FORCE_TF_COMPAT_V1_KEY, 0))

    # Make sure user packages get propagated to the remote Beam worker.
    user_module_key = exec_properties.get(
        standard_component_specs.MODULE_PATH_KEY)
    _, extra_pip_packages = udf_utils.decode_user_module_key(user_module_key)
    for remote_package in extra_pip_packages:
      local_package = io_utils.ensure_local(remote_package)
      self._beam_pipeline_args.append(_BEAM_EXTRA_PACKAGE_PREFIX +
                                      local_package)
      self._pip_dependencies.append(local_package)

    module_file = exec_properties.get(standard_component_specs.MODULE_FILE_KEY)
    preprocessing_fn_path = exec_properties.get(
        standard_component_specs.PREPROCESSING_FN_KEY)
    stats_options_updater_fn_path = exec_properties.get(
        standard_component_specs.STATS_OPTIONS_UPDATER_FN_KEY)
    custom_config = exec_properties.get(
        standard_component_specs.CUSTOM_CONFIG_KEY)

    fn_resolution_inputs = {
        labels.MODULE_FILE: module_file,
        labels.MODULE_PATH: user_module_key,
        labels.PREPROCESSING_FN: preprocessing_fn_path,
        labels.STATS_OPTIONS_UPDATER_FN: stats_options_updater_fn_path,
        labels.CUSTOM_CONFIG: custom_config,
        # Used in nitroml/automl/autodata/transform/executor.py
        labels.SCHEMA_PATH_LABEL: schema_file,
    }
    # Used in nitroml/automl/autodata/transform/executor.py
    fn_resolution_outputs = {
        labels.TRANSFORM_METADATA_OUTPUT_PATH_LABEL: transform_graph_uri,
    }
    # TODO(b/178065215): Refactor to pass exec_properties directly.
    #                    We need to change usages in nitroml, too.
    preprocessing_fn = self._GetPreprocessingFn(fn_resolution_inputs,
                                                fn_resolution_outputs)
    stats_options_updater_fn = self._GetStatsOptionsUpdaterFn(
        fn_resolution_inputs)

    label_inputs = {
        labels.DISABLE_STATISTICS_LABEL: disable_statistics,
        labels.SCHEMA_PATH_LABEL: schema_file,
        labels.EXAMPLES_DATA_FORMAT_LABEL: payload_format,
        labels.DATA_VIEW_LABEL: data_view_uri,
        labels.ANALYZE_DATA_PATHS_LABEL: analyze_paths,
        labels.ANALYZE_PATHS_FILE_FORMATS_LABEL: analyze_formats,
        labels.TRANSFORM_DATA_PATHS_LABEL: transform_paths,
        labels.TRANSFORM_PATHS_FILE_FORMATS_LABEL: transform_formats,
        labels.PREPROCESSING_FN: preprocessing_fn,
        labels.STATS_OPTIONS_UPDATER_FN: stats_options_updater_fn,
        labels.MAKE_BEAM_PIPELINE_FN: self._make_beam_pipeline,
        labels.FORCE_TF_COMPAT_V1_LABEL: force_tf_compat_v1,
        **executor_utils.GetCachePathEntry(
            standard_component_specs.ANALYZER_CACHE_KEY, input_dict)
    }

    label_outputs = {
        labels.TRANSFORM_METADATA_OUTPUT_PATH_LABEL: transform_graph_uri,
        labels.TRANSFORM_MATERIALIZE_OUTPUT_PATHS_LABEL:
            materialize_output_paths,
        labels.TEMP_OUTPUT_LABEL: str(temp_path),
        **stats_output_paths,
        **executor_utils.GetCachePathEntry(
            standard_component_specs.UPDATED_ANALYZER_CACHE_KEY, output_dict),
    }

    status_file = 'status_file'  # Unused

    # TempPipInstallContext is needed here so that subprocesses (which
    # may be created by the Beam multi-process DirectRunner) can find the
    # needed dependencies.
    # TODO(b/187122662): Move this to the ExecutorOperator or Launcher and
    # remove the `_pip_dependencies` attribute.
    with udf_utils.TempPipInstallContext(self._pip_dependencies):
      processor = TransformProcessor()
      processor.Transform(label_inputs, label_outputs, status_file)
    # Reached only when Transform() returned without raising.
    logging.debug('Cleaning up temp path %s on executor success', temp_path)
    io_utils.delete_dir(temp_path)