in tfx/benchmarks/tfma_v2_benchmark_base.py [0:0]
def _runMetricsPlotsAndValidationsEvaluatorManualActuation(
self,
with_confidence_intervals,
multi_model,
metrics_specs=None,
validation=False):
"""Benchmark MetricsPlotsAndValidationsEvaluator "manually"."""
self._init_model(multi_model, validation)
if not metrics_specs:
metrics_specs = self._eval_config.metrics_specs
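    # Read the dataset into batched (Arrow RecordBatch-backed) extracts, count
    # the total number of examples, and extract features and labels.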
extracts = self._readDatasetIntoBatchedExtracts()
    num_examples = sum(
        e[constants.ARROW_RECORD_BATCH_KEY].num_rows for e in extracts)
extracts = [self._extract_features_and_labels(e) for e in extracts]
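    # Build the DoFn that runs the model's prediction signature(s), with one
    # signature entry per shared model.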
prediction_do_fn = model_util.ModelSignaturesDoFn(
eval_config=self._eval_config,
eval_shared_models=self._eval_shared_models,
signature_names={
constants.PREDICTIONS_KEY: {
name: [None] for name in self._eval_shared_models
}
},
prefer_dict_outputs=False)
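    # Call setup() manually since the DoFn is not being run by Beam.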
prediction_do_fn.setup()
    # Predictions must be computed before the metrics can be evaluated.
predict_result = []
for e in extracts:
predict_result.extend(prediction_do_fn.process(e))
# Unbatch extracts
unbatched_extracts = []
for e in predict_result:
unbatched_extracts.extend(unbatch_extractor._extract_unbatched_inputs(e)) # pylint: disable=protected-access
# Add global slice key.
for e in unbatched_extracts:
e[tfma.SLICE_KEY_TYPES_KEY] = ()
    # Now evaluate. Only the metric evaluation below is timed.
inputs_per_accumulator = 1000
start = time.time()
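    # Each timed iteration rebuilds the metric computations and runs the full
    # preprocess/combine flow over all unbatched extracts.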
for _ in range(_ITERS):
computations, _, _, _ = (
# pylint: disable=protected-access
metrics_plots_and_validations_evaluator
._filter_and_separate_computations(
metric_specs_util.to_computations(
metrics_specs, eval_config=self._eval_config)))
# pylint: enable=protected-access
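      # Run the preprocessor DoFn over each unbatched extract to produce the
      # inputs fed to the combiner.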
processed = []
for elem in unbatched_extracts:
processed.append(
next(
metrics_plots_and_validations_evaluator._PreprocessorDoFn( # pylint: disable=protected-access
computations).process(elem)))
combiner = metrics_plots_and_validations_evaluator._ComputationsCombineFn( # pylint: disable=protected-access
computations=computations)
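      # Optionally wrap the combiner so that inputs are resampled to produce
      # bootstrap-based confidence intervals.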
if with_confidence_intervals:
combiner = poisson_bootstrap._BootstrapCombineFn(combiner) # pylint: disable=protected-access
combiner.setup()
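      # Mimic Beam's CombineFn lifecycle: create one accumulator per batch of
      # inputs, add the inputs, then merge all accumulators and extract the
      # final metrics.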
accumulators = []
for batch in benchmark_utils.batched_iterator(processed,
inputs_per_accumulator):
accumulator = combiner.create_accumulator()
for elem in batch:
accumulator = combiner.add_input(accumulator, elem)
accumulators.append(accumulator)
final_accumulator = combiner.merge_accumulators(accumulators)
final_output = combiner.extract_output(final_accumulator)
end = time.time()
delta = end - start
# Sanity check the example count. This is not timed.
example_count_key = metric_types.MetricKey(
name="example_count", model_name="candidate" if multi_model else "")
if example_count_key in final_output:
example_count = final_output[example_count_key]
else:
raise ValueError("example_count_key ({}) was not in the final list of "
"metrics. metrics were: {}".format(
example_count_key, final_output))
    if with_confidence_intervals:
      # With confidence intervals the inputs are resampled, so the example
      # count will not be exact.
lower_bound = int(0.9 * num_examples)
upper_bound = int(1.1 * num_examples)
      if example_count < lower_bound or example_count > upper_bound:
        raise ValueError("example count out of bounds: expecting "
                         "%d <= example_count <= %d, but got %d" %
                         (lower_bound, upper_bound, example_count))
else:
# If we're not using confidence intervals, we expect the example count to
# be exact.
if example_count != num_examples:
raise ValueError("example count mismatch: expecting %d got %d" %
(num_examples, example_count))
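    # Report the total wall time across all iterations along with the
    # benchmark parameters.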
self.report_benchmark(
iters=_ITERS,
wall_time=delta,
extras={
"inputs_per_accumulator": inputs_per_accumulator,
"num_examples": num_examples
})