in tfx/components/evaluator/executor.py [0:0]
def Do(self, input_dict: Dict[str, List[types.Artifact]],
output_dict: Dict[str, List[types.Artifact]],
exec_properties: Dict[str, Any]) -> None:
"""Runs a batch job to evaluate the eval_model against the given input.
Args:
input_dict: Input dict from input key to a list of Artifacts.
- model: exported model.
- examples: examples for evaluating the model.
output_dict: Output dict from output key to a list of Artifacts.
- evaluation: model evaluation results.
exec_properties: A dict of execution properties.
- eval_config: JSON string of tfma.EvalConfig.
- feature_slicing_spec: JSON string of evaluator_pb2.FeatureSlicingSpec
instance, providing the way to slice the data. Deprecated, use
eval_config.slicing_specs instead.
- example_splits: JSON-serialized list of names of splits on which the
metrics are computed. Default behavior (when example_splits is set to
None) is to use the 'eval' split.
Returns:
None
"""
if standard_component_specs.EXAMPLES_KEY not in input_dict:
raise ValueError('EXAMPLES_KEY is missing from input dict.')
if standard_component_specs.EVALUATION_KEY not in output_dict:
raise ValueError('EVALUATION_KEY is missing from output dict.')
if standard_component_specs.MODEL_KEY in input_dict and len(
input_dict[standard_component_specs.MODEL_KEY]) > 1:
raise ValueError('There can be only one candidate model, but there are %d.' %
(len(input_dict[standard_component_specs.MODEL_KEY])))
if standard_component_specs.BASELINE_MODEL_KEY in input_dict and len(
input_dict[standard_component_specs.BASELINE_MODEL_KEY]) > 1:
raise ValueError(
'There can be only one baseline model, but there are %d.' %
(len(input_dict[standard_component_specs.BASELINE_MODEL_KEY])))
self._log_startup(input_dict, output_dict, exec_properties)
# Add fairness indicator metric callback if necessary.
fairness_indicator_thresholds = json_utils.loads(
exec_properties.get(
standard_component_specs.FAIRNESS_INDICATOR_THRESHOLDS_KEY, 'null'))
add_metrics_callbacks = None
if fairness_indicator_thresholds:
add_metrics_callbacks = [
tfma.post_export_metrics.fairness_indicators( # pytype: disable=module-attr
thresholds=fairness_indicator_thresholds),
]
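# Illustrative example: a fairness_indicator_thresholds value of
# '[0.25, 0.5, 0.75]' (a JSON list of decision thresholds) decodes to
# [0.25, 0.5, 0.75] and yields a single fairness_indicators callback
# evaluated at those thresholds.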
output_uri = artifact_utils.get_single_uri(
output_dict[constants.EVALUATION_KEY])
# Make sure user packages get propagated to the remote Beam worker.
unused_module_path, extra_pip_packages = udf_utils.decode_user_module_key(
exec_properties.get(standard_component_specs.MODULE_PATH_KEY, None))
for pip_package_path in extra_pip_packages:
local_pip_package_path = io_utils.ensure_local(pip_package_path)
self._beam_pipeline_args.append('--extra_package=%s' %
local_pip_package_path)
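# Illustrative example (path is hypothetical): a user package staged at
# '/tmp/my_udfs-0.1.tar.gz' would be appended to the Beam args as
# '--extra_package=/tmp/my_udfs-0.1.tar.gz' so remote workers can install it.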
eval_shared_model_fn = udf_utils.try_get_fn(
exec_properties=exec_properties,
fn_name='custom_eval_shared_model') or tfma.default_eval_shared_model
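# A user module may optionally provide 'custom_eval_shared_model'; otherwise
# tfma.default_eval_shared_model is used. Rough sketch of such an override
# (keyword arguments assumed to mirror tfma.default_eval_shared_model):
#
#   def custom_eval_shared_model(eval_saved_model_path, model_name,
#                                eval_config, **kwargs):
#     return tfma.default_eval_shared_model(
#         eval_saved_model_path=eval_saved_model_path,
#         model_name=model_name,
#         eval_config=eval_config,
#         **kwargs)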
run_validation = False
models = []
if (standard_component_specs.EVAL_CONFIG_KEY in exec_properties
and exec_properties[standard_component_specs.EVAL_CONFIG_KEY]):
slice_spec = None
has_baseline = bool(
input_dict.get(standard_component_specs.BASELINE_MODEL_KEY))
eval_config = tfma.EvalConfig()
proto_utils.json_to_proto(
exec_properties[standard_component_specs.EVAL_CONFIG_KEY],
eval_config)
# rubber_stamp is always assumed true, i.e., the change threshold will
# always be ignored when a baseline model is missing.
if hasattr(tfma, 'utils'):
eval_config = tfma.utils.update_eval_config_with_defaults(
eval_config, has_baseline=has_baseline, rubber_stamp=True)
tfma.utils.verify_eval_config(eval_config)
else:
# TODO(b/171992041): Replaced by tfma.utils.
eval_config = tfma.update_eval_config_with_defaults(
eval_config, has_baseline=has_baseline, rubber_stamp=True)
tfma.verify_eval_config(eval_config)
# Do not validate the model when there are no thresholds configured. This is
# to avoid accidentally blessing models when users forget to set thresholds.
run_validation = bool(
tfma.metrics.metric_thresholds_from_metrics_specs(
eval_config.metrics_specs, eval_config=eval_config))
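# Illustrative example: validation is enabled only when the merged eval_config
# declares at least one threshold, e.g. a metrics_spec entry such as
#
#   metrics {
#     class_name: "BinaryAccuracy"
#     threshold { value_threshold { lower_bound { value: 0.6 } } }
#   }
#
# (text-format sketch); metrics without thresholds leave run_validation False.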
if len(eval_config.model_specs) > 2:
raise ValueError(