def apply()

in tfx/dsl/input_resolution/ops/span_driven_evaluator_inputs_op.py [0:0]


  def apply(self, input_dict: typing_utils.ArtifactMultiDict):
    """Picks one Model and the window of Examples it should be evaluated on.

    Expected layout of input_dict (both keys are required):

    {
        "model": [Model 1, Model 2, ...],
        "examples": [Examples 1, Examples 2...]
    }

    For every span only the latest version of the Examples is used, and only
    the standard TFleX Model and Examples artifacts are supported.

    The evaluation window is the inclusive span range

      [max_span - wait_spans_before_eval - additional_spans_per_eval,
       max_span - wait_spans_before_eval]

    where max_span is the largest span among the Examples that remain after
    filtering with min_span=start_span_number.

    Args:
      input_dict: Maps "model" to a list of Model artifacts and "examples" to a
        list of Examples artifacts.

    Returns:
      A dictionary holding exactly one Model, the most recently created one
      that is eligible, together with the Examples that fall inside the
      evaluation window.

      For example:
      {
          "model": [Model 3],
          "examples": [Examples 3, Examples 4]
      }

    Raises:
      InvalidArgument: If the input_dict is malformed.
      SkipSignal:
        1) input_dict is empty or either of its values are empty, or no
           Examples remain after span filtering.
        2) The evaluation window reaches below the smallest span of the
           remaining Examples.
        3) No Model was found that was trained on an Example with a small enough
           span.
    """
    _validate_input_dict(input_dict)

    # All three knobs share the same lower bound; they are checked one at a
    # time, in this order, so the first offending argument is the one reported.
    for arg_name in (
        'wait_spans_before_eval',
        'start_span_number',
        'additional_spans_per_eval',
    ):
      ops_utils.validate_argument(
          arg_name, getattr(self, arg_name), min_value=0
      )

    # Look up, once per Model, the Examples that Model was trained on.
    training_examples_per_model = {
        candidate: training_range_op.training_range(
            self.context.store, candidate
        )
        for candidate in input_dict[ops_utils.MODEL_KEY]
    }

    def _recency_key(entry):
      """Orders (Model, trained Examples) pairs by creation time, then id."""
      candidate = entry[0]
      return candidate.mlmd_artifact.create_time_since_epoch, candidate.id

    # Re-key the mapping so that iteration goes from the newest Model to the
    # oldest one, with the id deciding between equal creation times.
    training_examples_per_model = dict(
        sorted(
            training_examples_per_model.items(),
            key=_recency_key,
            reverse=True,
        )
    )

    # Ascending by span, one (latest) version per span, and nothing below
    # start_span_number.
    examples_by_span = ops_utils.filter_artifacts_by_span(
        artifacts=input_dict[ops_utils.EXAMPLES_KEY],
        span_descending=False,
        n=0,
        min_span=self.start_span_number,
        keep_all_versions=False,
    )
    if not examples_by_span:
      raise exceptions.SkipSignal()

    oldest_span = examples_by_span[0].span
    newest_span = examples_by_span[-1].span

    # Inclusive evaluation window, counted back from the newest span.
    window_end = newest_span - self.wait_spans_before_eval
    window_start = window_end - self.additional_spans_per_eval

    # Both window edges must be at or above the smallest available span.
    if not (window_start >= oldest_span and window_end >= oldest_span):
      raise exceptions.SkipSignal()

    # The Model has to have been trained only on spans that lie far enough
    # before the window, i.e. below window_start - evaluation_training_offset.
    span_threshold = window_start - self.evaluation_training_offset
    chosen_model = self._get_model_to_evaluate(
        training_examples_per_model, span_threshold
    )

    # Examples whose span lies inside [window_start, window_end].
    window_examples = [
        examples
        for examples in examples_by_span
        if window_start <= examples.span <= window_end
    ]

    # One Model, plus every Examples artifact it is to be evaluated with.
    result = {}
    result[ops_utils.MODEL_KEY] = [chosen_model]
    result[ops_utils.EXAMPLES_KEY] = window_examples
    return result