def apply()

in tfx/dsl/input_resolution/ops/latest_pipeline_run_outputs_op.py [0:0]


  def apply(self) -> typing_utils.ArtifactMultiMap:
    """Returns artifacts from the latest pipeline run.

    Returns:
      A dictionary, each value in the dict is a list of artifacts from the
      latest pipeline run.
    """
    if not self.pipeline_name:
      raise ValueError(
          'pipeline_name for LatestPipelineRunOutputs can not be empty.')

    # Gets the pipeline end node context.
    pipeline_end_node_name = compiler_utils.node_context_name(
        self.pipeline_name,
        compiler_utils.pipeline_end_node_id_from_pipeline_id(
            self.pipeline_name))
    pipeline_end_node_ctx = self.context.store.get_context_by_type_and_name(
        type_name=constants.NODE_CONTEXT_TYPE_NAME,
        context_name=pipeline_end_node_name)
    if not pipeline_end_node_ctx:
      raise exceptions.SkipSignal(
          f'Pipeline {self.pipeline_name} does not have a PipelineEnd node, '
          'possibly due to not defining the pipeline outputs.')

    # Gets the COMPLETE executions of the pipeline end node, and then find the
    # latest one.
    pipeline_end_node_executions = self.context.store.get_executions_by_context(
        context_id=pipeline_end_node_ctx.id,
        list_options=mlmd.ListOptions(
            filter_query='last_known_state = COMPLETE'))
    if not pipeline_end_node_executions:
      raise exceptions.SkipSignal(
          f'Pipeline {self.pipeline_name} does not have a successful execution.'
      )
    latest_execution = max(
        pipeline_end_node_executions,
        key=lambda e: (e.create_time_since_epoch, e.id))

    # From the latest execution, find out the latest artifacts.
    end_node_output_events = [
        e for e in self.context.store.get_events_by_execution_ids(
            execution_ids=[latest_execution.id])
        if event_lib.is_valid_output_event(e)
    ]
    if not end_node_output_events:
      raise exceptions.SkipSignal(
          f'Pipeline {self.pipeline_name} does not have any output artifacts '
          'from PipelineEnd node.')
    artifacts = self.context.store.get_artifacts_by_id(
        [e.artifact_id for e in end_node_output_events])
    artifact_types = self.context.store.get_artifact_types_by_id(
        list({a.type_id for a in artifacts}))
    artifact_types_by_id = {t.id: t for t in artifact_types}
    tfx_artifacts = [
        artifact_utils.deserialize_artifact(
            artifact_types_by_id[a.type_id], a) for a in artifacts]
    result = event_lib.reconstruct_artifact_multimap(
        tfx_artifacts, end_node_output_events)
    if self.output_keys:
      for key in list(result):
        if key not in self.output_keys:
          del result[key]
    return result