in tfx/orchestration/portable/runtime_parameter_utils.py [0:0]
def substitute_runtime_parameter(
msg: message.Message,
parameter_bindings: Mapping[str, types.Property]
) -> Mapping[str, types.Property]:
"""Utility function to substitute runtime parameter placeholders with values.
Args:
msg: The original message to change. Only messages defined under
pipeline_pb2 will be supported. Other types will result in no-op.
parameter_bindings: A dict of parameter keys to parameter values that will
be used to substitute the runtime parameter placeholder.
Returns:
A dict of all runtime parameters to their populated values
"""
if not isinstance(msg, message.Message):
return {}
parameters = {}
# If the message is a pipeline_pb2.Value instance, try to find an substitute
# with runtime parameter bindings.
if isinstance(msg, pipeline_pb2.Value):
value = cast(pipeline_pb2.Value, msg)
which = value.WhichOneof('value')
if which == 'runtime_parameter':
real_value = _get_runtime_parameter_value(value.runtime_parameter,
parameter_bindings)
parameters[value.runtime_parameter.name] = real_value
if real_value is None:
return parameters
value.Clear()
data_types_utils.set_metadata_value(
metadata_value=value.field_value, value=real_value)
if which == 'structural_runtime_parameter':
real_value = _get_structural_runtime_parameter_value(
value.structural_runtime_parameter, parameter_bindings, parameters)
if real_value is None:
return parameters
value.Clear()
data_types_utils.set_metadata_value(
metadata_value=value.field_value, value=real_value)
return parameters
# For other cases, recursively call into sub-messages if any.
for field, sub_message in msg.ListFields():
# No-op for non-message types.
if field.type != descriptor.FieldDescriptor.TYPE_MESSAGE:
continue
# Evaluates every map values in a map.
elif (field.message_type.has_options and
field.message_type.GetOptions().map_entry):
for key in sub_message:
parameters.update(
substitute_runtime_parameter(sub_message[key], parameter_bindings))
# Evaluates every entry in a list.
elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
for element in sub_message:
parameters.update(
substitute_runtime_parameter(element, parameter_bindings))
# Evaluates sub-message.
else:
parameters.update(
substitute_runtime_parameter(sub_message, parameter_bindings))
return parameters