in audio/src/klio_audio/transforms/io.py [0:0]
def process(self, item):
    """Render the plot carried by ``item`` and write it to the data output.

    The element (``item.element``, decoded as UTF-8) is combined with
    ``self.prefix``, ``self.suffix`` and the configured output's
    ``file_suffix`` to build the file name, which is joined onto the
    output's ``location``. The payload's ``savefig`` is called to render
    into an in-memory buffer, and the buffer is then written through
    ``self.client.open(..., "wb")``.

    Args:
        item: object exposing ``element`` (bytes) and ``payload`` (an
            object with a ``savefig`` method; presumably a matplotlib
            figure - confirm against the caller).

    Yields:
        str: the path the plot was written to.

    Raises:
        RuntimeError: if ``job_config.data.outputs`` holds no output or
            more than one output.
    """
    entity_id = item.element.decode("utf-8")
    configured_outputs = self._klio.config.job_config.data.outputs
    output_count = len(configured_outputs)

    # Both misconfigurations below are raised as runtime errors so they
    # actually crash klio/beam rather than letting it just continue
    # processing elements.
    if output_count == 0:
        raise RuntimeError(
            "The `klio_audio.transforms.io.GcsUploadPlot` transform "
            "requires a data output to be configured in "
            "`klio-job.yaml::job_config.data.outputs`."
        )
    if output_count > 1:
        raise RuntimeError(
            "The `klio_audio.transforms.io.GcsUploadPlot` transform "
            "does not support multiple configured outputs in "
            "`klio-job.yaml::job_config.data.outputs`."
        )

    destination = configured_outputs[0]

    # Make sure the extension carries a leading dot before it is appended.
    extension = destination.file_suffix
    extension = extension if extension.startswith(".") else "." + extension

    basename = "".join([self.prefix, entity_id, self.suffix, extension])
    upload_path = os.path.join(destination.location, basename)

    buffer = io.BytesIO()
    figure = item.payload

    # An explicitly configured format wins; otherwise fall back to the
    # extension with its leading dot(s) removed.
    image_format = self.file_format
    if not image_format:
        image_format = extension.lstrip(".")

    self._klio.logger.debug(
        "Saving plot as {} for {}".format(image_format, entity_id)
    )
    figure.savefig(buffer, format=image_format, **self.plt_kwargs)

    with self.client.open(upload_path, "wb") as remote_file:
        remote_file.write(buffer.getvalue())

    self._klio.logger.debug("Saved plot to {}".format(upload_path))
    yield upload_path