in tfx/orchestration/experimental/core/sync_pipeline_task_gen.py [0:0]
def __call__(self) -> List[task_lib.Task]:
  """Generates tasks to advance this synchronous pipeline by one tick.

  Walks the pipeline's nodes in topologically sorted layers, maintaining
  which nodes have succeeded or failed so far, and collects:
    * `UpdateNodeStateTask`s emitted while generating tasks for a node.
    * `ExecNodeTask`s for nodes that are ready to execute.
    * At most one pipeline-finalization task, produced when the pipeline
      should be aborted (fail-fast on a node failure, or no runnable nodes
      remain) or finalized successfully (all terminal nodes succeeded).

  Returns:
    All update-node-state tasks, followed by either a single
    finalize-pipeline task or the collected exec-node tasks.
  """
  layers = _topsorted_layers(self._pipeline)
  skipped_node_ids = _skipped_node_ids(self._pipeline)
  terminal_node_ids = _terminal_node_ids(layers, skipped_node_ids)
  exec_node_tasks: List[task_lib.ExecNodeTask] = []
  update_node_state_tasks: List[task_lib.UpdateNodeStateTask] = []
  successful_node_ids = set()
  failed_nodes_dict: Dict[str, status_lib.Status] = {}
  finalize_pipeline_task = None
  for layer_nodes in layers:
    for node in layer_nodes:
      node_id = node.node_info.id
      node_uid = task_lib.NodeUid.from_node(self._pipeline, node)
      node_state = self._node_states_dict[node_uid]
      # A failed node whose success is optional counts as successful for the
      # purposes of downstream trigger evaluation.
      if node_state.is_success() or (node_state.is_failure(
      ) and node.execution_options.node_success_optional):
        successful_node_ids.add(node_id)
        continue
      if node_state.is_failure():
        failed_nodes_dict[node_id] = node_state.status
        continue
      # Skip nodes whose upstream trigger conditions are not yet met; they
      # may become runnable on a later invocation.
      if not self._trigger_strategy_satisfied(node, successful_node_ids,
                                              failed_nodes_dict):
        continue
      tasks = self._generate_tasks_for_node(node)
      for task in tasks:
        if isinstance(task, task_lib.UpdateNodeStateTask):
          # Fold state transitions reported by freshly generated tasks into
          # the bookkeeping, so nodes later in this same traversal observe
          # the updated success/failure sets.
          if pstate.is_node_state_success(
              task.state) or (pstate.is_node_state_failure(task.state) and
                              node.execution_options.node_success_optional):
            successful_node_ids.add(node_id)
          elif pstate.is_node_state_failure(task.state):
            failed_nodes_dict[node_id] = task.status
            # While the pipeline can still proceed depending on the trigger
            # strategy of descending nodes, the fail fast option should only
            # be used together with ALL_UPSTREAM_NODES_SUCCEEDED since it will
            # fail the pipeline if any node fails.
            if self._fail_fast:
              finalize_pipeline_task = self._abort_task(failed_nodes_dict)
          update_node_state_tasks.append(task)
        elif isinstance(task, task_lib.ExecNodeTask):
          exec_node_tasks.append(task)
      # Fail-fast abort: stop generating tasks for any further nodes.
      if finalize_pipeline_task:
        break
    if finalize_pipeline_task:
      break
  # Without fail-fast, a failure only aborts the pipeline once no progress
  # is possible anymore.
  if not self._fail_fast and failed_nodes_dict:
    assert not finalize_pipeline_task
    node_by_id = _node_by_id(self._pipeline)
    # Collect nodes that cannot be run because they have a failed ancestor.
    unrunnable_descendant_ids = set()
    for node_id in failed_nodes_dict:
      unrunnable_descendant_ids |= _unrunnable_descendants(
          node_by_id, node_id)
    # Nodes that are still runnable have neither succeeded nor failed, don't
    # have a failed ancestor, or have a triggering strategy that ignores
    # upstream failures.
    runnable_node_ids = node_by_id.keys() - (
        unrunnable_descendant_ids | successful_node_ids
        | failed_nodes_dict.keys())
    if not runnable_node_ids:
      # If there are no runnable nodes and not all nodes are completed,
      # we can abort the pipeline.
      if unrunnable_descendant_ids:
        finalize_pipeline_task = self._abort_task(failed_nodes_dict)
      # If all nodes are completed and not all terminal nodes are successful,
      # the pipeline should be marked failed.
      elif terminal_node_ids & failed_nodes_dict.keys():
        # Only the failed terminal nodes are surfaced in the abort status;
        # failures of non-terminal, success-optional paths are irrelevant
        # once everything has completed.
        failed_terminal_nodes = {
            k: v
            for k, v in failed_nodes_dict.items()
            if k in terminal_node_ids
        }
        finalize_pipeline_task = self._abort_task(failed_terminal_nodes)
  # Assemble the result: state updates always come first, then either the
  # single finalization/abort task or the exec tasks (never both).
  result = update_node_state_tasks
  if finalize_pipeline_task:
    result.append(finalize_pipeline_task)
  elif terminal_node_ids <= successful_node_ids:
    # If all terminal nodes are successful, the pipeline can be finalized.
    result.append(
        task_lib.FinalizePipelineTask(
            pipeline_uid=self._pipeline_uid,
            status=status_lib.Status(code=status_lib.Code.OK)))
  else:
    result.extend(exec_node_tasks)
  return result