in tfx/components/transform/executor.py [0:0]
def Transform(self, inputs: Mapping[str, Any], outputs: Mapping[str, Any],
              status_file: Optional[str] = None) -> None:
  """Executes on request.

  This is the implementation part of transform executor. This is intended for
  using or extending the executor without artifact dependency.

  Args:
    inputs: A dictionary of labelled input values, including:
      - labels.DISABLE_STATISTICS_LABEL: Whether to disable statistics
        computation.
      - labels.SCHEMA_PATH_LABEL: Path to schema file.
      - labels.EXAMPLES_DATA_FORMAT_LABEL: Example data format, one of the
        enums from example_gen_pb2.PayloadFormat.
      - labels.ANALYZE_DATA_PATHS_LABEL: Paths or path patterns to analyze
        data.
      - labels.ANALYZE_PATHS_FILE_FORMATS_LABEL: File formats of paths to
        analyze data.
      - labels.TRANSFORM_DATA_PATHS_LABEL: Paths or path patterns to transform
        data.
      - labels.TRANSFORM_PATHS_FILE_FORMATS_LABEL: File formats of paths to
        transform data.
      - labels.PREPROCESSING_FN: Python function that implements
        preprocessing_fn.
      - labels.STATS_OPTIONS_UPDATER_FN: Python function that implements
        stats_options_updater_fn, optional.
      - labels.MAKE_BEAM_PIPELINE_FN: Python function that makes a Beam
        pipeline object.
      - labels.DATA_VIEW_LABEL: DataView to be used to read the Example,
        optional.
      - labels.CACHE_INPUT_PATH_LABEL: A path to analyzer cache input,
        optional.
      - labels.FORCE_TF_COMPAT_V1_LABEL: Whether to use TF in compat.v1 mode
        irrespective of installed/enabled TF behaviors.
    outputs: A dictionary of labelled output values, including:
      - labels.PER_SET_STATS_OUTPUT_PATHS_LABEL: Paths to statistics output,
        optional.
      - labels.TRANSFORM_METADATA_OUTPUT_PATH_LABEL: A path to
        TFTransformOutput output.
      - labels.TRANSFORM_MATERIALIZE_OUTPUT_PATHS_LABEL: Paths to transform
        materialization.
      - labels.CACHE_OUTPUT_PATH_LABEL: A path to analyzer cache output,
        optional.
      - labels.TEMP_OUTPUT_LABEL: A path to temporary directory.
      - labels.PRE_TRANSFORM_OUTPUT_SCHEMA_PATH_LABEL: A path to the output
        pre-transform schema, optional.
      - labels.PRE_TRANSFORM_OUTPUT_STATS_PATH_LABEL: A path to the output
        pre-transform statistics, optional.
      - labels.POST_TRANSFORM_OUTPUT_SCHEMA_PATH_LABEL: A path to the output
        post-transform schema, optional.
      - labels.POST_TRANSFORM_OUTPUT_STATS_PATH_LABEL: A path to the output
        post-transform statistics, optional.
      - labels.POST_TRANSFORM_OUTPUT_ANOMALIES_PATH_LABEL: A path to the
        output post-transform anomalies, optional.
    status_file: Where the status should be written (not yet implemented)

  Raises:
    ValueError: If only some (not all or none) statistics output paths are
      given, if the analyze/transform path lists and their file-format lists
      differ in length, or if the analyze data list is empty.
  """
  del status_file  # unused

  logging.debug('Inputs to executor.Transform function: %s', inputs)
  logging.debug('Outputs to executor.Transform function: %s', outputs)

  # --- Unpack labelled inputs/outputs. ---
  disable_statistics = value_utils.GetSoleValue(
      inputs, labels.DISABLE_STATISTICS_LABEL)
  transform_output_path = value_utils.GetSoleValue(
      outputs, labels.TRANSFORM_METADATA_OUTPUT_PATH_LABEL)
  raw_examples_data_format = value_utils.GetSoleValue(
      inputs, labels.EXAMPLES_DATA_FORMAT_LABEL)
  schema = value_utils.GetSoleValue(inputs, labels.SCHEMA_PATH_LABEL)
  input_dataset_metadata = self._ReadMetadata(raw_examples_data_format,
                                              schema)
  materialize_output_paths = value_utils.GetValues(
      outputs, labels.TRANSFORM_MATERIALIZE_OUTPUT_PATHS_LABEL)
  preprocessing_fn = inputs[labels.PREPROCESSING_FN]
  stats_options_updater_fn = inputs.get(labels.STATS_OPTIONS_UPDATER_FN)
  make_beam_pipeline_fn = inputs[labels.MAKE_BEAM_PIPELINE_FN]
  analyze_data_paths = value_utils.GetValues(inputs,
                                             labels.ANALYZE_DATA_PATHS_LABEL)
  analyze_paths_file_formats = value_utils.GetValues(
      inputs, labels.ANALYZE_PATHS_FILE_FORMATS_LABEL)
  transform_data_paths = value_utils.GetValues(
      inputs, labels.TRANSFORM_DATA_PATHS_LABEL)
  transform_paths_file_formats = value_utils.GetValues(
      inputs, labels.TRANSFORM_PATHS_FILE_FORMATS_LABEL)
  input_cache_dir = value_utils.GetSoleValue(
      inputs, labels.CACHE_INPUT_PATH_LABEL, strict=False)
  output_cache_dir = value_utils.GetSoleValue(
      outputs, labels.CACHE_OUTPUT_PATH_LABEL, strict=False)
  per_set_stats_output_paths = value_utils.GetValues(
      outputs, labels.PER_SET_STATS_OUTPUT_PATHS_LABEL)
  temp_path = value_utils.GetSoleValue(outputs, labels.TEMP_OUTPUT_LABEL)
  data_view_uri = value_utils.GetSoleValue(
      inputs, labels.DATA_VIEW_LABEL, strict=False)
  force_tf_compat_v1 = value_utils.GetSoleValue(
      inputs, labels.FORCE_TF_COMPAT_V1_LABEL)

  # Statistics output paths are all-or-nothing: a partial set is rejected
  # below so downstream stats writing never sees a half-configured set.
  stats_labels_list = [
      labels.PRE_TRANSFORM_OUTPUT_STATS_PATH_LABEL,
      labels.PRE_TRANSFORM_OUTPUT_SCHEMA_PATH_LABEL,
      labels.POST_TRANSFORM_OUTPUT_ANOMALIES_PATH_LABEL,
      labels.POST_TRANSFORM_OUTPUT_STATS_PATH_LABEL,
      labels.POST_TRANSFORM_OUTPUT_SCHEMA_PATH_LABEL
  ]
  stats_output_paths = {}
  for label in stats_labels_list:
    value = value_utils.GetSoleValue(outputs, label, strict=False)
    if value:
      stats_output_paths[label] = value
  if stats_output_paths and len(stats_output_paths) != len(stats_labels_list):
    raise ValueError('Either all stats_output_paths should be'
                     ' specified or none.')

  logging.debug('Force tf.compat.v1: %s', force_tf_compat_v1)
  logging.debug('Analyze data patterns: %s',
                list(enumerate(analyze_data_paths)))
  logging.debug('Transform data patterns: %s',
                list(enumerate(transform_data_paths)))
  logging.debug('Transform materialization output paths: %s',
                list(enumerate(materialize_output_paths)))
  logging.debug('Transform output path: %s', transform_output_path)

  # Each data path must have a corresponding file format.
  if len(analyze_data_paths) != len(analyze_paths_file_formats):
    raise ValueError(
        'size of analyze_data_paths and '
        'analyze_paths_file_formats do not match: {} v.s {}'.format(
            len(analyze_data_paths), len(analyze_paths_file_formats)))
  if len(transform_data_paths) != len(transform_paths_file_formats):
    raise ValueError(
        'size of transform_data_paths and '
        'transform_paths_file_formats do not match: {} v.s {}'.format(
            len(transform_data_paths), len(transform_paths_file_formats)))

  # Cache output requires per-dataset analysis; per-set stats or
  # materialization require per-dataset transform.
  can_process_analysis_jointly = not bool(output_cache_dir)
  analyze_data_list = self._MakeDatasetList(analyze_data_paths,
                                            analyze_paths_file_formats,
                                            raw_examples_data_format,
                                            data_view_uri,
                                            can_process_analysis_jointly)
  if not analyze_data_list:
    raise ValueError('Analyze data list must not be empty.')

  can_process_transform_jointly = not bool(per_set_stats_output_paths or
                                           materialize_output_paths)
  transform_data_list = self._MakeDatasetList(transform_data_paths,
                                              transform_paths_file_formats,
                                              raw_examples_data_format,
                                              data_view_uri,
                                              can_process_transform_jointly,
                                              per_set_stats_output_paths,
                                              materialize_output_paths)

  all_datasets = analyze_data_list + transform_data_list
  for d in all_datasets:
    d.tfxio = self._CreateTFXIO(d, input_dataset_metadata.schema)
  self._AssertSameTFXIOSchema(all_datasets)
  typespecs = all_datasets[0].tfxio.TensorAdapter().OriginalTypeSpecs()

  # Inspecting the preprocessing_fn even if we know we need a full pass in
  # order to fail faster if it fails.
  analyze_input_columns = tft.get_analyze_input_columns(
      preprocessing_fn, typespecs, force_tf_compat_v1=force_tf_compat_v1)

  # Fast path: if statistics are disabled, nothing is materialized, and no
  # stats-options updater is configured, try the in-place (non-Beam)
  # implementation; it is only usable when preprocessing_fn needs no analysis.
  if (disable_statistics and not materialize_output_paths and
      stats_options_updater_fn is None):
    if analyze_input_columns:
      logging.warning(
          'Not using the in-place Transform because the following features '
          'require analyzing: %s', tuple(analyze_input_columns))
    else:
      logging.warning(
          'Using the in-place Transform since disable_statistics=True, '
          'it does not materialize transformed data, and the configured '
          'preprocessing_fn appears to not require analyzing the data.')
      self._RunInPlaceImpl(preprocessing_fn, force_tf_compat_v1,
                           input_dataset_metadata, typespecs,
                           transform_output_path)
      # TODO(b/122478841): Writes status to status file.
      return

  # Default the updater to an identity function over the stats options.
  stats_options_updater_fn = stats_options_updater_fn or (lambda _, x: x)

  materialization_format = (
      transform_paths_file_formats[-1] if materialize_output_paths else None)
  self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
                    stats_options_updater_fn, force_tf_compat_v1,
                    input_dataset_metadata, transform_output_path,
                    raw_examples_data_format, temp_path, input_cache_dir,
                    output_cache_dir, disable_statistics,
                    per_set_stats_output_paths, materialization_format,
                    len(analyze_data_paths), stats_output_paths,
                    make_beam_pipeline_fn)