def publish_execution_results_for_task()

in tfx/orchestration/experimental/core/post_execution_utils.py [0:0]


def publish_execution_results_for_task(mlmd_handle: metadata.Metadata,
                                       task: task_lib.ExecNodeTask,
                                       result: ts.TaskSchedulerResult) -> None:
  """Publishes execution results to MLMD for task."""

  def _update_state(
      status: status_lib.Status,
      execution_result: Optional[execution_result_pb2.ExecutionResult] = None
  ) -> None:
    assert status.code != status_lib.Code.OK
    _remove_temporary_task_dirs(
        stateful_working_dir=task.stateful_working_dir, tmp_dir=task.tmp_dir)
    if status.code == status_lib.Code.CANCELLED:
      logging.info('Cancelling execution (id: %s); task id: %s; status: %s',
                   task.execution_id, task.task_id, status)
      execution_state = proto.Execution.CANCELED
    else:
      logging.info(
          'Aborting execution (id: %s) due to error (code: %s); task id: %s',
          task.execution_id, status.code, task.task_id)
      execution_state = proto.Execution.FAILED
    _update_execution_state_in_mlmd(
        mlmd_handle=mlmd_handle,
        node_uid=task.node_uid,
        execution_id=task.execution_id,
        new_state=execution_state,
        error_code=status.code,
        error_msg=status.message,
        execution_result=execution_result)
    pipeline_state.record_state_change_time()

  if result.status.code != status_lib.Code.OK:
    _update_state(result.status)
    return

  if isinstance(result.output, ts.ExecutorNodeOutput):
    executor_output = result.output.executor_output
    if executor_output is not None:
      if executor_output.execution_result.code != status_lib.Code.OK:
        _update_state(
            status_lib.Status(
                # We should not reuse "execution_result.code" because it may be
                # CANCELLED, in which case we should still fail the execution.
                code=status_lib.Code.UNKNOWN,
                message=executor_output.execution_result.result_message),
            executor_output.execution_result)
        return
    _remove_temporary_task_dirs(
        stateful_working_dir=task.stateful_working_dir, tmp_dir=task.tmp_dir)
    # TODO(b/262040844): Instead of directly using the context manager here, we
    # should consider creating and using wrapper functions.
    with mlmd_state.evict_from_cache(task.execution_id):
      execution_publish_utils.publish_succeeded_execution(
          mlmd_handle,
          execution_id=task.execution_id,
          contexts=task.contexts,
          output_artifacts=task.output_artifacts,
          executor_output=executor_output)
    garbage_collection.run_garbage_collection_for_node(mlmd_handle,
                                                       task.node_uid,
                                                       task.get_node())
  elif isinstance(result.output, ts.ImporterNodeOutput):
    output_artifacts = result.output.output_artifacts
    _remove_temporary_task_dirs(
        stateful_working_dir=task.stateful_working_dir, tmp_dir=task.tmp_dir)
    # TODO(b/262040844): Instead of directly using the context manager here, we
    # should consider creating and using wrapper functions.
    with mlmd_state.evict_from_cache(task.execution_id):
      execution_publish_utils.publish_succeeded_execution(
          mlmd_handle,
          execution_id=task.execution_id,
          contexts=task.contexts,
          output_artifacts=output_artifacts)
  elif isinstance(result.output, ts.ResolverNodeOutput):
    resolved_input_artifacts = result.output.resolved_input_artifacts
    # TODO(b/262040844): Instead of directly using the context manager here, we
    # should consider creating and using wrapper functions.
    with mlmd_state.evict_from_cache(task.execution_id):
      execution_publish_utils.publish_internal_execution(
          mlmd_handle,
          execution_id=task.execution_id,
          contexts=task.contexts,
          output_artifacts=resolved_input_artifacts)
  else:
    raise TypeError(f'Unable to process task scheduler result: {result}')

  pipeline_state.record_state_change_time()