def _map_row_element()

in lib/src/klio/transforms/io.py [0:0]


    def _map_row_element(self, row):
        # NOTE: this assumes that the coder being used (default is
        # beam.io.gcp.bigquery_tools.RowAsDictJsonCoder, otherwise set in
        # klio-job.yaml) is JSON serializable (since the default is just
        # a plain dictionary). This assumption might break if someone
        # provides a different coder.
        # NOTE: We need to have the row elements be bytes, so if it is
        # a dictionary, we json.dumps into a str to convert to bytes,
        # but that may need to change if we want to support other coders
        data = {}
        if self.__klio_message_columns:
            if len(self.__klio_message_columns) == 1:
                data = row[self.__klio_message_columns[0]]

            else:
                for key, value in row.items():
                    if key in self.__klio_message_columns:
                        data[key] = value
                data = json.dumps(data)

        else:
            data = json.dumps(row)
        return data