in tfx/scripts/run_component.py [0:0]
def _convert_parameter_value(param_type, raw_value):
  """Converts a raw argument value to the type declared by an execution parameter.

  Args:
    param_type: The declared type of the execution parameter.
    raw_value: The value supplied by the caller (a string when the function is
      driven from the command line).

  Returns:
    A new protobuf message populated from `raw_value` when `param_type` is a
    protobuf message class, an `int`/`float` when `param_type` is exactly that
    type, and `raw_value` unchanged otherwise.
  """
  # `param_type` may be a non-class object, which `issubclass` would reject,
  # hence the `isinstance(..., type)` guard.
  if isinstance(param_type, type) and issubclass(param_type, message.Message):
    proto_value = param_type()
    proto_utils.json_to_proto(raw_value, proto_value)
    return proto_value
  if param_type is int:
    return int(raw_value)
  if param_type is float:
    return float(raw_value)
  return raw_value


def _build_input_artifact(input_name, channel_param, uri, arguments):
  """Creates the artifact for one input and fills in its URI and properties.

  Args:
    input_name: Name of the component input.
    channel_param: The channel parameter declared for this input in the
      component spec; its `type` is instantiated to create the artifact.
    uri: The URI to assign to the artifact.
    arguments: All keyword arguments passed to `run_component`. Properties are
      looked up under the `<input_name>_<property_name>` key.

  Returns:
    The newly created artifact.
  """
  artifact = channel_param.type()
  artifact.uri = uri
  declared_properties = channel_param.type.PROPERTIES or {}
  for property_name, property_spec in declared_properties.items():
    property_arg_name = input_name + '_' + property_name
    if property_arg_name not in arguments:
      continue
    property_value = arguments[property_arg_name]
    if property_spec.type == PropertyType.INT:
      property_value = int(property_value)
    elif property_spec.type == PropertyType.FLOAT:
      property_value = float(property_value)
    setattr(artifact, property_name, property_value)
  return artifact


def _write_property_file(property_path, property_value):
  """Writes `str(property_value)` to `property_path`, creating parent dirs.

  Args:
    property_path: Destination file path.
    property_value: The value to write; it is converted with `str`.
  """
  parent_directory = os.path.dirname(property_path)
  # A bare file name has an empty dirname, and `os.makedirs('')` raises
  # FileNotFoundError, so only create the directory when there is one.
  if parent_directory:
    os.makedirs(parent_directory, exist_ok=True)
  with open(property_path, 'w') as f:
    f.write(str(property_value))


def run_component(full_component_class_name: str,
                  temp_directory_path: Optional[str] = None,
                  beam_pipeline_args: Optional[List[str]] = None,
                  **arguments):
  r"""Loads a component, instantiates it with arguments and runs its executor.

  The component class is instantiated, so the component code is executed,
  not just the executor code.

  To pass an input artifact URI, use the <input_name>_uri (or
  <input_name>_path) argument name.
  To pass an input artifact property, use the <input_name>_<property>
  argument name.
  To set an output artifact URI, use the output_<output_name>_uri,
  <output_name>_uri or <output_name>_path argument name.
  To have an output artifact property written to a file after the executor
  has run, pass the file path as <output_name>_<property>_path.
  Protobuf property values can be passed as JSON-serialized protobufs.

  # pylint: disable=line-too-long

  Example::

    # When run as a script:
    python3 scripts/run_component.py \
      --full-component-class-name tfx.components.StatisticsGen \
      --examples-uri gs://my_bucket/chicago_taxi_simple/CsvExamplesGen/examples/1/ \
      --examples-split-names '["train", "eval"]' \
      --output-uri gs://my_bucket/chicago_taxi_simple/StatisticsGen/output/1/

    # When run as a function:
    run_component(
      full_component_class_name='tfx.components.StatisticsGen',
      examples_uri='gs://my_bucket/chicago_taxi_simple/CsvExamplesGen/examples/1/',
      examples_split_names='["train", "eval"]',
      output_uri='gs://my_bucket/chicago_taxi_simple/StatisticsGen/output/1/',
    )

  Args:
    full_component_class_name: The component class name including module name.
    temp_directory_path: Optional. Temporary directory path for the executor.
    beam_pipeline_args: Optional. Arguments to pass to the Beam pipeline.
    **arguments: Key-value pairs with component arguments.
  """
  component_class = import_utils.import_class_by_path(full_component_class_name)
  spec_class = component_class.SPEC_CLASS

  component_arguments = {}

  # Execution parameters: arguments that are absent (or None) are left out so
  # that the component constructor receives only what the caller supplied.
  for name, execution_param in spec_class.PARAMETERS.items():
    argument_value = arguments.get(name)
    if argument_value is None:
      continue
    component_arguments[name] = _convert_parameter_value(
        execution_param.type, argument_value)

  # Inputs: each input that has a URI becomes a single-artifact channel.
  for input_name, channel_param in spec_class.INPUTS.items():
    uri = (arguments.get(input_name + '_uri') or
           arguments.get(input_name + '_path'))
    if not uri:
      continue
    artifact = _build_input_artifact(input_name, channel_param, uri, arguments)
    component_arguments[input_name] = channel_utils.as_channel([artifact])

  component_instance = component_class(**component_arguments)

  input_dict = channel_utils.unwrap_channel_dict(component_instance.inputs)
  output_dict = channel_utils.unwrap_channel_dict(component_instance.outputs)
  exec_properties = component_instance.exec_properties

  # Generating paths for output artifacts
  for output_name, channel_param in spec_class.OUTPUTS.items():
    uri = (arguments.get('output_' + output_name + '_uri') or
           arguments.get(output_name + '_uri') or
           arguments.get(output_name + '_path'))
    if not uri:
      continue
    artifacts = output_dict[output_name]
    # An output without any artifact gets one created so that the URI has
    # something to be attached to.
    if not artifacts:
      artifacts.append(channel_param.type())
    for artifact in artifacts:
      artifact.uri = uri

  executor_class = component_instance.executor_spec.executor_class
  if issubclass(executor_class, base_beam_executor.BaseBeamExecutor):
    executor_context = base_beam_executor.BaseBeamExecutor.Context(
        beam_pipeline_args=beam_pipeline_args,
        tmp_dir=temp_directory_path,
        unique_id='',
    )
  else:
    # Non-Beam executors receive the same argument list as `extra_flags`.
    executor_context = base_executor.BaseExecutor.Context(
        extra_flags=beam_pipeline_args,
        tmp_dir=temp_directory_path,
        unique_id='',
    )

  executor = executor_class(executor_context)
  executor.Do(
      input_dict=input_dict,
      output_dict=output_dict,
      exec_properties=exec_properties,
  )

  # Writing out the output artifact properties
  for output_name, channel_param in spec_class.OUTPUTS.items():
    for property_name in channel_param.type.PROPERTIES or []:
      property_path_arg_name = output_name + '_' + property_name + '_path'
      property_path = arguments.get(property_path_arg_name)
      if not property_path:
        continue
      # All artifacts of an output share one destination path, so when an
      # output holds several artifacts the file ends up with the value of the
      # last one.
      for artifact in output_dict[output_name]:
        _write_property_file(property_path, getattr(artifact, property_name))