// Copyright 2020 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package tfx.orchestration;
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
import "ml_metadata/proto/metadata_store.proto";
import "tfx/proto/orchestration/garbage_collection_policy.proto";
import "tfx/proto/orchestration/placeholder.proto";
// ResolverConfig is subject to change. We plan to introduce a flexible
// config to enable more sophisticated policies in the future.
// TODO(b/152230663): Support more flexibility for resolution logic.
message ResolverConfig {
// Each resolver step takes an input map (Dict[Text, List[Artifact]]),
// processes the map, and optionally emits it (Optional[Dict[Text,
// List[Artifact]]]). If a resolver step does not emit an input map, the
// execution will not be triggered.
message ResolverStep {
// Class path (<module_name>.<class_name>) of the resolver. For example
// "tfx.dsl.resolvers.latest_artifact_strategy.LatestArtifactStrategy".
// The resolver class should be a subclass of the
// `tfx.dsl.components.common.resolver.ResolverStrategy` class.
string class_path = 1;
// JSON-serialized resolver config that will be used as keyword arguments
// when instantiating the resolver class. Any tfx.utils.json_utils.Jsonable
// value can be used.
string config_json = 2;
// Optional list of input keys that the resolver instance will use. Other
// keys bypass the resolver instance and are passed as-is to the next step.
// If the resolver returns None, bypassed inputs are also ignored and the
// ResolverStep returns None.
// If not specified, all input keys will be used.
repeated string input_keys = 3;
}
// Series of resolver steps that are applied in order. The inputs and outputs
// of a resolver have the same type (Dict[Text, List[Artifact]]), so a list
// of resolvers can be composed to act as a single resolver. If any of the
// composed resolvers returns None, the remaining resolvers are not executed
// and the composition returns None early (which means the component will not
// be triggered).
repeated ResolverStep resolver_steps = 1;
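// As a minimal sketch, a single-step config in textproto form, reusing the
// class path from the example above (the config values and input key are
// illustrative):
//
//   resolver_steps {
//     class_path: "tfx.dsl.resolvers.latest_artifact_strategy.LatestArtifactStrategy"
//     config_json: "{}"
//     input_keys: "model"
//   }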
}
// Definition for runtime parameters.
message RuntimeParameter {
enum Type {
TYPE_UNSPECIFIED = 0;
INT = 1;
DOUBLE = 2;
STRING = 3;
}
// Required field. The name of the runtime parameter. This should be globally
// unique within the pipeline scope.
string name = 1;
// Required field. The type of the runtime parameter.
Type type = 2;
// Optional field. Default value of the runtime parameter. If not set and the
// runtime parameter value is not provided during runtime, an error will be
// raised.
ml_metadata.Value default_value = 3;
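// For example, an integer parameter with a default value, in textproto (the
// name and default are illustrative):
//
//   name: "train_steps"
//   type: INT
//   default_value { int_value: 1000 }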
}
// TODO(b/157270778): Support structural runtime parameter at the SDK level.
// Definition for structural runtime parameters.
// This can be used to combine several runtime parameters into a single
// string, with parts of it pre-set by users. Consider the following example:
// `[RuntimeParameter(a), '_some_string_', RuntimeParameter(b)]`
// During runtime, the system will resolve the runtime parameters in the list
// and concatenate all pieces in the list together into a single string.
message StructuralRuntimeParameter {
// Definition of each part in the structural runtime parameter. Each part can
// be either a string or a runtime parameter.
message StringOrRuntimeParameter {
oneof value {
string constant_value = 1;
RuntimeParameter runtime_parameter = 2;
}
}
repeated StringOrRuntimeParameter parts = 1;
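// The example above, `[RuntimeParameter(a), '_some_string_',
// RuntimeParameter(b)]`, would compile to roughly (the parameter types are
// illustrative):
//
//   parts { runtime_parameter { name: "a" type: STRING } }
//   parts { constant_value: "_some_string_" }
//   parts { runtime_parameter { name: "b" type: STRING } }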
}
// Definition for Value in uDSL IR. A Value instance can be one of: a field
// value that is determined at compilation time, a runtime parameter that
// will be determined at runtime, or a placeholder that will be determined
// before or at execution time.
message Value {
oneof value {
ml_metadata.Value field_value = 1;
RuntimeParameter runtime_parameter = 2;
StructuralRuntimeParameter structural_runtime_parameter = 3;
PlaceholderExpression placeholder = 5;
}
// If non-primitive types are JSON-serialized and stored as a string value in
// ml_metadata.Value, ValueType provides the information necessary to
// deserialize the string and recover the type.
message Schema {
message ProtoType {
string message_type = 1;
google.protobuf.FileDescriptorSet file_descriptors = 2;
}
message BooleanType {}
message ValueType {
oneof type {
ValueType list_type = 1;
ProtoType proto_type = 2;
BooleanType boolean_type = 3;
ValueType dict_type = 4;
}
}
ValueType value_type = 1;
}
Schema schema = 4;
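// As a sketch, a proto value JSON-serialized into a string field, with a
// schema recording how to recover its type (the message type name and
// payload are illustrative):
//
//   field_value { string_value: "{\"num_steps\": 100}" }
//   schema {
//     value_type {
//       proto_type { message_type: "tfx.components.trainer.TrainArgs" }
//     }
//   }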
}
// Definition of a predicate on property values. It can be one of the following:
// 1. A value comparator that predicates on a certain property and target value.
// 2. A unary logical operator that operates on a sub predicate.
// 3. A binary logical operator that operates on two sub predicates.
message PropertyPredicate {
// Property value comparator.
message ValueComparator {
// Operators for comparison.
enum Op {
OP_UNSPECIFIED = 0;
// The following two ops are available for all types.
EQ = 1;
LT = 2;
}
// The name of the property.
string property_name = 1;
// The target value to compare with.
Value target_value = 2;
Op op = 3;
// Users can set zero, one, or both of the following two fields.
// - If neither is set, the predicate operates on a single Artifact.
// - If only 'input_key' is set, the predicate operates on
// Dict[Text, Artifact].
// - If only 'input_index' is set, the predicate operates on
// List[Artifact].
// - If both 'input_key' and 'input_index' are set, the predicate operates
// on Dict[Text, List[Artifact]].
string input_key = 4;
int32 input_index = 5;
bool is_custom_property = 6;
}
// Logical operator on one element.
message UnaryLogicalOperator {
enum LogicalOp {
OP_UNSPECIFIED = 0;
NOT = 1;
}
LogicalOp op = 1;
// The operand to operate on.
PropertyPredicate operand = 2;
}
// Logical operator on two elements.
message BinaryLogicalOperator {
enum LogicalOp {
OP_UNSPECIFIED = 0;
AND = 1;
OR = 2;
}
LogicalOp op = 1;
// The left-hand side element to the logical operator.
PropertyPredicate lhs = 2;
// The right-hand side element to the logical operator.
PropertyPredicate rhs = 3;
}
oneof operator {
ValueComparator value_comparator = 1;
UnaryLogicalOperator unary_logical_operator = 2;
BinaryLogicalOperator binary_logical_operator = 3;
}
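// As an illustration, `NOT (span == 3)` on a custom property could be
// expressed as (the property name is illustrative):
//
//   unary_logical_operator {
//     op: NOT
//     operand {
//       value_comparator {
//         property_name: "span"
//         target_value { field_value { int_value: 3 } }
//         op: EQ
//         is_custom_property: true
//       }
//     }
//   }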
}
// InputGraph expresses declarative input resolution logic as a graph of
// ResolverOps and a result node. An entire InputGraph is essentially a
// function without arguments that returns a value of one of the DataType
// variants. Its result is referenced from `InputSpec.InputGraphRef`.
message InputGraph {
// Data type of the OpNode output. Corresponds to the
// tfx.dsl.resolver_op.DataType.
enum DataType {
DATA_TYPE_UNSPECIFIED = 0;
ARTIFACT_LIST = 1;
ARTIFACT_MULTIMAP = 2;
ARTIFACT_MULTIMAP_LIST = 3;
}
message Node {
DataType output_data_type = 1;
oneof kind {
OpNode op_node = 2;
DictNode dict_node = 3;
InputNode input_node = 4;
}
}
// A ResolverOp instance as an input graph node. Corresponds to the
// tfx.dsl.resolver_op.OpNode.
message OpNode {
// The canonical_name of the ResolverOp (e.g. "tfx.internal.Unnest").
string op_type = 1;
// Generic argument type that can express either another Node's outputs
// or a static value.
message Arg {
oneof kind {
// ID of another Node instance, referring to its output.
string node_id = 1;
// Static value.
Value value = 2;
}
}
// Optional positional arguments; each arg's kind must be `node_id`.
repeated Arg args = 2;
// Optional keyword arguments; each arg's kind must be `value`.
map<string, Arg> kwargs = 3;
}
// Dict of other Nodes where each Node.output_data_type is ARTIFACT_LIST.
// Its own output_data_type in the parent Node message is ARTIFACT_MULTIMAP.
// Corresponds to the tfx.dsl.resolver_op.DictNode.
message DictNode {
// Map from dict key to node ID.
map<string, string> node_ids = 1;
}
// InputNode refers to NodeInputs.inputs[input_key]. Its output_data_type is
// always ARTIFACT_LIST.
// Corresponds to the tfx.dsl.resolver_op.InputNode.
message InputNode {
string input_key = 1;
}
// Nodes of this input graph. Key is the node ID.
map<string, Node> nodes = 1;
// The ID of the output node of the input graph. The output data type of the
// input graph is the output data type of the result node.
string result_node = 2;
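// A minimal sketch of a graph that pipes one of the node's inputs through a
// single ResolverOp (the op name and data types are illustrative):
//
//   nodes {
//     key: "input_1"
//     value {
//       output_data_type: ARTIFACT_LIST
//       input_node { input_key: "examples" }
//     }
//   }
//   nodes {
//     key: "op_1"
//     value {
//       output_data_type: ARTIFACT_LIST
//       op_node {
//         op_type: "some.module.SomeOp"  # hypothetical canonical op name
//         args { node_id: "input_1" }
//       }
//     }
//   }
//   result_node: "op_1"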
}
// A proto message wrapping all information needed to query one set of artifacts
// from MLMD.
message InputSpec {
// A Channel consists of multiple MLMD filters whose result is always a
// homogeneous list of artifacts.
message Channel {
// Information to query the producer node of the artifacts.
message ProducerNodeQuery {
// The unique identifier of the node that produced the artifacts.
string id = 1;
// Predicate on producer node properties.
PropertyPredicate property_predicate = 2;
}
// Information to query the contexts the desired artifacts are in.
message ContextQuery {
// The type of the Context.
ml_metadata.ContextType type = 1;
// The name of the context.
Value name = 2;
// Predicate on the context properties.
PropertyPredicate property_predicate = 3;
}
// Information to query the desired artifacts.
message ArtifactQuery {
// The type of the artifact.
ml_metadata.ArtifactType type = 1;
// Predicate on the artifact properties.
PropertyPredicate property_predicate = 2;
}
ProducerNodeQuery producer_node_query = 1;
repeated ContextQuery context_queries = 2;
ArtifactQuery artifact_query = 3;
// The output key of the channel. Consider a `Trainer` with two output
// channels: when downstream nodes consume its outputs, output key(s) need
// to be specified:
// ```
// evaluator = tfx.Evaluator(model=trainer.outputs['some_output_key'])
// ```
// where 'some_output_key' is the output key for the channel that the
// evaluator uses as one of its inputs.
string output_key = 4;
google.protobuf.Any metadata_connection_config = 5;
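// A sketch of a channel selecting `Examples` artifacts produced by a node
// "my_example_gen" within a pipeline context (all names are illustrative):
//
//   producer_node_query { id: "my_example_gen" }
//   context_queries {
//     type { name: "pipeline" }
//     name { field_value { string_value: "my_pipeline" } }
//   }
//   artifact_query { type { name: "Examples" } }
//   output_key: "examples"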
}
// Statically decided artifacts.
message Static {
repeated int64 artifact_ids = 1;
}
// Reference to the InputGraph result.
// When the InputGraph result is of ARTIFACT_MULTIMAP_LIST type, InputGraphRef
// is NOT simply a reference to a list of artifacts; instead the entire
// NodeInputs must be considered to know the final resolved inputs.
// For example, assume input graph `g1` is evaluated as:
//
// [{"x": [x1], "y": [y1]}, {"x": [x2], "y": [y2]}]
//
// (x1, x2, y1, y2 are artifacts), and the NodeInputs is:
//
// inputs["xx"] {
// input_graph_ref {
// graph_id: "g1"
// key: "x"
// }
// }
//
// then the final resolved inputs are `[{"xx": [x1]}, {"xx": [x2]}]`, not
// `{"xx": [x1, x2]}`. If there is another input "yy":
//
// inputs["yy"] {
// input_graph_ref {
// graph_id: "g1"
// key: "y"
// }
// }
//
// then the result is *zipped* as they're from the same input graph source:
//
// [{"xx": [x1], "yy": [y1]}, {"xx": [x2], "yy": [y2]}]
//
// but otherwise the result is a *cartesian product* of them. For example, if
// another input graph `g2` is evaluated as `[{"z": [z1]}, {"z": [z2]}]` and
// there is another input "zz" referring to it, then the resolved inputs are
// a list of 4 dicts:
//
// [{"xx": [x1], "zz": [z1]}, {"xx": [x1], "zz": [z2]},
// {"xx": [x2], "zz": [z1]}, {"xx": [x2], "zz": [z2]}]
message InputGraphRef {
// ID of the InputGraph in NodeInputs.input_graphs.
string graph_id = 1;
// Optional dict key if the result node type is ARTIFACT_MULTIMAP or
// ARTIFACT_MULTIMAP_LIST.
string key = 2;
}
// A mixture of InputSpecs.
message Mixed {
enum Method {
UNION = 0;
}
Method method = 1;
repeated string input_keys = 2;
}
// Exactly one of the `channels`, `input_graph_ref`, `mixed_inputs`, or
// `static_inputs` fields should be set. We don't mandate a oneof constraint
// because the legacy `channels` field is a repeated field that cannot be
// wrapped into a oneof.
// Union of Channels.
repeated Channel channels = 1;
// A reference to the InputGraph.
InputGraphRef input_graph_ref = 3;
// A mixture of InputSpecs.
Mixed mixed_inputs = 4;
// Static artifacts.
Static static_inputs = 6;
// The minimum number of artifacts desired. If the minimum requirement is not
// met, the execution should not be triggered. If min_count is less than or
// equal to 0, this input is optional.
int32 min_count = 2;
// Whether the input should be hidden from the final resolved result. Only
// input keys whose corresponding `InputSpec.hidden = false` will be included
// in the input map passed to `Executor.Do`.
bool hidden = 5;
}
// This proto message describes the specs of all inputs needed for a
// component execution.
message NodeInputs {
// A map between the input tag and specs for the inputs of that tag.
map<string, InputSpec> inputs = 1;
// Deprecated. Use input_graphs and inputs.input_graph_ref instead.
// If resolver_config is set, then InputSpec.inputs should only contain the
// `channels` field (not `input_graph_ref`), and both the `input_graphs` and
// `conditionals` fields are ignored.
ResolverConfig resolver_config = 2 [deprecated = true];
// Optional InputGraphs mapping where the key is an ID. Graphs are referenced
// from InputSpec.input_graph_ref.
map<string, InputGraph> input_graphs = 3;
// Conditional representation.
message Conditional {
PlaceholderExpression placeholder_expression = 1;
}
// Optional conditionals that filter the valid inputs to those satisfying the
// predicate. Conditional evaluation and filtering happen after resolving all
// inputs from NodeInputs.inputs and NodeInputs.input_graphs.
map<string, Conditional> conditionals = 4;
}
// A proto message wrapping all information needed to specify one set of
// output artifacts a node produces.
message OutputSpec {
// Information of the desired artifacts.
message ArtifactSpec {
// The name of the artifact type.
ml_metadata.ArtifactType type = 1;
// Additional properties to set when outputting artifacts.
map<string, Value> additional_properties = 2;
// Additional custom properties to set when outputting artifacts.
map<string, Value> additional_custom_properties = 3;
// Predefined URI(s) that will explicitly control Artifact.uri for the
// external artifacts produced by this output channel.
repeated string external_artifact_uris = 4;
}
ArtifactSpec artifact_spec = 1;
// Garbage collection policy of the component output channel.
GarbageCollectionPolicy garbage_collection_policy = 2;
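// A sketch of an output spec that produces `Model` artifacts with one extra
// custom property (names and values are illustrative):
//
//   artifact_spec {
//     type { name: "Model" }
//     additional_custom_properties {
//       key: "tuned"
//       value { field_value { string_value: "true" } }
//     }
//   }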
}
// TODO(b/163596295): Remove this along with other usages.
// Deprecated. Executor specification will be set in pipeline.deployment_config.
message ExecutorSpec {
// Executor specification for Python-class based executors.
message PythonClassExecutorSpec {
// The full class path of the executor.
string class_path = 1;
}
oneof spec {
PythonClassExecutorSpec python_class_executor_spec = 1;
}
}
// Spec of a context.
message ContextSpec {
// The type of the context.
ml_metadata.ContextType type = 1;
// The name of the context.
Value name = 2;
// Properties of the context.
map<string, Value> properties = 3;
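// For example, a pipeline-run context spec might look like (the type name
// and values are illustrative):
//
//   type { name: "pipeline_run" }
//   name { field_value { string_value: "run-20240101" } }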
}
// Basic info of a pipeline node, including the type and id of the node.
// The information in `NodeInfo` should stay stable across time. Asynchronous
// data fetching behavior might change if this changes.
message NodeInfo {
// The MLMD type of the node, e.g. whether it is an `ExampleGen` or a
// `Trainer`.
ml_metadata.ExecutionType type = 1;
// The unique identifier of the node within the pipeline definition. This id
// will be used in upstream and downstream nodes to indicate node
// dependencies. This is generated by the system.
string id = 2;
}
// Specifications of contexts that this node belongs to. All input artifacts,
// output artifacts and execution of the node will be linked to the (MLMD)
// contexts generated from these specifications.
message NodeContexts {
repeated ContextSpec contexts = 1;
}
// Specifications for node outputs.
message NodeOutputs {
map<string, OutputSpec> outputs = 1;
}
// Specifications for node parameters.
message NodeParameters {
map<string, Value> parameters = 1;
}
// Options for executing the node.
message NodeExecutionOptions {
message CachingOptions {
// Whether or not to enable cache for this node.
bool enable_cache = 1;
}
message Run {
// If perform_snapshot is true, this node will perform the snapshot step.
bool perform_snapshot = 1;
// If depends_on_snapshot is true, the snapshot step must be complete before
// this node's executor can run.
// Note that it is possible for the node that performs the snapshot to
// also have an executor that depends on the snapshot step.
bool depends_on_snapshot = 2;
}
message Skip {
// Deprecated. Please use reuse_artifacts_mode field instead.
// If reuse_artifacts is true, the snapshot operation will make sure that
// output artifacts produced by this node in a previous pipeline run will
// be made available in this partial run.
bool reuse_artifacts = 1 [deprecated = true];
enum ReuseArtifactsMode {
UNSPECIFIED = 0;
// The snapshot operation will not reuse any output artifacts for this
// node.
NEVER = 1;
// The snapshot operation will make sure that output artifacts produced by
// this node in a previous pipeline run will be made available in this
// partial run.
REQUIRED = 2;
// The snapshot operation will attempt to reuse output artifacts on a
// best-effort basis.
OPTIONAL = 3;
}
ReuseArtifactsMode reuse_artifacts_mode = 2;
}
CachingOptions caching_options = 1;
// Attached by platform-level tooling.
oneof partial_run_option {
// If set, this node will be run as part of the partial run.
Run run = 2;
// If set, this node will be skipped in the partial run.
Skip skip = 3;
}
// LINT.IfChange
// An enum that defines the trigger strategy, i.e. when the node becomes
// ready to be triggered. Only supported in the experimental orchestrator
// under SYNC mode and ignored when configured for other platforms.
// Note that for all trigger strategies outlined below, all upstream nodes
// with either a task or data dependency need to be in a final state.
enum TriggerStrategy {
// Unspecified. Behave the same as ALL_UPSTREAM_NODES_SUCCEEDED.
TRIGGER_STRATEGY_UNSPECIFIED = 0;
// Specifies that all upstream nodes are in succeeded state.
ALL_UPSTREAM_NODES_SUCCEEDED = 1;
// Specifies that all upstream nodes are in any final state.
ALL_UPSTREAM_NODES_COMPLETED = 2;
}
// The trigger strategy of this node.
// Unset or set to the default value TRIGGER_STRATEGY_UNSPECIFIED behaves the
// same as ALL_UPSTREAM_NODES_SUCCEEDED.
TriggerStrategy strategy = 4;
// If set, the node's success is optional for orchestration. In other words,
// the node will always be treated as a succeeded node when it completes, both
// when triggering downstream nodes with either task or data dependency and
// when determining final pipeline run outcome. Only supported in the
// experimental orchestrator under SYNC mode and ignored when configured for
// other platforms.
bool node_success_optional = 5;
// LINT.ThenChange(tfx/orchestration/experimental/core/sync_pipeline_task_gen.py)
// Maximum number of times to retry an execution if it failed. Only supported
// in the internal orchestrator.
uint32 max_execution_retries = 6;
// Maximum time in seconds before failing an execution. Only supported in the
// internal orchestrator. Defaults to 0, which means never time out.
uint32 execution_timeout_sec = 7;
// Fine-grained trigger configuration for the ASYNC execution mode.
message AsyncTrigger {
message InputTrigger {
// Trigger by new property values seen.
message TriggerByProperty {
repeated string property_keys = 1;
}
oneof type {
// If `True`, this input is not used for triggering.
bool no_trigger = 1;
// Trigger if new artifact property values are seen. For example, if you
// want the component to be triggered when a new `span` arrives but not
// when a new `version` of a previous span arrives, set this to
// `{ property_keys: ["span"] }`.
// Note that `properties` and `custom_properties` are not distinguished
// (but `properties` takes precedence). If the property key is not found
// in either property dict, a new execution will always be triggered
// (the default behavior).
TriggerByProperty trigger_by_property = 2;
// Future candidate input triggers: Throttling(timedelta)
}
}
// Map from `input_key` to `InputTrigger`.
// By default (i.e. when `input_key` is not present in the map), any new
// input artifacts (distinguished by `id`) will trigger a new execution.
map<string, InputTrigger> input_triggers = 1;
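// A sketch that triggers on new spans of the "examples" input but never on
// the "schema" input (the input keys are illustrative):
//
//   input_triggers {
//     key: "examples"
//     value { trigger_by_property { property_keys: "span" } }
//   }
//   input_triggers {
//     key: "schema"
//     value { no_trigger: true }
//   }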
}
// Async mode trigger configuration for this node.
AsyncTrigger async_trigger = 8;
}
// Pipeline node definition.
message PipelineNode {
// Basic info of a pipeline node.
NodeInfo node_info = 1;
// Specification for contexts that this node belongs to.
NodeContexts contexts = 2;
// Specification for node inputs.
NodeInputs inputs = 3;
// Specification for node outputs.
NodeOutputs outputs = 4;
// Specification for node parameters.
NodeParameters parameters = 5;
// Specification for the executor of the node.
ExecutorSpec executor = 6 [deprecated = true];
// Ids of the upstream nodes of the current node.
repeated string upstream_nodes = 7;
// Ids of the downstream nodes of the current node.
repeated string downstream_nodes = 8;
// Options for executing the node.
NodeExecutionOptions execution_options = 9;
}
// Settings used for snapshot during partial run.
// One of the nodes will call `partial_run_utils.snapshot(...)`, allowing this
// partial run to reuse artifacts from a previous pipeline run.
message SnapshotSettings {
message LatestPipelineRunStrategy {}
message BasePipelineRunStrategy {
string base_run_id = 1;
}
oneof artifact_reuse_strategy {
LatestPipelineRunStrategy latest_pipeline_run_strategy = 1;
BasePipelineRunStrategy base_pipeline_run_strategy = 2;
}
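// For example, to reuse artifacts from a specific previous run (the run id
// is illustrative):
//
//   base_pipeline_run_strategy { base_run_id: "run-20240101" }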
}
// Message struct that contains pipeline runtime specifications.
message PipelineRuntimeSpec {
// Required field. Base directory of the pipeline. If not specified in the
// DSL, sub-pipelines will be compiled to use the same pipeline root as the
// parent pipeline.
Value pipeline_root = 1;
// A unique id to identify a pipeline run. This will not be set at
// compilation time but is required for synchronous pipeline execution.
Value pipeline_run_id = 2;
// Used for partial runs.
SnapshotSettings snapshot_settings = 8;
}
// Basic info of a pipeline.
// The information in `PipelineInfo` should stay stable across time.
// Asynchronous data fetching behavior might change if this changes.
message PipelineInfo {
// Required field. A pipeline must have an id.
string id = 1;
}
// Definition for a uDSL pipeline. This is also the definition of a
// sub-pipeline.
message Pipeline {
enum ExecutionMode {
EXECUTION_MODE_UNSPECIFIED = 0;
SYNC = 1;
ASYNC = 2;
}
// A node inside a pipeline can be either a `PipelineNode` or a `Pipeline` as
// a sub-pipeline.
message PipelineOrNode {
oneof node {
// A normal pipeline node. This is the unsplittable execution unit.
PipelineNode pipeline_node = 1;
// Sub-pipelines should only have execution mode `SYNC`.
Pipeline sub_pipeline = 2;
}
}
PipelineInfo pipeline_info = 1;
repeated PipelineOrNode nodes = 2;
PipelineRuntimeSpec runtime_spec = 3;
// Execution mode of the pipeline. Only the outermost pipeline can be `ASYNC`.
ExecutionMode execution_mode = 4;
// Deprecated. Please use 'deployment_config' instead.
// Configs for different platforms, keyed by user-provided platform tags.
map<string, google.protobuf.Any> platform_configs = 5 [deprecated = true];
// Deployment config for the pipeline. This usually includes the following:
// - A map from `node_id` to executor specification. This should be set for
// all nodes that have business logic.
// - A map from `node_id` to custom driver specification. This should be set
// only when custom driver is involved which is a rare advanced use case.
// - A map from label to platform specific configs.
// - ML-metadata connection config.
// - Other configs.
google.protobuf.Any deployment_config = 7;
// TFX DSL SDK version for this pipeline.
string sdk_version = 6;
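// A minimal sketch of a SYNC pipeline with a single node (all ids and type
// names are illustrative):
//
//   pipeline_info { id: "my_pipeline" }
//   execution_mode: SYNC
//   nodes {
//     pipeline_node {
//       node_info {
//         type { name: "tfx.components.CsvExampleGen" }
//         id: "CsvExampleGen"
//       }
//     }
//   }
//   runtime_spec {
//     pipeline_root { field_value { string_value: "/tmp/pipeline_root" } }
//   }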
}
// Definition of the intermediate 'deployment_config' generated by the general
// TFX DSL compiler. The result will be reinterpreted by different runners for
// different platforms.
message IntermediateDeploymentConfig {
// A map from `node_id` to executor specs. Note that this covers all nodes
// that have business logic to process.
map<string, google.protobuf.Any> executor_specs = 1;
// A map from `node_id` to custom driver specs. Note that this map usually
// has fewer entries than `executor_specs`, as we only expect advanced users
// to set custom driver logic.
map<string, google.protobuf.Any> custom_driver_specs = 2;
// TODO(b/164108495): Figure out the DSL channel to set node-level platform
// configs.
// A map from `node_id` to platform specs. These are placeholders for extra
// platform-related specifications that are set by users or platform runners
// at the node level.
map<string, google.protobuf.Any> node_level_platform_configs = 3;
// Pipeline level platform specific configs.
google.protobuf.Any pipeline_level_platform_config = 4;
// Connection config to ML-metadata.
google.protobuf.Any metadata_connection_config = 5;
}
// Pipeline-level specifications for partial run that are exposed to users.
message PartialRun {
// Source node ids.
// Only run nodes that are reachable downstream from from_nodes (inclusive).
repeated string from_nodes = 1;
// Sink node ids.
// Only run nodes that are reachable upstream from to_nodes (inclusive).
repeated string to_nodes = 2;
// Skip node ids.
// Nodes between from_nodes and to_nodes that can be skipped for the pipeline
// run. Note that if a node depends on nodes that cannot be skipped, it will
// run anyway for pipeline result correctness.
// For example, in the following case
// A --> B (from_nodes) --> C --> D (to_nodes)
// 1) if skip_nodes = [B], then nodes C and D run;
// 2) if skip_nodes = [C], then nodes B, C, and D will run.
repeated string skip_nodes = 4;
// Settings used for snapshot during partial run.
SnapshotSettings snapshot_settings = 3;
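// For the A --> B --> C --> D example above, running only the B..D segment
// while skipping B itself:
//
//   from_nodes: "B"
//   to_nodes: "D"
//   skip_nodes: "B"
//   snapshot_settings { latest_pipeline_run_strategy {} }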
}
// RunOptions is passed to the DAG runner when the pipeline is run.
// It is not part of the Pipeline IR.
message RunOptions {
// Pipeline-level specifications for partial run.
PartialRun partial_run = 1;
}
// UpdateOptions is passed to the pipeline runner when the pipeline is run.
// It is not part of the Pipeline IR.
message UpdateOptions {
enum ReloadPolicy {
// Reload all active nodes upon pipeline update.
ALL = 0;
// Reload only the nodes selected in `reload_nodes`.
PARTIAL = 1;
}
ReloadPolicy reload_policy = 1;
// The ids of nodes to reload, which can be empty if no node is to be
// reloaded. Only applicable when ReloadPolicy is PARTIAL.
repeated string reload_nodes = 2;
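// For example, to reload only selected nodes on update (the node ids are
// illustrative):
//
//   reload_policy: PARTIAL
//   reload_nodes: "trainer"
//   reload_nodes: "evaluator"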
}