tfx/proto/orchestration/pipeline.proto

// Copyright 2020 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package tfx.orchestration;

import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
import "ml_metadata/proto/metadata_store.proto";
import "tfx/proto/orchestration/garbage_collection_policy.proto";
import "tfx/proto/orchestration/placeholder.proto";

// ResolverConfig is subject to change. We plan to introduce a flexible
// config to enable more sophisticated policies in the future.
// TODO(b/152230663): Support more flexibility for resolution logic.
message ResolverConfig {
  // Each resolver step takes an input map (Dict[Text, List[Artifact]]),
  // processes the map, and optionally emits one
  // (Optional[Dict[Text, List[Artifact]]]). If a resolver step does not emit
  // an input map, execution will not be triggered.
  message ResolverStep {
    // Class path (<module_name>.<class_name>) of the resolver. For example
    // "tfx.dsl.resolvers.latest_artifact_strategy.LatestArtifactStrategy".
    // The resolver class should be a subclass of the
    // `tfx.dsl.components.common.resolver.ResolverStrategy` class.
    string class_path = 1;
    // JSON-serialized resolver config which will be used as keyword arguments
    // when instantiating the resolver class. Any tfx.utils.json_utils.Jsonable
    // value can be used.
    string config_json = 2;
    // Optional list of input keys that the Resolver instance would use. Other
    // keys bypass the resolver instance and are passed as-is to the next
    // step. If the resolver returns None, bypassed inputs are also ignored
    // and the ResolverStep returns None.
    // If not specified, all input keys will be used.
    repeated string input_keys = 3;
  }
  // Series of resolver steps that are applied in order. Inputs and outputs
  // of a resolver have the same type (Dict[Text, List[Artifact]]), so the
  // list of resolvers can be composed to act as a single resolver. If any of
  // the composed resolvers returns None, the remaining resolvers are not
  // executed and None is returned early (which means the component will not
  // be triggered).
  repeated ResolverStep resolver_steps = 1;
}

// Definition for runtime parameters.
message RuntimeParameter {
  enum Type {
    TYPE_UNSPECIFIED = 0;
    INT = 1;
    DOUBLE = 2;
    STRING = 3;
  }
  // Required field. The name of the runtime parameter. This should be
  // globally unique within the pipeline scope.
  string name = 1;
  // Required field. The type of the runtime parameter.
  Type type = 2;
  // Optional field. Default value of the runtime parameter. If not set and
  // the runtime parameter value is not provided during runtime, an error
  // will be raised.
  ml_metadata.Value default_value = 3;
}
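// For illustration, a hypothetical textproto instance of a RuntimeParameter
// (a sketch; the parameter name and default are invented, and the
// `int_value` field name is assumed from ml_metadata.Value in
// ml_metadata/proto/metadata_store.proto):
//
//   name: "train_steps"
//   type: INT
//   default_value { int_value: 1000 }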
// TODO(b/157270778): Support structural runtime parameter at the SDK level.
// Definition for structural runtime parameters.
// This can be used to combine several runtime parameters into a single string
// with parts of it pre-set by users. Consider the following example:
// `[RuntimeParameter(a), '_some_string_', RuntimeParameter(b)]`
// During runtime, the system will resolve the runtime parameters in the list
// and concatenate all pieces in the list together into a single string.
message StructuralRuntimeParameter {
  // Definition of each part in the structural runtime parameter. Each part
  // can be either a string or a runtime parameter.
  message StringOrRuntimeParameter {
    oneof value {
      string constant_value = 1;
      RuntimeParameter runtime_parameter = 2;
    }
  }
  repeated StringOrRuntimeParameter parts = 1;
}

// Definition for Value in uDSL IR. A Value instance can be one of: a field
// value that is determined during compilation time, a runtime parameter that
// will be determined during runtime, or a placeholder that will be determined
// during runtime, before or at execution time.
message Value {
  oneof value {
    ml_metadata.Value field_value = 1;
    RuntimeParameter runtime_parameter = 2;
    StructuralRuntimeParameter structural_runtime_parameter = 3;
    PlaceholderExpression placeholder = 5;
  }

  // If non-primitive types get JSON-serialized and stored as a string value
  // in ml_metadata.Value, ValueType provides the necessary information to
  // deserialize the string and recover the type information.
  message Schema {
    message ProtoType {
      string message_type = 1;
      google.protobuf.FileDescriptorSet file_descriptors = 2;
    }
    message BooleanType {}
    message ValueType {
      oneof type {
        ValueType list_type = 1;
        ProtoType proto_type = 2;
        BooleanType boolean_type = 3;
        ValueType dict_type = 4;
      }
    }
    ValueType value_type = 1;
  }
  Schema schema = 4;
}

// Definition of a predicate on property values. It can be one of the
// following:
// 1. A value comparator that predicates on a certain property and a target
//    value.
// 2. A unary logical operator that operates on a sub-predicate.
// 3. A binary logical operator that operates on two sub-predicates.
message PropertyPredicate {
  // Property value comparator.
  message ValueComparator {
    // Operators for comparison.
    enum Op {
      OP_UNSPECIFIED = 0;
      // The following two ops are available for all types.
      EQ = 1;
      LT = 2;
    }
    // The name of the property.
    string property_name = 1;
    // The target value to compare with.
    Value target_value = 2;
    Op op = 3;
    // Users can choose to set 0, 1 or 2 of the following two fields.
    // - If none of them is set, the predicate operates on a single Artifact.
    // - If only 'input_key' is set, the predicate operates on
    //   Dict[Text, Artifact].
    // - If only 'input_index' is set, the predicate operates on
    //   List[Artifact].
    // - If both 'input_key' and 'input_index' are set, the predicate
    //   operates on Dict[Text, List[Artifact]].
    string input_key = 4;
    int32 input_index = 5;
    bool is_custom_property = 6;
  }
  // Logical operator on one element.
  message UnaryLogicalOperator {
    enum LogicalOp {
      OP_UNSPECIFIED = 0;
      NOT = 1;
    }
    LogicalOp op = 1;
    // The operand to operate on.
    PropertyPredicate operand = 2;
  }
  // Logical operator on two elements.
  message BinaryLogicalOperator {
    enum LogicalOp {
      OP_UNSPECIFIED = 0;
      AND = 1;
      OR = 2;
    }
    LogicalOp op = 1;
    // The left-hand side element of the logical operator.
    PropertyPredicate lhs = 2;
    // The right-hand side element of the logical operator.
    PropertyPredicate rhs = 3;
  }
  oneof operator {
    ValueComparator value_comparator = 1;
    UnaryLogicalOperator unary_logical_operator = 2;
    BinaryLogicalOperator binary_logical_operator = 3;
  }
}
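// For illustration, a hypothetical textproto instance of a PropertyPredicate
// expressing `span == 3 AND NOT (version < 2)` (a sketch; the property names
// and target values are invented):
//
//   binary_logical_operator {
//     op: AND
//     lhs {
//       value_comparator {
//         property_name: "span"
//         target_value { field_value { int_value: 3 } }
//         op: EQ
//       }
//     }
//     rhs {
//       unary_logical_operator {
//         op: NOT
//         operand {
//           value_comparator {
//             property_name: "version"
//             target_value { field_value { int_value: 2 } }
//             op: LT
//           }
//         }
//       }
//     }
//   }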
// InputGraph expresses declarative input resolution logic as a graph of
// ResolverOps and a result node. An entire InputGraph is basically a
// function without arguments that returns a value of one of the DataType
// kinds. Its result is referenced from `InputSpec.InputGraphRef`.
message InputGraph {
  // Data type of the OpNode output. Corresponds to the
  // tfx.dsl.resolver_op.DataType.
  enum DataType {
    DATA_TYPE_UNSPECIFIED = 0;
    ARTIFACT_LIST = 1;
    ARTIFACT_MULTIMAP = 2;
    ARTIFACT_MULTIMAP_LIST = 3;
  }

  message Node {
    DataType output_data_type = 1;
    oneof kind {
      OpNode op_node = 2;
      DictNode dict_node = 3;
      InputNode input_node = 4;
    }
  }

  // ResolverOp instance as an input graph node. Corresponds to the
  // tfx.dsl.resolver_op.OpNode.
  message OpNode {
    // A canonical_name of the ResolverOp (e.g. "tfx.internal.Unnest").
    string op_type = 1;
    // Generic argument type that can express either another Node's output
    // or a static value.
    message Arg {
      oneof kind {
        // ID of another Node instance, referring to its output.
        string node_id = 1;
        // Static value.
        Value value = 2;
      }
    }
    // Optional positional arguments; their kind must be `node_id`.
    repeated Arg args = 2;
    // Optional keyword arguments; their kind must be `value`.
    map<string, Arg> kwargs = 3;
  }

  // Dict of other Nodes where each Node.output_data_type is ARTIFACT_LIST.
  // Its own output_data_type in the parent Node message is
  // ARTIFACT_MULTIMAP. Corresponds to the tfx.dsl.resolver_op.DictNode.
  message DictNode {
    // Map from dict key to node ID.
    map<string, string> node_ids = 1;
  }

  // InputNode refers to a NodeInputs.inputs[input_key]. Its output_data_type
  // is always ARTIFACT_LIST.
  // Corresponds to the tfx.dsl.resolver_op.InputNode.
  message InputNode {
    string input_key = 1;
  }

  // Nodes of this input graph. Key is the node ID.
  map<string, Node> nodes = 1;
  // The ID of the output node of the input graph. The output data type of
  // the input graph is the output data type of the result node.
  string result_node = 2;
}
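// For illustration, a hypothetical textproto instance of an InputGraph that
// feeds the "examples" input through a single ResolverOp (a sketch; the node
// IDs and the op_type "tfx.internal.SomeOp" are invented):
//
//   nodes {
//     key: "input_1"
//     value {
//       output_data_type: ARTIFACT_LIST
//       input_node { input_key: "examples" }
//     }
//   }
//   nodes {
//     key: "op_1"
//     value {
//       output_data_type: ARTIFACT_LIST
//       op_node {
//         op_type: "tfx.internal.SomeOp"
//         args { node_id: "input_1" }
//       }
//     }
//   }
//   result_node: "op_1"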
// A proto message wrapping all information needed to query one set of
// artifacts from MLMD.
message InputSpec {
  // A Channel consists of multiple MLMD filters whose result is always a
  // homogeneous list of artifacts.
  message Channel {
    // Information to query the producer node of the artifacts.
    message ProducerNodeQuery {
      // The unique identifier of the node that produced the artifacts.
      string id = 1;
      // Predicate on producer node properties.
      PropertyPredicate property_predicate = 2;
    }
    // Information to query the contexts the desired artifacts are in.
    message ContextQuery {
      // The type of the context.
      ml_metadata.ContextType type = 1;
      // The name of the context.
      Value name = 2;
      // Predicate on the context properties.
      PropertyPredicate property_predicate = 3;
    }
    // Information to query the desired artifacts.
    message ArtifactQuery {
      // The type of the artifact.
      ml_metadata.ArtifactType type = 1;
      // Predicate on the artifact properties.
      PropertyPredicate property_predicate = 2;
    }
    ProducerNodeQuery producer_node_query = 1;
    repeated ContextQuery context_queries = 2;
    ArtifactQuery artifact_query = 3;
    // The output key of the channel. Consider a `Trainer` with two output
    // channels: when downstream nodes consume its outputs, output key(s)
    // need to be specified:
    // ```
    // evaluator = tfx.Evaluator(model=trainer.outputs['some_output_key'])
    // ```
    // where 'some_output_key' is the output key for the channel that the
    // evaluator uses as one of its inputs.
    string output_key = 4;
    google.protobuf.Any metadata_connection_config = 5;
  }

  // Statically decided artifacts.
  message Static {
    repeated int64 artifact_ids = 1;
  }

  // Reference to the InputGraph result.
  // When the InputGraph result is of ARTIFACT_MULTIMAP_LIST type,
  // InputGraphRef is NOT simply a reference to a list of artifacts;
  // instead, the entire NodeInputs must be considered to know the final
  // resolved inputs. For example, assume input graph `g1` is evaluated as:
  //
  //   [{"x": [x1], "y": [y1]}, {"x": [x2], "y": [y2]}]
  //
  // (x1, x2, y1, y2 are artifacts), and the NodeInputs is:
  //
  //   inputs["xx"] {
  //     input_graph_ref {
  //       graph_id: "g1"
  //       key: "x"
  //     }
  //   }
  //
  // then the final resolved inputs are `[{"xx": [x1]}, {"xx": [x2]}]`, not
  // `{"xx": [x1, x2]}`. If there is another entry inputs["yy"]:
  //
  //   inputs["yy"] {
  //     input_graph_ref {
  //       graph_id: "g1"
  //       key: "y"
  //     }
  //   }
  //
  // then the result is *zipped*, as they're from the same input graph
  // source:
  //
  //   [{"xx": [x1], "yy": [y1]}, {"xx": [x2], "yy": [y2]}]
  //
  // but otherwise the result is a *cartesian product* of them. For example,
  // if another input graph `g2` is evaluated as `[{"z": [z1]}, {"z": [z2]}]`
  // and there is another entry inputs["zz"] referring to it, then the
  // resolved inputs are a list of 4 dicts:
  //
  //   [{"xx": [x1], "zz": [z1]}, {"xx": [x1], "zz": [z2]},
  //    {"xx": [x2], "zz": [z1]}, {"xx": [x2], "zz": [z2]}]
  message InputGraphRef {
    // ID of the InputGraph in NodeInputs.input_graphs.
    string graph_id = 1;
    // Optional dict key if the result node type is ARTIFACT_MULTIMAP or
    // ARTIFACT_MULTIMAP_LIST.
    string key = 2;
  }

  // A mixture of InputSpecs.
  message Mixed {
    enum Method {
      UNION = 0;
    }
    Method method = 1;
    repeated string input_keys = 2;
  }

  // Exactly one of the `channels`, `input_graph_ref`, `mixed_inputs`, or
  // `static_inputs` fields should be set. We don't mandate oneof constraints
  // as the legacy `channels` field is a repeated field that cannot be
  // wrapped into a oneof.
  // Union of Channels.
  repeated Channel channels = 1;
  // A reference to the InputGraph.
  InputGraphRef input_graph_ref = 3;
  // A mixture of InputSpecs.
  Mixed mixed_inputs = 4;
  // Static artifacts.
  Static static_inputs = 6;

  // The minimum number of artifacts desired. If the minimum requirement is
  // not met, the execution should not be triggered. If min_count is less
  // than or equal to 0, this input is optional.
  int32 min_count = 2;
  // Whether the input should be hidden from the final resolved result. Only
  // the input keys whose corresponding `InputSpec.hidden = false` will be
  // included in the input map that is passed to `Executor.Do`.
  bool hidden = 5;
}
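// For illustration, a hypothetical textproto instance of an InputSpec with a
// single Channel (a sketch; the node id, context name, artifact type and
// output key are invented):
//
//   channels {
//     producer_node_query { id: "my_example_gen" }
//     context_queries {
//       type { name: "pipeline" }
//       name { field_value { string_value: "my_pipeline" } }
//     }
//     artifact_query { type { name: "Examples" } }
//     output_key: "examples"
//   }
//   min_count: 1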
// This proto message describes the specs of all inputs needed for a
// component execution.
message NodeInputs {
  // A map between the input tag and the spec for the inputs of that tag.
  map<string, InputSpec> inputs = 1;

  // Deprecated. Use input_graphs and inputs.input_graph_ref instead.
  // If resolver_config is set, then the InputSpecs in `inputs` should only
  // contain the `channels` field (not `input_graph_ref`), and both the
  // `input_graphs` and `conditionals` fields are ignored.
  ResolverConfig resolver_config = 2 [deprecated = true];

  // Optional InputGraphs mapping where the key is an ID. A graph is
  // referenced from InputSpec.input_graph_ref.
  map<string, InputGraph> input_graphs = 3;

  // Conditional representation.
  message Conditional {
    PlaceholderExpression placeholder_expression = 1;
  }

  // Optional conditionals to filter the valid inputs that satisfy the
  // predicate. Conditional evaluation and filtering happen after resolving
  // all inputs from NodeInputs.inputs and NodeInputs.input_graphs.
  map<string, Conditional> conditionals = 4;
}

// A proto message wrapping all information needed to produce one set of
// output artifacts in MLMD.
message OutputSpec {
  // Information of the desired artifacts.
  message ArtifactSpec {
    // The name of the artifact type.
    ml_metadata.ArtifactType type = 1;
    // Additional properties to set when outputting artifacts.
    map<string, Value> additional_properties = 2;
    // Additional custom properties to set when outputting artifacts.
    map<string, Value> additional_custom_properties = 3;
    // Predefined URI(s) that will explicitly control Artifact.uri for the
    // external artifacts produced by this output channel.
    repeated string external_artifact_uris = 4;
  }
  ArtifactSpec artifact_spec = 1;

  // Garbage collection policy of the component output channel.
  GarbageCollectionPolicy garbage_collection_policy = 2;
}

// TODO(b/163596295): Remove this along with other usages.
// Deprecated. Executor specification will be set in
// pipeline.deployment_config.
message ExecutorSpec {
  // Executor specification for Python-class based executors.
  message PythonClassExecutorSpec {
    // The full class path of the executor.
    string class_path = 1;
  }
  oneof spec {
    PythonClassExecutorSpec python_class_executor_spec = 1;
  }
}

// Spec of a context.
message ContextSpec {
  // The type of the context.
  ml_metadata.ContextType type = 1;
  // The name of the context.
  Value name = 2;
  // Properties of the context.
  map<string, Value> properties = 3;
}

// Basic info of a pipeline node, including the type and id of the node.
// The information in `NodeInfo` should stay stable across time. Asynchronous
// data fetching behavior might change if this changes.
message NodeInfo {
  // The MLMD type of the node. For example, is it an `ExampleGen` or a
  // `Trainer`.
  ml_metadata.ExecutionType type = 1;
  // The unique identifier of the node within the pipeline definition. This
  // id will be used in upstream and downstream nodes to indicate node
  // dependencies. This is generated by the system.
  string id = 2;
}

// Specifications of the contexts that this node belongs to. All input
// artifacts, output artifacts and executions of the node will be linked to
// the (MLMD) contexts generated from these specifications.
message NodeContexts {
  repeated ContextSpec contexts = 1;
}

// Specifications for node outputs.
message NodeOutputs {
  map<string, OutputSpec> outputs = 1;
}

// Specifications for node parameters.
message NodeParameters {
  map<string, Value> parameters = 1;
}
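// For illustration, a hypothetical textproto instance of NodeOutputs with a
// single OutputSpec (a sketch; the output key, artifact type and custom
// property are invented):
//
//   outputs {
//     key: "model"
//     value {
//       artifact_spec {
//         type { name: "Model" }
//         additional_custom_properties {
//           key: "tuning_enabled"
//           value { field_value { int_value: 1 } }
//         }
//       }
//     }
//   }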
// Options for executing the node.
message NodeExecutionOptions {
  message CachingOptions {
    // Whether or not to enable the cache for this node.
    bool enable_cache = 1;
  }
  message Run {
    // If perform_snapshot is true, this node will perform the snapshot step.
    bool perform_snapshot = 1;
    // If depends_on_snapshot is true, the snapshot step must be complete
    // before this node's executor can run.
    // Note that it is possible for the node that performs the snapshot to
    // also have an executor that depends on the snapshot step.
    bool depends_on_snapshot = 2;
  }
  message Skip {
    // Deprecated. Please use the reuse_artifacts_mode field instead.
    // If reuse_artifacts is true, the snapshot operation will make sure that
    // output artifacts produced by this node in a previous pipeline run will
    // be made available in this partial run.
    bool reuse_artifacts = 1 [deprecated = true];

    enum ReuseArtifactsMode {
      UNSPECIFIED = 0;
      // The snapshot operation will not reuse any output artifacts for this
      // node.
      NEVER = 1;
      // The snapshot operation will make sure that output artifacts produced
      // by this node in a previous pipeline run will be made available in
      // this partial run.
      REQUIRED = 2;
      // The snapshot operation will attempt to reuse output artifacts on a
      // best-effort basis.
      OPTIONAL = 3;
    }
    ReuseArtifactsMode reuse_artifacts_mode = 2;
  }
  CachingOptions caching_options = 1;

  // Attached by platform-level tooling.
  oneof partial_run_option {
    // If set, this node will be run as part of the partial run.
    Run run = 2;
    // If set, this node will be skipped in the partial run.
    Skip skip = 3;
  }

  // LINT.IfChange
  // An enum that defines the trigger strategy, i.e. when the node will be
  // ready to be triggered. Only supported in the experimental orchestrator
  // under SYNC mode and ignored when configured for other platforms.
  // Note that for all trigger strategies outlined below, all upstream nodes
  // with either a task or a data dependency need to be in a final state.
  enum TriggerStrategy {
    // Unspecified. Behaves the same as ALL_UPSTREAM_NODES_SUCCEEDED.
    TRIGGER_STRATEGY_UNSPECIFIED = 0;
    // Specifies that all upstream nodes are in the succeeded state.
    ALL_UPSTREAM_NODES_SUCCEEDED = 1;
    // Specifies that all upstream nodes are in any final state.
    ALL_UPSTREAM_NODES_COMPLETED = 2;
  }
  // The trigger strategy of this node.
  // Unset or set to the default value of TRIGGER_STRATEGY_UNSPECIFIED
  // behaves the same as ALL_UPSTREAM_NODES_SUCCEEDED.
  TriggerStrategy strategy = 4;
  // If set, the node's success is optional for orchestration. In other
  // words, the node will always be treated as a succeeded node when it
  // completes, both when triggering downstream nodes with either a task or a
  // data dependency and when determining the final pipeline run outcome.
  // Only supported in the experimental orchestrator under SYNC mode and
  // ignored when configured for other platforms.
  bool node_success_optional = 5;
  // LINT.ThenChange(tfx/orchestration/experimental/core/sync_pipeline_task_gen.py)

  // Maximum number of times to retry an execution if it failed. Only
  // supported in the internal orchestrator.
  uint32 max_execution_retries = 6;

  // Maximum time in seconds before failing an execution. Only supported in
  // the internal orchestrator. Defaults to 0, which means never time out.
  uint32 execution_timeout_sec = 7;

  // Fine-grained trigger configuration for the ASYNC execution mode.
  message AsyncTrigger {
    message InputTrigger {
      // Trigger by new property values seen.
      message TriggerByProperty {
        repeated string property_keys = 1;
      }
      oneof type {
        // If `true`, this input is not used for triggering.
        bool no_trigger = 1;
        // Trigger if new artifact property values are seen. For example, if
        // you want the component to be triggered when a new `span` arrives
        // but not when a new `version` of the previous span arrives, set
        // this to `{ property_keys: ["span"] }`.
        // Note that `properties` and `custom_properties` are not
        // distinguished (but `properties` takes higher precedence). If the
        // property key is not found in either property dict, it will always
        // trigger a new execution (default behavior).
        TriggerByProperty trigger_by_property = 2;
        // Future candidate input triggers: Throttling(timedelta)
      }
    }
    // Map from `input_key` to `InputTrigger`.
    // By default (i.e. if `input_key` is not present in the map), any new
    // input artifact (distinguished by `id`) will trigger a new execution.
    map<string, InputTrigger> input_triggers = 1;
  }
  // Async mode trigger configuration for this node.
  AsyncTrigger async_trigger = 8;
}
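// For illustration, a hypothetical textproto instance of
// NodeExecutionOptions for a SYNC-mode node (a sketch; the values are
// invented):
//
//   caching_options { enable_cache: true }
//   strategy: ALL_UPSTREAM_NODES_COMPLETED
//   max_execution_retries: 3
//   execution_timeout_sec: 3600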
// Pipeline node definition.
message PipelineNode {
  // Basic info of a pipeline node.
  NodeInfo node_info = 1;
  // Specification for the contexts that this node belongs to.
  NodeContexts contexts = 2;
  // Specification for node inputs.
  NodeInputs inputs = 3;
  // Specification for node outputs.
  NodeOutputs outputs = 4;
  // Specification for node parameters.
  NodeParameters parameters = 5;
  // Specification for the executor of the node.
  ExecutorSpec executor = 6 [deprecated = true];
  // Ids of the upstream nodes of the current node.
  repeated string upstream_nodes = 7;
  // Ids of the downstream nodes of the current node.
  repeated string downstream_nodes = 8;
  // Options for executing the node.
  NodeExecutionOptions execution_options = 9;
}

// Settings used for the snapshot step during a partial run.
// One of the nodes will call `partial_run_utils.snapshot(...)`, allowing
// this partial run to reuse artifacts from a previous pipeline run.
message SnapshotSettings {
  message LatestPipelineRunStrategy {}
  message BasePipelineRunStrategy {
    string base_run_id = 1;
  }
  oneof artifact_reuse_strategy {
    LatestPipelineRunStrategy latest_pipeline_run_strategy = 1;
    BasePipelineRunStrategy base_pipeline_run_strategy = 2;
  }
}

// Message struct that contains pipeline runtime specifications.
message PipelineRuntimeSpec {
  // Required field. Base directory of the pipeline. If not specified in the
  // DSL, sub-pipelines will be compiled to use the same pipeline root as the
  // parent pipeline.
  Value pipeline_root = 1;
  // A unique id to identify a pipeline run. This will not be set during
  // compilation time but is required for synchronous pipeline execution.
  Value pipeline_run_id = 2;
  // Used for partial runs.
  SnapshotSettings snapshot_settings = 8;
}

// Basic info of a pipeline.
// The information in `PipelineInfo` should stay stable across time.
// Asynchronous data fetching behavior might change if this changes.
message PipelineInfo {
  // Required field. A pipeline must have an id.
  string id = 1;
}
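// For illustration, a hypothetical textproto instance of a
// PipelineRuntimeSpec (a sketch; the pipeline root path and the runtime
// parameter name are invented):
//
//   pipeline_root {
//     field_value { string_value: "/tmp/pipeline_root" }
//   }
//   pipeline_run_id {
//     runtime_parameter {
//       name: "pipeline-run-id"
//       type: STRING
//     }
//   }
//   snapshot_settings { latest_pipeline_run_strategy {} }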
// Definition for a uDSL pipeline. This is also the definition of a
// sub-pipeline.
message Pipeline {
  enum ExecutionMode {
    EXECUTION_MODE_UNSPECIFIED = 0;
    SYNC = 1;
    ASYNC = 2;
  }
  // A node inside a pipeline can be either a `PipelineNode` or a `Pipeline`
  // as a sub-pipeline.
  message PipelineOrNode {
    oneof node {
      // A normal pipeline node. This is the unsplittable execution unit.
      PipelineNode pipeline_node = 1;
      // Sub-pipelines should only have execution mode `SYNC`.
      Pipeline sub_pipeline = 2;
    }
  }
  PipelineInfo pipeline_info = 1;
  repeated PipelineOrNode nodes = 2;
  PipelineRuntimeSpec runtime_spec = 3;
  // Execution mode of the pipeline. Only the outermost pipeline can be
  // `ASYNC`.
  ExecutionMode execution_mode = 4;
  // Deprecated. Please use 'deployment_config' instead.
  // Configs for different platforms, keyed by tags that users provide for
  // the different platforms.
  map<string, google.protobuf.Any> platform_configs = 5 [deprecated = true];
  // Deployment config for the pipeline. This usually includes the following:
  // - A map from `node_id` to executor specification. This should be set for
  //   all nodes that have business logic.
  // - A map from `node_id` to custom driver specification. This should be
  //   set only when a custom driver is involved, which is a rare advanced
  //   use case.
  // - A map from label to platform-specific configs.
  // - ML Metadata connection config.
  // - Other configs.
  google.protobuf.Any deployment_config = 7;
  // TFX DSL SDK version for this pipeline.
  string sdk_version = 6;
}

// Definition of the intermediate 'deployment_config' generated by the
// general TFX DSL compiler. The result will be reinterpreted by different
// runners for different platforms.
message IntermediateDeploymentConfig {
  // A map from `node_id` to executor specs. Note that this will cover all
  // nodes that have business logic to process.
  map<string, google.protobuf.Any> executor_specs = 1;
  // A map from `node_id` to custom driver specs. Note that this map usually
  // has fewer entries than `executor_specs`, as we only expect advanced
  // users to set custom driver logic.
  map<string, google.protobuf.Any> custom_driver_specs = 2;
  // TODO(b/164108495): Figure out the DSL channel to set node-level platform
  // configs.
  // A map from `node_id` to platform specs. These are placeholders for extra
  // platform-related specifications which are set by users or platform
  // runners at the node level.
  map<string, google.protobuf.Any> node_level_platform_configs = 3;
  // Pipeline-level platform-specific config.
  google.protobuf.Any pipeline_level_platform_config = 4;
  // Connection config for ML Metadata.
  google.protobuf.Any metadata_connection_config = 5;
}

// Pipeline-level specifications for partial runs that are exposed to users.
message PartialRun {
  // Source node ids.
  // Only run nodes that are reachable downstream from from_nodes
  // (inclusive).
  repeated string from_nodes = 1;
  // Sink node ids.
  // Only run nodes that are reachable upstream from to_nodes (inclusive).
  repeated string to_nodes = 2;
  // Skip node ids.
  // Nodes between the from and to nodes that can be skipped for the pipeline
  // run. Note that if a node depends on nodes that cannot be skipped, it
  // will run anyway for pipeline result correctness.
  // For example, in the following case
  //   A --> B (from_nodes) --> C --> D (to_nodes)
  // 1) if skip_nodes = [B], then nodes C, D will run;
  // 2) if skip_nodes = [C], then nodes B, C, D will run.
  repeated string skip_nodes = 4;
  // Settings used for the snapshot step during a partial run.
  SnapshotSettings snapshot_settings = 3;
}

// RunOptions is passed to the DAG runner when the pipeline is run.
// It is not part of the Pipeline IR.
message RunOptions {
  // Pipeline-level specifications for a partial run.
  PartialRun partial_run = 1;
}

// UpdateOptions is passed to the pipeline runner when the pipeline is
// updated. It is not part of the Pipeline IR.
message UpdateOptions {
  enum ReloadPolicy {
    // Reload all active nodes upon pipeline update.
    ALL = 0;
    // Reload only the nodes selected in `reload_nodes`.
    PARTIAL = 1;
  }
  ReloadPolicy reload_policy = 1;
  // The ids of the nodes to reload, which can be empty if no node is to be
  // reloaded. Only applicable when ReloadPolicy is PARTIAL.
  repeated string reload_nodes = 2;
}
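// For illustration, a hypothetical textproto instance of RunOptions
// requesting a partial run (a sketch; the node ids are invented):
//
//   partial_run {
//     from_nodes: "my_trainer"
//     to_nodes: "my_pusher"
//     snapshot_settings { latest_pipeline_run_strategy {} }
//   }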