def testFetchPreviousResult()

in tfx/orchestration/metadata_test_utils.py [0:0]


  def testFetchPreviousResult(self):
    """Verifies get_cached_outputs hits only for a matching prior execution.

    One execution is registered and published. Lookups made with empty
    inputs, with a different ComponentInfo, or with the same artifacts
    repeated under an additional input key are expected to return None. A
    lookup repeating the registered arguments is expected to return the
    published output artifact.
    """
    with self.metadata() as m:

      # Register and publish a 'previous' execution to be looked up below.
      props = {'log_root': 'path'}
      pipeline_contexts = m.register_pipeline_contexts_if_not_exists(
          self._pipeline_info)
      registered_inputs = {'input': [standard_artifacts.Examples()]}
      published = standard_artifacts.Examples()
      published.uri = 'my_uri'
      m.register_execution(
          input_artifacts=registered_inputs,
          exec_properties=props,
          pipeline_info=self._pipeline_info,
          component_info=self._component_info,
          contexts=pipeline_contexts)
      m.publish_execution(
          component_info=self._component_info,
          output_artifacts={'output': [published]})

      def lookup(inputs, component_info):
        # Every query shares the exec properties and pipeline info; only the
        # input artifacts and the component info vary between cases.
        return m.get_cached_outputs(
            input_artifacts=inputs,
            exec_properties=props,
            pipeline_info=self._pipeline_info,
            component_info=component_info)

      # No input artifacts at all: expect no cached result.
      self.assertIsNone(lookup({}, self._component_info))

      # Same inputs, but queried for a different component.
      other_component = data_types.ComponentInfo(
          component_id='unique',
          component_type='a.b.c',
          pipeline_info=self._pipeline_info)
      self.assertIsNone(lookup(registered_inputs, other_component))

      # Having the same set of input artifact ids, but duplicated.
      duplicated_inputs = {
          'input': registered_inputs['input'],
          'another_input': registered_inputs['input'],
      }
      self.assertIsNone(lookup(duplicated_inputs, self._component_info))

      # Repeating the registered arguments: expect the published output back.
      cached = lookup(registered_inputs, self._component_info)
      self.assertEqual(len(cached), 1)
      self.assertEqual(len(cached['output']), 1)
      cached_proto = cached['output'][0].mlmd_artifact
      # Skip verifying time sensitive fields.
      for time_field in ('create_time_since_epoch',
                         'last_update_time_since_epoch'):
        cached_proto.ClearField(time_field)
      self.assertProtoEquals(cached_proto, published.mlmd_artifact)