def process()

in audio/src/klio_audio/transforms/io.py [0:0]


    def process(self, item):
        """Render the plot carried by ``item`` and upload it to the job output.

        The file name is assembled as
        ``<prefix><element><suffix><file_suffix>`` and joined onto the
        ``location`` of the single configured data output. The figure is
        serialised into an in-memory buffer first and then written through
        ``self.client``.

        Args:
            item: message whose ``element`` holds UTF-8 encoded bytes and
                whose ``payload`` exposes ``savefig`` (presumably a
                matplotlib figure -- confirm against the caller).

        Yields:
            str: the path the rendered plot was written to.

        Raises:
            RuntimeError: if ``job_config.data.outputs`` contains no entry
                or more than one entry.
        """
        entity_id = item.element.decode("utf-8")
        configured_outputs = self._klio.config.job_config.data.outputs
        output_count = len(configured_outputs)

        # Both checks below raise a RuntimeError on purpose: it actually
        # crashes klio/beam instead of letting it carry on with the next
        # elements.
        if output_count == 0:
            raise RuntimeError(
                "The `klio_audio.transforms.io.GcsUploadPlot` transform "
                "requires a data output to be configured in "
                "`klio-job.yaml::job_config.data.outputs`."
            )
        if output_count > 1:
            raise RuntimeError(
                "The `klio_audio.transforms.io.GcsUploadPlot` transform "
                "does not support multiple configured outputs in "
                "`klio-job.yaml::job_config.data.outputs`."
            )

        destination = configured_outputs[0]

        # Normalise the configured suffix so it always carries a leading dot.
        raw_extension = destination.file_suffix
        extension = (
            raw_extension
            if raw_extension.startswith(".")
            else "." + raw_extension
        )
        target_name = "".join(
            (self.prefix, entity_id, self.suffix, extension)
        )
        target_path = os.path.join(destination.location, target_name)

        # An explicitly configured format wins; otherwise fall back to the
        # extension without its leading dot(s).
        plot_format = self.file_format
        if not plot_format:
            plot_format = extension.lstrip(".")

        saving_msg = "Saving plot as {} for {}".format(plot_format, entity_id)
        self._klio.logger.debug(saving_msg)

        # Render into memory so the bytes can be pushed through the client.
        buffer = io.BytesIO()
        figure = item.payload
        figure.savefig(buffer, format=plot_format, **self.plt_kwargs)

        with self.client.open(target_path, "wb") as remote_file:
            remote_file.write(buffer.getvalue())

        saved_msg = "Saved plot to {}".format(target_path)
        self._klio.logger.debug(saved_msg)
        yield target_path