def substitute_runtime_parameter()

in tfx/orchestration/portable/runtime_parameter_utils.py [0:0]


def substitute_runtime_parameter(
    msg: message.Message,
    parameter_bindings: Mapping[str, types.Property]
    ) -> Mapping[str, types.Property]:
  """Utility function to substitute runtime parameter placeholders with values.

  Args:
    msg: The original message to change. Only messages defined under
      pipeline_pb2 will be supported. Other types will result in no-op.
    parameter_bindings: A dict of parameter keys to parameter values that will
      be used to substitute the runtime parameter placeholder.

  Returns:
    A dict of all runtime parameters to their populated values
  """
  if not isinstance(msg, message.Message):
    return {}

  parameters = {}
  # If the message is a pipeline_pb2.Value instance, try to find an substitute
  # with runtime parameter bindings.
  if isinstance(msg, pipeline_pb2.Value):
    value = cast(pipeline_pb2.Value, msg)
    which = value.WhichOneof('value')
    if which == 'runtime_parameter':
      real_value = _get_runtime_parameter_value(value.runtime_parameter,
                                                parameter_bindings)
      parameters[value.runtime_parameter.name] = real_value
      if real_value is None:
        return parameters
      value.Clear()
      data_types_utils.set_metadata_value(
          metadata_value=value.field_value, value=real_value)
    if which == 'structural_runtime_parameter':
      real_value = _get_structural_runtime_parameter_value(
          value.structural_runtime_parameter, parameter_bindings, parameters)
      if real_value is None:
        return parameters
      value.Clear()
      data_types_utils.set_metadata_value(
          metadata_value=value.field_value, value=real_value)

    return parameters

  # For other cases, recursively call into sub-messages if any.
  for field, sub_message in msg.ListFields():
    # No-op for non-message types.
    if field.type != descriptor.FieldDescriptor.TYPE_MESSAGE:
      continue
    # Evaluates every map values in a map.
    elif (field.message_type.has_options and
          field.message_type.GetOptions().map_entry):
      for key in sub_message:
        parameters.update(
            substitute_runtime_parameter(sub_message[key], parameter_bindings))
    # Evaluates every entry in a list.
    elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
      for element in sub_message:
        parameters.update(
            substitute_runtime_parameter(element, parameter_bindings))
    # Evaluates sub-message.
    else:
      parameters.update(
          substitute_runtime_parameter(sub_message, parameter_bindings))
  return parameters