in tfx/orchestration/data_types_utils.py [0:0]
def set_parameter_value(
    parameter_value: pipeline_pb2.Value,
    value: types.ExecPropertyTypes,
    set_schema: Optional[bool] = True) -> pipeline_pb2.Value:
    """Fills a pipeline_pb2.Value from a TFX execution property value.

    How the value is stored depends on its Python type:

    * int (excluding bool), float and str go straight into the matching
      `field_value` member (`int_value`, `double_value`, `string_value`).
    * A `pipeline_pb2.Value` must carry a `field_value`, which is copied over.
    * bool is written to `field_value.string_value` via `json_utils.dumps`,
      and the schema is marked as boolean.
    * list, tuple, dict and any other proto message are converted to a string
      (via `json_utils.dumps` / `proto_utils.proto_to_json`) and written to
      `field_value.string_value`.

    Args:
      parameter_value: A pipeline_pb2.Value message to be populated in place.
      value: The value of the property.
      set_schema: Whether the value type should be recorded in the schema when
        handling list, tuple, dict and proto message values (including their
        nested elements). A top-level bool marks the schema as boolean
        regardless of this flag.

    Returns:
      The `parameter_value` object that was passed in, with field_value and
      optionally schema filled based on the input property.

    Raises:
      ValueError: If the value type is not supported, or if a pipeline_pb2.Value
        is given whose `value` oneof is not `field_value`.
    """

    def _encode(
        item: types.ExecPropertyTypes,
        type_spec: pipeline_pb2.Value.Schema.ValueType) -> types.Property:
        """Returns the storable form of `item`, recording its type if requested."""
        # bool has to be tested ahead of the (int, float, str) case below because
        # bool is a subclass of int.
        if isinstance(item, bool):
            if set_schema:
                type_spec.boolean_type.SetInParent()
            return item

        if isinstance(item, message.Message):
            # TODO(b/171794016): Investigate if file descriptor set is needed for
            # tfx-owned proto already build in the launcher binary.
            if set_schema:
                proto_spec = type_spec.proto_type
                proto_spec.message_type = type(item).DESCRIPTOR.full_name
                proto_utils.build_file_descriptor_set(
                    item, proto_spec.file_descriptors)
            return proto_utils.proto_to_json(item)

        if isinstance(item, (list, tuple)):
            if set_schema:
                type_spec.list_type.SetInParent()
            # Every element is encoded first; the resulting list is then dumped as
            # a whole.
            encoded_elements = [
                _encode(element, type_spec.list_type) for element in item
            ]
            return json_utils.dumps(encoded_elements)

        if isinstance(item, dict):
            if set_schema:
                type_spec.dict_type.SetInParent()
            # Keys are kept as they are; only the values are encoded.
            encoded_entries = {
                name: _encode(entry, type_spec.dict_type)
                for name, entry in item.items()
            }
            return json_utils.dumps(encoded_entries)

        if isinstance(item, (int, float, str)):
            return item

        raise ValueError('Unexpected type %s' % type(item))

    # Primitive values map directly onto a typed field. bool is excluded from the
    # int case (it is an int subclass) and handled separately further down.
    if isinstance(value, int) and not isinstance(value, bool):
        parameter_value.field_value.int_value = value
        return parameter_value

    if isinstance(value, float):
        parameter_value.field_value.double_value = value
        return parameter_value

    if isinstance(value, str):
        parameter_value.field_value.string_value = value
        return parameter_value

    # This check precedes the generic message.Message check below so that a
    # pipeline_pb2.Value is copied rather than serialized.
    if isinstance(value, pipeline_pb2.Value):
        if value.WhichOneof('value') != 'field_value':
            raise ValueError('Expecting field_value but got %s.' % value)
        parameter_value.field_value.CopyFrom(value.field_value)
        return parameter_value

    if isinstance(value, bool):
        # NOTE: the boolean schema is set here without consulting `set_schema`.
        parameter_value.schema.value_type.boolean_type.SetInParent()
        parameter_value.field_value.string_value = json_utils.dumps(value)
        return parameter_value

    if isinstance(value, (list, tuple, dict, message.Message)):
        parameter_value.field_value.string_value = _encode(
            value, parameter_value.schema.value_type)
        return parameter_value

    raise ValueError('Unexpected type %s' % type(value))