# tfx/components/transform/executor.py
def Do(self, input_dict: Dict[str, List[types.Artifact]],
output_dict: Dict[str, List[types.Artifact]],
exec_properties: Dict[str, Any]) -> None:
"""TensorFlow Transform executor entrypoint.
This implements BaseExecutor.Do() and is invoked by orchestration systems.
This is not inteded for manual usage or further customization. Please use
the Transform() function which takes an input format with no artifact
dependency.

Args:
input_dict: Input dict from input key to a list of artifacts, including:
- examples: A list of type `standard_artifacts.Examples` which should
contain the custom splits specified in splits_config. If custom splits
are not provided, this should contain the two splits 'train' and
'eval'.
- schema: A list of type `standard_artifacts.Schema` which should
contain a single schema artifact.
- analyzer_cache: Cache input of 'tf.Transform', from which cached
information about examples analyzed in previous runs will be read.
output_dict: Output dict from key to a list of artifacts, including:
- transform_graph: Output of 'tf.Transform', which includes an exported
TensorFlow graph suitable for both training and serving;
- transformed_examples: Materialized transformed examples, which include
the transform splits specified in splits_config. If custom splits are
not provided, this should include both the 'train' and 'eval' splits.
- updated_analyzer_cache: Cache output of 'tf.Transform', to which
cached information about the analyzed examples will be written.
exec_properties: A dict of execution properties, including:
- module_file: The file path to a python module file, from which the
'preprocessing_fn' function will be loaded. Exactly one of
'module_file', 'module_path' and 'preprocessing_fn' should be set.
- module_path: The python module path, from which the
'preprocessing_fn' function will be loaded. Exactly one of
'module_file', 'module_path' and 'preprocessing_fn' should be set.
- preprocessing_fn: The module path to a python function that
implements 'preprocessing_fn'. Exactly one of 'module_file',
'module_path' and 'preprocessing_fn' should be set.
- stats_options_updater_fn: The module path to a python function that
implements 'stats_options_updater_fn'. This cannot be specified
together with 'module_file'.
- splits_config: A transform_pb2.SplitsConfig instance, providing the
splits that should be analyzed and the splits that should be
transformed. Note that the analyze and transform splits may overlap.
The default behavior (when splits_config is not set) is to analyze
the 'train' split and transform all splits. If splits_config is set,
its analyze list cannot be empty.
- force_tf_compat_v1: Whether to use TF in compat.v1 mode irrespective
of installed/enabled TF behaviors.
- disable_statistics: Whether to disable computation of pre-transform
and post-transform statistics.

Returns:
None
"""
self._log_startup(input_dict, output_dict, exec_properties)
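# Check that the number of output Examples artifacts (for materialized
# transformed examples) matches the number of input Examples artifacts,
# as the helper's name implies; the pairing is relied on below.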
executor_utils.MatchNumberOfTransformedExamplesArtifacts(
input_dict, output_dict)
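# Resolve which splits to analyze and which to transform. Per the
# docstring above, when splits_config is unset the default is to analyze
# the 'train' split and transform all splits.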
splits_config = executor_utils.ResolveSplitsConfig(
exec_properties.get(standard_component_specs.SPLITS_CONFIG_KEY),
input_dict[standard_component_specs.EXAMPLES_KEY])
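# All input Examples artifacts are expected to share a single payload
# format (e.g. tf.Example) and, optionally, a DataView URI used to decode
# the records.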
payload_format, data_view_uri = (
tfxio_utils.resolve_payload_format_and_data_view_uri(
input_dict[standard_component_specs.EXAMPLES_KEY]))
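# Record the file format of each input Examples artifact, in artifact
# order, so it can be zipped with the per-artifact split URIs below.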
examples_file_formats = [
examples_utils.get_file_format(artifact)
for artifact in input_dict[standard_component_specs.EXAMPLES_KEY]
]
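# The schema input is a single artifact whose directory is expected to
# hold exactly one schema file.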
schema_file = io_utils.get_only_uri_in_dir(
artifact_utils.get_single_uri(
input_dict[standard_component_specs.SCHEMA_KEY]))
transform_output = artifact_utils.get_single_uri(
output_dict[standard_component_specs.TRANSFORM_GRAPH_KEY])
disable_statistics = bool(
exec_properties.get(standard_component_specs.DISABLE_STATISTICS_KEY, 0))
stats_output_paths = executor_utils.GetStatsOutputPathEntries(
disable_statistics, output_dict)
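# tf.Transform writes intermediate files under a temp dir inside the
# transform_graph output; it is deleted at the end of Do() on success.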
temp_path = os.path.join(transform_output, _TEMP_DIR_IN_TRANSFORM_OUTPUT)
logging.debug('Using temp path %s for tft.beam', temp_path)
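# Build one file glob per (Examples artifact, analyze split) pair. As an
# illustrative example, with a single artifact and
# splits_config.analyze == ['train'], analyze_data_paths might contain a
# single pattern such as '<examples_uri>/Split-train/*'.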
analyze_data_paths = []
analyze_file_formats = []
for split in splits_config.analyze:
data_uris = artifact_utils.get_split_uris(
input_dict[standard_component_specs.EXAMPLES_KEY], split)
assert len(data_uris) == len(examples_file_formats), (
'Number of analyze split URIs does not match number of file formats.')
for data_uri, file_format in zip(data_uris, examples_file_formats):
analyze_data_paths.append(io_utils.all_files_pattern(data_uri))
analyze_file_formats.append(file_format)
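# Same construction as above, for the splits that will be transformed.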
transform_data_paths = []
transform_file_formats = []
for split in splits_config.transform:
data_uris = artifact_utils.get_split_uris(
input_dict[standard_component_specs.EXAMPLES_KEY], split)
assert len(data_uris) == len(examples_file_formats), (
'Number of transform split URIs does not match number of file formats.')
for data_uri, file_format in zip(data_uris, examples_file_formats):
transform_data_paths.append(io_utils.all_files_pattern(data_uri))
transform_file_formats.append(file_format)
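# Materialization is optional: transformed_examples may be absent from
# output_dict, in which case the materialize output paths below are empty.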
transformed_examples = output_dict.get(
standard_component_specs.TRANSFORMED_EXAMPLES_KEY)
executor_utils.SetSplitNames(splits_config.transform, transformed_examples)
materialize_output_paths = executor_utils.GetSplitPaths(
transformed_examples)
force_tf_compat_v1 = bool(
exec_properties.get(standard_component_specs.FORCE_TF_COMPAT_V1_KEY, 0))
# Make sure user packages get propagated to the remote Beam worker.
user_module_key = exec_properties.get(
standard_component_specs.MODULE_PATH_KEY, None)
_, extra_pip_packages = udf_utils.decode_user_module_key(user_module_key)
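# Stage each user package locally and hand it to Beam as an extra
# package, so that remote workers install the same dependencies.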
for pip_package_path in extra_pip_packages:
local_pip_package_path = io_utils.ensure_local(pip_package_path)
self._beam_pipeline_args.append(_BEAM_EXTRA_PACKAGE_PREFIX +
local_pip_package_path)
self._pip_dependencies.append(local_pip_package_path)
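# Labeled inputs used to resolve the user-supplied preprocessing_fn and
# stats_options_updater_fn; per the docstring, exactly one of
# module_file, module_path and preprocessing_fn should be set.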
inputs_for_fn_resolution = {
labels.MODULE_FILE:
exec_properties.get(standard_component_specs.MODULE_FILE_KEY, None),
labels.MODULE_PATH:
user_module_key,
labels.PREPROCESSING_FN:
exec_properties.get(standard_component_specs.PREPROCESSING_FN_KEY,
None),
labels.STATS_OPTIONS_UPDATER_FN:
exec_properties.get(
standard_component_specs.STATS_OPTIONS_UPDATER_FN_KEY, None),
labels.CUSTOM_CONFIG:
exec_properties.get(standard_component_specs.CUSTOM_CONFIG_KEY,
None),
# Used in nitroml/automl/autodata/transform/executor.py
labels.SCHEMA_PATH_LABEL:
schema_file,
}
# Used in nitroml/automl/autodata/transform/executor.py
outputs_for_fn_resolution = {
labels.TRANSFORM_METADATA_OUTPUT_PATH_LABEL: transform_output,
}
# TODO(b/178065215): Refactor to pass exec_properties directly.
# We need to change usages in nitroml, too.
preprocessing_fn = self._GetPreprocessingFn(inputs_for_fn_resolution,
outputs_for_fn_resolution)
stats_options_updater_fn = self._GetStatsOptionsUpdaterFn(
inputs_for_fn_resolution)
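# Labeled inputs for the actual Transform run: the resolved data paths,
# file formats, user functions and flags assembled above.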
label_inputs = {
labels.DISABLE_STATISTICS_LABEL:
disable_statistics,
labels.SCHEMA_PATH_LABEL:
schema_file,
labels.EXAMPLES_DATA_FORMAT_LABEL:
payload_format,
labels.DATA_VIEW_LABEL:
data_view_uri,
labels.ANALYZE_DATA_PATHS_LABEL:
analyze_data_paths,
labels.ANALYZE_PATHS_FILE_FORMATS_LABEL:
analyze_file_formats,
labels.TRANSFORM_DATA_PATHS_LABEL:
transform_data_paths,
labels.TRANSFORM_PATHS_FILE_FORMATS_LABEL:
transform_file_formats,
labels.PREPROCESSING_FN:
preprocessing_fn,
labels.STATS_OPTIONS_UPDATER_FN:
stats_options_updater_fn,
labels.MAKE_BEAM_PIPELINE_FN:
self._make_beam_pipeline,
labels.FORCE_TF_COMPAT_V1_LABEL:
force_tf_compat_v1,
**executor_utils.GetCachePathEntry(
standard_component_specs.ANALYZER_CACHE_KEY, input_dict)
}
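# Labeled outputs: the transform graph location, materialized example
# paths, the temp dir, plus optional statistics and analyzer-cache paths.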
label_outputs = {
labels.TRANSFORM_METADATA_OUTPUT_PATH_LABEL:
transform_output,
labels.TRANSFORM_MATERIALIZE_OUTPUT_PATHS_LABEL:
materialize_output_paths,
labels.TEMP_OUTPUT_LABEL:
str(temp_path),
**stats_output_paths,
**executor_utils.GetCachePathEntry(
standard_component_specs.UPDATED_ANALYZER_CACHE_KEY, output_dict),
}
status_file = 'status_file' # Unused
# TempPipInstallContext is needed here so that subprocesses (which
# may be created by the Beam multi-process DirectRunner) can find the
# needed dependencies.
# TODO(b/187122662): Move this to the ExecutorOperator or Launcher and
# remove the `_pip_dependencies` attribute.
with udf_utils.TempPipInstallContext(self._pip_dependencies):
TransformProcessor().Transform(label_inputs, label_outputs, status_file)
logging.debug('Cleaning up temp path %s on executor success', temp_path)
io_utils.delete_dir(temp_path)