def _prepare_execution()

in tfx/orchestration/portable/launcher.py [0:0]
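
The method returns an _ExecutionPreparationResult. As a reading aid, here is a
minimal sketch of that wrapper, with fields inferred from the call sites in the
listing below (the exact TFX definition may differ):

  import attr
  from typing import List, Optional

  from ml_metadata.proto import metadata_store_pb2
  from tfx.orchestration.portable import data_types

  @attr.s(auto_attribs=True)
  class _ExecutionPreparationResult:
    """Sketch only: wraps everything the caller needs to decide what to run."""
    execution_info: Optional[data_types.ExecutionInfo] = None
    execution_metadata: Optional[metadata_store_pb2.Execution] = None
    contexts: List[metadata_store_pb2.Context] = attr.Factory(list)
    is_execution_needed: bool = False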


  def _prepare_execution(self) -> _ExecutionPreparationResult:
    """Prepares inputs, outputs and execution properties for actual execution."""
    with self._mlmd_connection as m:
      # 1. Prepares all contexts.
      contexts = context_lib.prepare_contexts(
          metadata_handler=m, node_contexts=self._pipeline_node.contexts)
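      # Contexts (e.g. the pipeline, pipeline-run, and node contexts declared
      # on the node) group this execution and its artifacts in MLMD.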

      # 2. Resolves inputs and execution properties.
      exec_properties = data_types_utils.build_parsed_value_dict(
          inputs_utils.resolve_parameters_with_schema(
              node_parameters=self._pipeline_node.parameters))
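      # These are the statically declared parameters, parsed against the
      # node's schema; dynamic exec properties that depend on resolved input
      # artifacts are merged in later, in step 4.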

      try:
        resolved_inputs = inputs_utils.resolve_input_artifacts(
            pipeline_node=self._pipeline_node,
            metadata_handler=m)
        logging.info('[%s] Resolved inputs: %s',
                     self._pipeline_node.node_info.id, resolved_inputs)
      except exceptions.InputResolutionError as e:
        logging.exception('[%s] Input resolution error: %s',
                          self._pipeline_node.node_info.id, e)
        execution = self._register_or_reuse_execution(
            metadata_handler=m,
            contexts=contexts,
            exec_properties=exec_properties)
        if not execution_lib.is_execution_successful(execution):
          self._publish_failed_execution(
              execution_id=execution.id,
              contexts=contexts,
              executor_output=self._build_error_output(code=e.grpc_code_value))
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(
                execution_id=execution.id),
            contexts=contexts,
            is_execution_needed=False)

      # 3. If not all required inputs are met, returns an ExecutionInfo with
      # is_execution_needed set to False. Nothing will be published, so
      # downstream nodes won't be triggered.
      # TODO(b/197907821): Publish special execution for Skip?
      if isinstance(resolved_inputs, inputs_utils.Skip):
        logging.info('Skipping execution for %s',
                     self._pipeline_node.node_info.id)
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(),
            contexts=contexts,
            is_execution_needed=False)

      # TODO(b/197741942): Support len > 1.
      if len(resolved_inputs) > 1:
        executor_output = self._build_error_output(
            _ERROR_CODE_UNIMPLEMENTED,
            'Handling more than one input dict is not implemented yet.')
        execution = self._register_or_reuse_execution(
            metadata_handler=m,
            contexts=contexts,
            exec_properties=exec_properties)
        if not execution_lib.is_execution_successful(execution):
          self._publish_failed_execution(
              execution_id=execution.id,
              contexts=contexts,
              executor_output=executor_output)
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(
                execution_id=execution.id),
            contexts=contexts,
            is_execution_needed=False)

      input_artifacts = resolved_inputs[0]
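      # Since len > 1 was rejected above, the single remaining dict maps each
      # input key to its list of resolved artifacts.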

      # 4. Resolves the dynamic exec properties from implicit input channels.
      try:
        dynamic_exec_properties = inputs_utils.resolve_dynamic_parameters(
            node_parameters=self._pipeline_node.parameters,
            input_artifacts=input_artifacts)
        exec_properties.update(dynamic_exec_properties)
      except exceptions.InputResolutionError as e:
        execution = self._register_or_reuse_execution(
            metadata_handler=m,
            contexts=contexts,
            exec_properties=exec_properties)
        if not execution_lib.is_execution_successful(execution):
          self._publish_failed_execution(
              execution_id=execution.id,
              contexts=contexts,
              executor_output=self._build_error_output(code=e.grpc_code_value))
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(
                execution_id=execution.id),
            contexts=contexts,
            is_execution_needed=False)

      # 5. Registers execution in metadata.
      execution = self._register_or_reuse_execution(
          metadata_handler=m,
          contexts=contexts,
          input_artifacts=input_artifacts,
          exec_properties=exec_properties)
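      # A reused execution may already have succeeded (e.g. when the launcher
      # is retried), in which case there is nothing left to run.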
      if execution_lib.is_execution_successful(execution):
        return _ExecutionPreparationResult(
            execution_info=self._build_execution_info(
                execution_id=execution.id),
            contexts=contexts,
            is_execution_needed=False)

      # 6. Resolves output artifacts.
      output_artifacts = self._output_resolver.generate_output_artifacts(
          execution.id)

    # If there is a custom driver, runs it.
    if self._driver_operator:
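      # A custom driver can inspect resolved inputs and rewrite
      # exec_properties and output_artifacts (e.g. a file-based driver
      # computing a span or version); _update_with_driver_output folds its
      # results back in below.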
      driver_output = self._driver_operator.run_driver(
          self._build_execution_info(
              input_dict=input_artifacts,
              output_dict=output_artifacts,
              exec_properties=exec_properties,
              execution_output_uri=(
                  self._output_resolver.get_driver_output_uri())))
      self._update_with_driver_output(driver_output, exec_properties,
                                      output_artifacts)

    # We reconnect to MLMD here because the custom driver closes the MLMD
    # connection when it returns.
    with self._mlmd_connection as m:
      # 7. Checks for a cached result.
      cache_context = cache_utils.get_cache_context(
          metadata_handler=m,
          pipeline_node=self._pipeline_node,
          pipeline_info=self._pipeline_info,
          executor_spec=self._executor_spec,
          input_artifacts=input_artifacts,
          output_artifacts=output_artifacts,
          parameters=exec_properties)
      contexts.append(cache_context)
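      # The cache context fingerprints the node, inputs, outputs and
      # parameters, so the lookup below can match a previous execution with
      # the same signature.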

      # 8. Should cache be used?
      if self._pipeline_node.execution_options.caching_options.enable_cache:
        cached_outputs = cache_utils.get_cached_outputs(
            metadata_handler=m, cache_context=cache_context)
        if cached_outputs is not None:
          # Publishes the cached result.
          execution_publish_utils.publish_cached_execution(
              metadata_handler=m,
              contexts=contexts,
              execution_id=execution.id,
              output_artifacts=cached_outputs)
          logging.info('A cached execution %d is used.', execution.id)
          return _ExecutionPreparationResult(
              execution_info=self._build_execution_info(
                  execution_id=execution.id,
                  input_dict=input_artifacts,
                  output_dict=output_artifacts,
                  exec_properties=exec_properties),
              execution_metadata=execution,
              contexts=contexts,
              is_execution_needed=False)

      # 9. Going to trigger executor.
      logging.info('Going to run a new execution %d', execution.id)
      return _ExecutionPreparationResult(
          execution_info=self._build_execution_info(
              execution_id=execution.id,
              input_dict=input_artifacts,
              output_dict=output_artifacts,
              exec_properties=exec_properties,
              execution_output_uri=(
                  self._output_resolver.get_executor_output_uri(execution.id)),
              stateful_working_dir=(
                  self._output_resolver.get_stateful_working_directory()),
              tmp_dir=self._output_resolver.make_tmp_dir(execution.id)),
          execution_metadata=execution,
          contexts=contexts,
          is_execution_needed=True)
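
For orientation, here is a condensed sketch of how a caller such as the
launcher's launch() method might consume this result. The control flow is
illustrative, and _run_executor and _publish_successful_execution are assumed
helper names, not necessarily the exact TFX API:

  def launch(self):
    # Condensed, illustrative control flow only.
    prep = self._prepare_execution()
    if not prep.is_execution_needed:
      # Skipped inputs, failed resolution, a reused successful execution, or
      # a cache hit: everything needed was already published (or deliberately
      # left unpublished), so there is nothing to run.
      return prep.execution_metadata
    # Run the executor with the fully populated ExecutionInfo, then publish
    # its output to MLMD.
    executor_output = self._run_executor(prep.execution_info)
    self._publish_successful_execution(
        execution_id=prep.execution_info.execution_id,
        contexts=prep.contexts,
        executor_output=executor_output)
    return prep.execution_metadata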