in lib/src/klio/transforms/io.py [0:0]
def _map_row_element(self, row):
    """Turn a single row into the payload emitted for that row.

    Which part of the row is used depends on the configured message
    columns (``self.__klio_message_columns``):

    * nothing configured (``None`` or empty): the entire row is
      serialized with ``json.dumps``;
    * exactly one column: that column's value is returned as-is,
      without JSON encoding (a missing column raises ``KeyError``);
    * several columns: the row is narrowed down to those columns
      (columns absent from the row are skipped, row ordering is kept)
      and the resulting dictionary is serialized with ``json.dumps``.

    NOTE: this relies on the row being JSON serializable. That is
    expected to hold for the default coder
    (beam.io.gcp.bigquery_tools.RowAsDictJsonCoder, unless overridden
    in klio-job.yaml) since it yields a plain dictionary, but the
    assumption might break if someone provides a different coder.

    NOTE: row elements eventually need to be bytes, which is why a
    dictionary is dumped to a JSON str here (so it can be converted to
    bytes). This may need to change to support other coders.

    Args:
        row: mapping of column name to value for one row.

    Returns:
        A JSON string, or the raw value of the single configured
        column.
    """
    wanted_columns = self.__klio_message_columns

    # No column selection configured: ship the whole row.
    if not wanted_columns:
        return json.dumps(row)

    # A single column is passed through untouched (no JSON wrapping).
    if len(wanted_columns) == 1:
        return row[wanted_columns[0]]

    # Multiple columns: keep only the requested ones, in row order.
    selected = {
        column: cell
        for column, cell in row.items()
        if column in wanted_columns
    }
    return json.dumps(selected)