tfx/orchestration/portable/outputs_utils.py (241 lines of code) (raw):

# Copyright 2020 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Portable library for output artifacts resolution including caching decision. """ import collections import copy import datetime import os from typing import Any, Dict, List, Mapping, Optional, Sequence, Union from absl import logging from tfx import types from tfx import version from tfx.dsl.io import fileio from tfx.orchestration import data_types_utils from tfx.orchestration import node_proto_view from tfx.proto.orchestration import execution_result_pb2 from tfx.proto.orchestration import pipeline_pb2 from tfx.types import artifact_utils from tfx.types.value_artifact import ValueArtifact from tfx.utils import proto_utils from ml_metadata.proto import metadata_store_pb2 _SYSTEM = '.system' _EXECUTOR_EXECUTION = 'executor_execution' _DRIVER_EXECUTION = 'driver_execution' _STATEFUL_WORKING_DIR = 'stateful_working_dir' _DRIVER_OUTPUT_FILE = 'driver_output.pb' _EXECUTOR_OUTPUT_FILE = 'executor_output.pb' _VALUE_ARTIFACT_FILE_NAME = 'value' # The fixed special value to indicate that the binary will set the output URI # value during its execution. # LINT.IfChange RESOLVED_AT_RUNTIME = '{resolved_at_runtime}' # LINT.ThenChange(<Internal source code>) def make_output_dirs( output_dict: Mapping[str, Sequence[types.Artifact]]) -> None: """Make dirs for output artifacts' URI.""" for _, artifact_list in output_dict.items(): for artifact in artifact_list: # Omit lifecycle management for external artifacts. if artifact.is_external: continue if isinstance(artifact, ValueArtifact): # If this is a ValueArtifact, create the file if it does not exist. if not fileio.exists(artifact.uri): artifact_dir = os.path.dirname(artifact.uri) fileio.makedirs(artifact_dir) with fileio.open(artifact.uri, 'w') as f: # Because fileio.open won't create an empty file, we write an # empty string to it to force the creation. f.write('') else: # Otherwise create a dir. fileio.makedirs(artifact.uri) def remove_output_dirs( output_dict: Mapping[str, Sequence[types.Artifact]]) -> None: """Remove dirs of output artifacts' URI.""" for _, artifact_list in output_dict.items(): for artifact in artifact_list: # Omit lifecycle management for external artifacts. if artifact.is_external: continue if fileio.isdir(artifact.uri): fileio.rmtree(artifact.uri) else: fileio.remove(artifact.uri) def clear_output_dirs( output_dict: Mapping[str, Sequence[types.Artifact]]) -> None: """Clear dirs of output artifacts' URI.""" for _, artifact_list in output_dict.items(): for artifact in artifact_list: # Omit lifecycle management for external artifacts. if artifact.is_external: continue # Clear out the contents of the output directory while preserving the # output directory itself. Needed to preserve any storage attributes of # the output directory. if not fileio.isdir(artifact.uri): continue child_paths = [ os.path.join(artifact.uri, filename) for filename in fileio.listdir(artifact.uri) ] for path in child_paths: if fileio.isdir(path): fileio.rmtree(path) else: fileio.remove(path) def remove_stateful_working_dir(stateful_working_dir: str) -> None: """Remove stateful_working_dir.""" # Clean up stateful working dir # Note that: # stateful_working_dir = os.path.join( # self._node_dir, # _SYSTEM, # _STATEFUL_WORKING_DIR, <-- we want to clean from this level down. # dir_suffix) stateful_working_dir = os.path.abspath( os.path.join(stateful_working_dir, os.pardir)) try: fileio.rmtree(stateful_working_dir) except fileio.NotFoundError: logging.warning( 'stateful_working_dir %s is not found, not going to delete it.', stateful_working_dir) def _attach_artifact_properties(spec: pipeline_pb2.OutputSpec.ArtifactSpec, artifact: types.Artifact): """Attaches properties of an artifact using ArtifactSpec.""" for key, value in spec.additional_properties.items(): if not value.HasField('field_value'): raise RuntimeError('Property value is not a field_value for %s' % key) if value.field_value.HasField('proto_value'): # Proto properties need to be unpacked from the google.protobuf.Any # message to its concrete message before setting the artifact property property_value = proto_utils.unpack_proto_any( value.field_value.proto_value) else: property_value = data_types_utils.get_metadata_value(value.field_value) setattr(artifact, key, property_value) for key, value in spec.additional_custom_properties.items(): if not value.HasField('field_value'): raise RuntimeError('Property value is not a field_value for %s' % key) value_type = value.field_value.WhichOneof('value') if value_type == 'int_value': artifact.set_int_custom_property(key, value.field_value.int_value) elif value_type == 'string_value': artifact.set_string_custom_property(key, value.field_value.string_value) elif value_type == 'double_value': artifact.set_float_custom_property(key, value.field_value.double_value) elif value_type == 'proto_value': proto_value = proto_utils.unpack_proto_any(value.field_value.proto_value) artifact.set_proto_custom_property(key, proto_value) else: raise RuntimeError(f'Unexpected value_type: {value_type}') class OutputsResolver: """This class has methods to handle launcher output related logic.""" def __init__(self, pipeline_node: Union[pipeline_pb2.PipelineNode, node_proto_view.NodeProtoView], pipeline_info: pipeline_pb2.PipelineInfo, pipeline_runtime_spec: pipeline_pb2.PipelineRuntimeSpec, execution_mode: 'pipeline_pb2.Pipeline.ExecutionMode' = ( pipeline_pb2.Pipeline.SYNC)): self._pipeline_node = pipeline_node self._pipeline_info = pipeline_info self._pipeline_root = ( pipeline_runtime_spec.pipeline_root.field_value.string_value) self._pipeline_run_id = ( pipeline_runtime_spec.pipeline_run_id.field_value.string_value) self._execution_mode = execution_mode self._node_dir = os.path.join(self._pipeline_root, pipeline_node.node_info.id) def generate_output_artifacts( self, execution_id: int) -> Dict[str, List[types.Artifact]]: """Generates output artifacts given execution_id.""" return generate_output_artifacts( execution_id=execution_id, outputs=self._pipeline_node.outputs.outputs, node_dir=self._node_dir, pipeline_root=self._pipeline_root) def get_executor_output_uri(self, execution_id: int) -> str: """Generates executor output uri given execution_id.""" execution_dir = os.path.join(self._node_dir, _SYSTEM, _EXECUTOR_EXECUTION, str(execution_id)) fileio.makedirs(execution_dir) return os.path.join(execution_dir, _EXECUTOR_OUTPUT_FILE) def get_driver_output_uri(self) -> str: driver_output_dir = os.path.join( self._node_dir, _SYSTEM, _DRIVER_EXECUTION, str(int(datetime.datetime.now().timestamp() * 1000000))) fileio.makedirs(driver_output_dir) return os.path.join(driver_output_dir, _DRIVER_OUTPUT_FILE) def get_stateful_working_directory(self, execution_id: Optional[int] = None) -> str: """Generates stateful working directory given (optional) execution id. Args: execution_id: An optional execution id which will be used as part of the stateful working dir path if provided. The stateful working dir path will be <node_dir>/.system/stateful_working_dir/<execution_id>. If execution_id is not provided, for backward compatibility purposes, <pipeline_run_id> is used instead of <execution_id> but an error is raised if the execution_mode is not SYNC (since ASYNC pipelines have no pipeline_run_id). Returns: Path to stateful working directory. Raises: ValueError: If execution_id is not provided and execution_mode of the pipeline is not SYNC. """ return get_stateful_working_directory(self._node_dir, self._execution_mode, self._pipeline_run_id, execution_id) def make_tmp_dir(self, execution_id: int) -> str: """Generates a temporary directory.""" return make_tmp_dir(self._node_dir, execution_id) def _generate_output_artifact( output_spec: pipeline_pb2.OutputSpec) -> types.Artifact: """Generates each output artifact given output_spec.""" artifact = artifact_utils.deserialize_artifact(output_spec.artifact_spec.type) _attach_artifact_properties(output_spec.artifact_spec, artifact) return artifact def _validate_external_uri(external_uri: str, pipeline_root: Optional[str]) -> None: """Validates a user-defined external artifact URI.""" if external_uri == RESOLVED_AT_RUNTIME: return if pipeline_root and pipeline_root in external_uri: raise ValueError('External artifact URI %s is not allowed within the ' 'pipeline base directory.' % external_uri) def generate_output_artifacts( execution_id: int, outputs: Mapping[str, pipeline_pb2.OutputSpec], node_dir: str, pipeline_root: Optional[str] = None) -> Dict[str, List[types.Artifact]]: """Generates output artifacts. Args: execution_id: The id of the execution. outputs: Mapping from artifact key to its OutputSpec value in pipeline IR. node_dir: The root directory of the node. pipeline_root: Path to root directory of the pipeline. Returns: Mapping from artifact key to the list of TFX artifacts. Raises: ValueError: If any external artifact uri is inside the pipeline_root. """ output_artifacts = collections.defaultdict(list) for key, output_spec in outputs.items(): artifact = _generate_output_artifact(output_spec) if output_spec.artifact_spec.external_artifact_uris: for external_uri in output_spec.artifact_spec.external_artifact_uris: _validate_external_uri(external_uri, pipeline_root) external_artifact = copy.deepcopy(artifact) external_artifact.uri = external_uri external_artifact.is_external = True logging.debug('Creating external output artifact uri %s', external_artifact.uri) output_artifacts[key].append(external_artifact) else: artifact.uri = os.path.join(node_dir, key, str(execution_id)) if isinstance(artifact, ValueArtifact): artifact.uri = os.path.join(artifact.uri, _VALUE_ARTIFACT_FILE_NAME) logging.debug('Creating output artifact uri %s', artifact.uri) output_artifacts[key].append(artifact) return output_artifacts def get_stateful_working_directory(node_dir: str, execution_mode: pipeline_pb2.Pipeline .ExecutionMode = pipeline_pb2.Pipeline.SYNC, pipeline_run_id: str = '', execution_id: Optional[int] = None) -> str: """Generates stateful working directory. Args: node_dir: The root directory of the node. execution_mode: Execution mode of the pipeline. pipeline_run_id: Optional pipeline_run_id, only available if execution mode is SYNC. execution_id: An optional execution id which will be used as part of the stateful working dir path if provided. The stateful working dir path will be <node_dir>/.system/stateful_working_dir/<execution_id>. If execution_id is not provided, for backward compatibility purposes, <pipeline_run_id> is used instead of <execution_id> but an error is raised if the execution_mode is not SYNC (since ASYNC pipelines have no pipeline_run_id). Returns: Path to stateful working directory. Raises: ValueError: If execution_id is not provided and execution_mode of the pipeline is not SYNC. """ if (execution_id is None and execution_mode != pipeline_pb2.Pipeline.SYNC): raise ValueError( 'Cannot create stateful working dir if execution id is `None` and ' 'the execution mode of the pipeline is not `SYNC`.') if execution_id is None: dir_suffix = pipeline_run_id else: dir_suffix = str(execution_id) # TODO(b/150979622): We should introduce an id that is not changed across # retries of the same component run to provide better isolation between # "retry" and "new execution". When it is available, introduce it into # stateful working directory. # NOTE: If this directory structure is changed, please update # the remove_stateful_working_dir function in this file accordingly. stateful_working_dir = os.path.join(node_dir, _SYSTEM, _STATEFUL_WORKING_DIR, dir_suffix) try: fileio.makedirs(stateful_working_dir) except Exception: # pylint: disable=broad-except logging.exception('Failed to make stateful working dir: %s', stateful_working_dir) raise return stateful_working_dir def make_tmp_dir(node_dir: str, execution_id: int) -> str: """Generates a temporary directory.""" result = os.path.join(node_dir, _SYSTEM, _EXECUTOR_EXECUTION, str(execution_id), '.temp', '') fileio.makedirs(result) return result def tag_output_artifacts_with_version( output_artifacts: Optional[Mapping[str, Sequence[types.Artifact]]] = None): """Tag output artifacts with the current TFX version.""" if not output_artifacts: return for unused_key, artifact_list in output_artifacts.items(): for artifact in artifact_list: if not artifact.has_custom_property( artifact_utils.ARTIFACT_TFX_VERSION_CUSTOM_PROPERTY_KEY): artifact.set_string_custom_property( artifact_utils.ARTIFACT_TFX_VERSION_CUSTOM_PROPERTY_KEY, version.__version__) def populate_output_artifact( executor_output: execution_result_pb2.ExecutorOutput, output_dict: Mapping[str, Sequence[types.Artifact]]): """Populate output_dict to executor_output.""" for key, artifact_list in output_dict.items(): artifacts = execution_result_pb2.ExecutorOutput.ArtifactList() for artifact in artifact_list: artifacts.artifacts.append(artifact.mlmd_artifact) executor_output.output_artifacts[key].CopyFrom(artifacts) def populate_exec_properties( executor_output: execution_result_pb2.ExecutorOutput, exec_properties: Mapping[str, Any]): """Populate exec_properties to executor_output.""" for key, value in exec_properties.items(): v = metadata_store_pb2.Value() if isinstance(value, str): v.string_value = value elif isinstance(value, int): v.int_value = value elif isinstance(value, float): v.double_value = value else: logging.info( 'Value type %s of key %s in exec_properties is not ' 'supported, going to drop it', type(value), key) continue executor_output.execution_properties[key].CopyFrom(v)