in tfx/orchestration/portable/launcher.py [0:0]
def _register_failed_execution(self, metadata_handler, contexts,
                               exec_properties,
                               executor_output) -> _ExecutionPreparationResult:
  """Registers (or reuses) an execution and publishes it as failed if new.

  Shared failure path for `_prepare_execution`: an execution is registered so
  the failure is recorded in MLMD. If a previous attempt of the same execution
  already succeeded, the failed state is NOT published on top of it.

  Args:
    metadata_handler: An open MLMD connection handle.
    contexts: MLMD contexts the execution is registered under.
    exec_properties: Resolved execution properties for the execution.
    executor_output: Executor output proto carrying the error code/message to
      publish when the execution is newly failed.

  Returns:
    An `_ExecutionPreparationResult` with `is_execution_needed=False`.
  """
  execution = self._register_or_reuse_execution(
      metadata_handler=metadata_handler,
      contexts=contexts,
      exec_properties=exec_properties)
  # Only publish the failure for a not-yet-successful execution; a reused
  # successful execution keeps its state intact.
  if not execution_lib.is_execution_successful(execution):
    self._publish_failed_execution(
        execution_id=execution.id,
        contexts=contexts,
        executor_output=executor_output)
  return _ExecutionPreparationResult(
      execution_info=self._build_execution_info(execution_id=execution.id),
      contexts=contexts,
      is_execution_needed=False)

def _prepare_execution(self) -> _ExecutionPreparationResult:
  """Prepares inputs, outputs and execution properties for actual execution.

  Returns:
    An `_ExecutionPreparationResult`. `is_execution_needed` is True only when
    a new execution was registered and the executor should actually run;
    otherwise (input-resolution failure, skip, reused successful execution,
    or cache hit) it is False.
  """
  with self._mlmd_connection as m:
    # 1. Prepares all contexts.
    contexts = context_lib.prepare_contexts(
        metadata_handler=m, node_contexts=self._pipeline_node.contexts)

    # 2. Resolves inputs and execution properties.
    exec_properties = data_types_utils.build_parsed_value_dict(
        inputs_utils.resolve_parameters_with_schema(
            node_parameters=self._pipeline_node.parameters))
    try:
      resolved_inputs = inputs_utils.resolve_input_artifacts(
          pipeline_node=self._pipeline_node,
          metadata_handler=m)
      logging.info('[%s] Resolved inputs: %s',
                   self._pipeline_node.node_info.id, resolved_inputs)
    except exceptions.InputResolutionError as e:
      logging.exception('[%s] Input resolution error: %s',
                        self._pipeline_node.node_info.id, e)
      # Record the failure in MLMD so downstream tooling can observe it.
      return self._register_failed_execution(
          metadata_handler=m,
          contexts=contexts,
          exec_properties=exec_properties,
          executor_output=self._build_error_output(code=e.grpc_code_value))

    # 3. If not all required inputs are met. Return ExecutionInfo with
    # is_execution_needed being false. No publish will happen so down stream
    # nodes won't be triggered.
    # TODO(b/197907821): Publish special execution for Skip?
    if isinstance(resolved_inputs, inputs_utils.Skip):
      logging.info('Skipping execution for %s',
                   self._pipeline_node.node_info.id)
      return _ExecutionPreparationResult(
          execution_info=self._build_execution_info(),
          contexts=contexts,
          is_execution_needed=False)

    # TODO(b/197741942): Support len > 1.
    if len(resolved_inputs) > 1:
      return self._register_failed_execution(
          metadata_handler=m,
          contexts=contexts,
          exec_properties=exec_properties,
          executor_output=self._build_error_output(
              _ERROR_CODE_UNIMPLEMENTED,
              'Handling more than one input dicts not implemented yet.'))
    input_artifacts = resolved_inputs[0]

    # 4. Resolve the dynamic exec properties from implicit input channels.
    try:
      dynamic_exec_properties = inputs_utils.resolve_dynamic_parameters(
          node_parameters=self._pipeline_node.parameters,
          input_artifacts=input_artifacts)
      exec_properties.update(dynamic_exec_properties)
    except exceptions.InputResolutionError as e:
      # Log for parity with the static input resolution failure above.
      logging.exception('[%s] Dynamic parameter resolution error: %s',
                        self._pipeline_node.node_info.id, e)
      return self._register_failed_execution(
          metadata_handler=m,
          contexts=contexts,
          exec_properties=exec_properties,
          executor_output=self._build_error_output(code=e.grpc_code_value))

    # 5. Registers execution in metadata.
    execution = self._register_or_reuse_execution(
        metadata_handler=m,
        contexts=contexts,
        input_artifacts=input_artifacts,
        exec_properties=exec_properties)
    if execution_lib.is_execution_successful(execution):
      # A previous attempt already finished successfully; nothing to run.
      return _ExecutionPreparationResult(
          execution_info=self._build_execution_info(
              execution_id=execution.id),
          contexts=contexts,
          is_execution_needed=False)

    # 6. Resolve output
    output_artifacts = self._output_resolver.generate_output_artifacts(
        execution.id)

  # If there is a custom driver, runs it.
  if self._driver_operator:
    driver_output = self._driver_operator.run_driver(
        self._build_execution_info(
            input_dict=input_artifacts,
            output_dict=output_artifacts,
            exec_properties=exec_properties,
            execution_output_uri=(
                self._output_resolver.get_driver_output_uri())))
    self._update_with_driver_output(driver_output, exec_properties,
                                    output_artifacts)

  # We reconnect to MLMD here because the custom driver closes MLMD connection
  # on returning.
  with self._mlmd_connection as m:
    # 7. Check cached result
    cache_context = cache_utils.get_cache_context(
        metadata_handler=m,
        pipeline_node=self._pipeline_node,
        pipeline_info=self._pipeline_info,
        executor_spec=self._executor_spec,
        input_artifacts=input_artifacts,
        output_artifacts=output_artifacts,
        parameters=exec_properties)
    contexts.append(cache_context)

    # 8. Should cache be used?
    if self._pipeline_node.execution_options.caching_options.enable_cache:
      cached_outputs = cache_utils.get_cached_outputs(
          metadata_handler=m, cache_context=cache_context)
      if cached_outputs is not None:
        # Publishes cache result
        execution_publish_utils.publish_cached_execution(
            metadata_handler=m,
            contexts=contexts,
            execution_id=execution.id,
            output_artifacts=cached_outputs)
        logging.info('A cached execution %d is used.', execution.id)
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(
                execution_id=execution.id,
                input_dict=input_artifacts,
                output_dict=output_artifacts,
                exec_properties=exec_properties),
            execution_metadata=execution,
            contexts=contexts,
            is_execution_needed=False)

    # 9. Going to trigger executor.
    logging.info('Going to run a new execution %d', execution.id)
    return _ExecutionPreparationResult(
        execution_info=self._build_execution_info(
            execution_id=execution.id,
            input_dict=input_artifacts,
            output_dict=output_artifacts,
            exec_properties=exec_properties,
            execution_output_uri=(
                self._output_resolver.get_executor_output_uri(execution.id)),
            stateful_working_dir=(
                self._output_resolver.get_stateful_working_directory()),
            tmp_dir=self._output_resolver.make_tmp_dir(execution.id)),
        execution_metadata=execution,
        contexts=contexts,
        is_execution_needed=True)