in tfx/orchestration/kubeflow/v2/file_based_example_gen/driver.py [0:0]
def _run_driver(executor_input: pipeline_spec_pb2.ExecutorInput) -> None:
"""Runs the driver, writing its output as a ExecutorOutput proto.
The main goal of this driver is to calculate the span and fingerprint of input
data, allowing for the executor invocation to be skipped if the ExampleGen
component has been previously run on the same data with the same
configuration. This span and fingerprint are added as new custom execution
properties to an ExecutorOutput proto and written to a GCS path. The CAIP
pipelines system reads this file and updates MLMD with the new execution
properties.
Args:
executor_input: pipeline_spec_pb2.ExecutorInput that contains TFX artifacts
and exec_properties information.
"""
exec_properties = kubeflow_v2_entrypoint_utils.parse_execution_properties(
executor_input.inputs.parameters)
name_from_id = {}
outputs_dict = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
executor_input.outputs.artifacts, name_from_id)
# The path at which an ExecutorOutput message, carrying the updated execution
# properties and output artifacts, will be written. The CAIP Pipelines service
# reads this file and updates the task's properties and artifacts prior to
# running the executor.
output_metadata_uri = executor_input.outputs.output_file
logging.set_verbosity(logging.INFO)
logging.info('exec_properties = %s\noutput_metadata_uri = %s',
exec_properties, output_metadata_uri)
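# Deserialize the input base URI, the Input config and the optional
# RangeConfig from the execution properties.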
input_base_uri = exec_properties.get(standard_component_specs.INPUT_BASE_KEY)
input_config = example_gen_pb2.Input()
proto_utils.json_to_proto(
exec_properties[standard_component_specs.INPUT_CONFIG_KEY], input_config)
range_config = None
range_config_entry = exec_properties.get(
standard_component_specs.RANGE_CONFIG_KEY)
if range_config_entry:
range_config = range_config_pb2.RangeConfig()
proto_utils.json_to_proto(range_config_entry, range_config)
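# Resolve the span, version and input fingerprint for the configured splits.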
processor = input_processor.FileBasedInputProcessor(input_base_uri,
input_config.splits,
range_config)
span, version = processor.resolve_span_and_version()
fingerprint = processor.get_input_fingerprint(span, version)
logging.info('Calculated span: %s', span)
logging.info('Calculated fingerprint: %s', fingerprint)
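# Record span, fingerprint and version as custom execution properties so that
# a previous execution on the same data and configuration can be reused.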
exec_properties[utils.SPAN_PROPERTY_NAME] = span
exec_properties[utils.FINGERPRINT_PROPERTY_NAME] = fingerprint
exec_properties[utils.VERSION_PROPERTY_NAME] = version
# Update each split pattern in input_config to point at the resolved span and
# version.
for split in input_config.splits:
split.pattern = processor.get_pattern_for_span_version(
split.pattern, span, version)
exec_properties[standard_component_specs
.INPUT_CONFIG_KEY] = proto_utils.proto_to_json(input_config)
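# Propagate the resolved execution properties onto the single output Examples
# artifact.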
if standard_component_specs.EXAMPLES_KEY not in outputs_dict:
raise ValueError('Example artifact was missing in the ExampleGen outputs.')
example_artifact = artifact_utils.get_single_instance(
outputs_dict[standard_component_specs.EXAMPLES_KEY])
driver.update_output_artifact(
exec_properties=exec_properties,
output_artifact=example_artifact.mlmd_artifact)
# Populate the ExecutorOutput message with the resolved properties and the
# output Examples artifact.
output_metadata = pipeline_spec_pb2.ExecutorOutput()
output_metadata.parameters[utils.SPAN_PROPERTY_NAME].int_value = span
output_metadata.parameters[
utils.FINGERPRINT_PROPERTY_NAME].string_value = fingerprint
if version is not None:
output_metadata.parameters[utils.VERSION_PROPERTY_NAME].int_value = version
output_metadata.parameters[
standard_component_specs
.INPUT_CONFIG_KEY].string_value = proto_utils.proto_to_json(input_config)
output_metadata.artifacts[
standard_component_specs.EXAMPLES_KEY].artifacts.add().CopyFrom(
kubeflow_v2_entrypoint_utils.to_runtime_artifact(
example_artifact, name_from_id))
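# Persist the ExecutorOutput as JSON at the path provided by the CAIP
# Pipelines service.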
fileio.makedirs(os.path.dirname(output_metadata_uri))
with fileio.open(output_metadata_uri, 'wb') as f:
f.write(json_format.MessageToJson(output_metadata, sort_keys=True))
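

# Illustrative sketch (not part of the original module): one way the driver
# could be invoked from a container entrypoint, assuming the serialized
# ExecutorInput JSON is handed in as a string. The _sketch_main name and the
# argument plumbing are assumptions for illustration only.
def _sketch_main(serialized_executor_input: str) -> None:
  executor_input = pipeline_spec_pb2.ExecutorInput()
  json_format.Parse(serialized_executor_input, executor_input)
  _run_driver(executor_input)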