in tfx/orchestration/experimental/core/post_execution_utils.py [0:0]
def publish_execution_results_for_task(mlmd_handle: metadata.Metadata,
                                       task: task_lib.ExecNodeTask,
                                       result: ts.TaskSchedulerResult) -> None:
  """Writes the outcome of a task scheduler run for `task` into MLMD.

  Failure handling:
    * A non-OK scheduler status marks the execution CANCELED when the code is
      CANCELLED, and FAILED otherwise.
    * An executor output whose execution result is non-OK always marks the
      execution FAILED.

  Otherwise the execution is published according to the type of
  `result.output`:
    * Executor output: temporary task dirs are removed, the execution is
      published as succeeded, and garbage collection runs for the node.
    * Importer output: temporary task dirs are removed and the execution is
      published as succeeded with the imported artifacts.
    * Resolver output: the execution is published as an internal execution
      with the resolved input artifacts.

  The pipeline state-change time is recorded on every path that completes
  without raising.

  Args:
    mlmd_handle: Handle to the MLMD instance to write to.
    task: The node task whose execution is being finalized.
    result: Result reported by the task scheduler for `task`.

  Raises:
    TypeError: If `result.output` is not one of the supported output types.
  """

  def _mark_unsuccessful(
      status: status_lib.Status,
      execution_result: Optional[execution_result_pb2.ExecutionResult] = None
  ) -> None:
    """Cleans up task dirs and records a CANCELED or FAILED execution."""
    # Callers only reach this helper with a non-OK status.
    assert status.code != status_lib.Code.OK
    _remove_temporary_task_dirs(
        stateful_working_dir=task.stateful_working_dir, tmp_dir=task.tmp_dir)
    was_cancelled = status.code == status_lib.Code.CANCELLED
    if was_cancelled:
      logging.info('Cancelling execution (id: %s); task id: %s; status: %s',
                   task.execution_id, task.task_id, status)
    else:
      logging.info(
          'Aborting execution (id: %s) due to error (code: %s); task id: %s',
          task.execution_id, status.code, task.task_id)
    _update_execution_state_in_mlmd(
        mlmd_handle=mlmd_handle,
        node_uid=task.node_uid,
        execution_id=task.execution_id,
        new_state=(proto.Execution.CANCELED
                   if was_cancelled else proto.Execution.FAILED),
        error_code=status.code,
        error_msg=status.message,
        execution_result=execution_result)
    pipeline_state.record_state_change_time()

  def _publish_evicting_cache(publish_fn, **publish_kwargs) -> None:
    """Calls `publish_fn` for this execution inside `evict_from_cache`."""
    # TODO(b/262040844): Instead of directly using the context manager here, we
    # should consider creating and using wrapper functions.
    with mlmd_state.evict_from_cache(task.execution_id):
      publish_fn(
          mlmd_handle,
          execution_id=task.execution_id,
          contexts=task.contexts,
          **publish_kwargs)

  if result.status.code != status_lib.Code.OK:
    _mark_unsuccessful(result.status)
    return

  node_output = result.output
  if isinstance(node_output, ts.ExecutorNodeOutput):
    executor_output = node_output.executor_output
    if (executor_output is not None and
        executor_output.execution_result.code != status_lib.Code.OK):
      # "execution_result.code" is intentionally not propagated: it may be
      # CANCELLED, and an executor-reported failure must still FAIL the
      # execution rather than cancel it.
      failure = status_lib.Status(
          code=status_lib.Code.UNKNOWN,
          message=executor_output.execution_result.result_message)
      _mark_unsuccessful(failure, executor_output.execution_result)
      return
    _remove_temporary_task_dirs(
        stateful_working_dir=task.stateful_working_dir, tmp_dir=task.tmp_dir)
    _publish_evicting_cache(
        execution_publish_utils.publish_succeeded_execution,
        output_artifacts=task.output_artifacts,
        executor_output=executor_output)
    garbage_collection.run_garbage_collection_for_node(
        mlmd_handle, task.node_uid, task.get_node())
  elif isinstance(node_output, ts.ImporterNodeOutput):
    imported_artifacts = node_output.output_artifacts
    _remove_temporary_task_dirs(
        stateful_working_dir=task.stateful_working_dir, tmp_dir=task.tmp_dir)
    _publish_evicting_cache(
        execution_publish_utils.publish_succeeded_execution,
        output_artifacts=imported_artifacts)
  elif isinstance(node_output, ts.ResolverNodeOutput):
    resolved_artifacts = node_output.resolved_input_artifacts
    _publish_evicting_cache(
        execution_publish_utils.publish_internal_execution,
        output_artifacts=resolved_artifacts)
  else:
    raise TypeError(f'Unable to process task scheduler result: {result}')

  pipeline_state.record_state_change_time()