in tfx/dsl/compiler/placeholder_utils.py [0:0]
def _resolve_proto_operator(self, op: placeholder_pb2.ProtoOperator) -> Any:
  """Evaluates the proto operator.

  Resolves `op.expression` to a proto message, follows `op.proto_field_path`
  into it one component at a time, and renders whatever value is reached.

  Each path component is interpreted as one of:
    * `.name`            -- attribute access on the current value.
    * `['key']`/`["key"]` -- string-keyed lookup on the current value.
    * `[N]`              -- integer index lookup (legacy form, see below).

  Args:
    op: The proto operator to evaluate. Its `expression` is resolved through
      `self.resolve`; `proto_schema` is only consulted when the resolved
      expression is a string that must be deserialized.

  Returns:
    * The value itself when it is an int, float, str, bool or bytes.
    * A `list` copy when it is a repeated field container.
    * Otherwise the value must be a proto message, which is rendered
      according to `op.serialization_format`: JSON string, text format
      string, the result of `fileio.get_inline_filename` applied to the
      one-line text format, or the binary serialization.

  Raises:
    NullDereferenceError: If `op.expression` resolves to None.
    ValueError: If the resolved expression is neither a str nor a proto
      message, if a path component is malformed or cannot be followed, if the
      final value has an unsupported type, or if the final value is a message
      and no supported serialization format was given.
  """
  raw_message = self.resolve(op.expression)
  if raw_message is None:
    raise NullDereferenceError(op.expression)

  if isinstance(raw_message, str):
    value = proto_utils.deserialize_proto_message(
        raw_message,
        op.proto_schema.message_type,
        file_descriptors=op.proto_schema.file_descriptors)
  elif isinstance(raw_message, message.Message):
    # Message such as platform config should not be encoded.
    value = raw_message
  else:
    raise ValueError(
        f"Got unsupported value type for proto operator: {type(raw_message)}."
    )

  # Compiled once up front; both patterns are loop-invariant.
  map_key_pattern = re.compile(r"\[['\"](.+)['\"]\]")
  index_pattern = re.compile(r"\[(\d+)\]")

  # Iterating an empty field path is a no-op, so no emptiness guard is needed.
  for field in op.proto_field_path:
    if field.startswith("."):
      try:
        value = getattr(value, field[1:])
      except AttributeError as err:
        # `value` is still the object the lookup failed on, so the message
        # reports the type that lacks the field.
        raise ValueError("While evaluating placeholder proto operator, "
                         f"got unknown proto field {field} on proto of "
                         f"type {type(value)}.") from err
      continue

    map_key = map_key_pattern.findall(field)
    if len(map_key) == 1:
      try:
        value = value[map_key[0]]
      except KeyError as err:
        raise ValueError("While evaluating placeholder proto operator, "
                         f"got unknown map field {field}.") from err
      continue

    # Going forward, index access for proto fields should be handled by
    # index op. This code here is kept to avoid breaking existing executor
    # specs.
    index = index_pattern.findall(field)
    # A `\d+` capture consists solely of decimal digits, so `int()` is safe
    # without a further check. Only the first match is used.
    if index:
      try:
        value = value[int(index[0])]
      except IndexError as err:
        raise ValueError("While evaluating placeholder proto operator, "
                         f"got unknown index field {field}.") from err
      continue

    raise ValueError(f"Got unsupported proto field path: {field}")

  # Non-message primitive values are returned directly.
  if isinstance(value, (int, float, str, bool, bytes)):
    return value

  # Return repeated fields as list.
  if isinstance(
      value,
      (_message.RepeatedCompositeContainer, _message.RepeatedScalarContainer,
       containers.RepeatedCompositeFieldContainer,
       containers.RepeatedScalarFieldContainer)):
    return list(value)

  if not isinstance(value, message.Message):
    raise ValueError(f"Got unsupported value type {type(value)} "
                     "from accessing proto field path.")

  # For message-typed values, we need to consider serialization format.
  if op.serialization_format:
    if op.serialization_format == placeholder_pb2.ProtoOperator.JSON:
      return json_format.MessageToJson(
          message=value, sort_keys=True, preserving_proto_field_name=True)
    if op.serialization_format == placeholder_pb2.ProtoOperator.TEXT_FORMAT:
      return text_format.MessageToString(value)
    if (op.serialization_format ==
        placeholder_pb2.ProtoOperator.INLINE_FILE_TEXT_FORMAT):
      return fileio.get_inline_filename(
          text_format.MessageToString(value, as_one_line=True))
    if op.serialization_format == placeholder_pb2.ProtoOperator.BINARY:
      return value.SerializeToString()

  # Reached when the format is unset (falsy) or is not one of the formats
  # handled above.
  raise ValueError(
      "Proto operator resolves to a proto message value. A serialization "
      "format is needed to render it.")