in audio/src/klio_audio/transforms/io.py [0:0]
def process(self, item):
    """Download the binary for ``item`` and yield it as an in-memory buffer.

    The file name is the UTF-8 decoded ``item.element`` followed by the
    configured ``file_suffix``. It is opened through ``self.client`` from
    the ``location`` of the single data input configured in
    ``klio-job.yaml::job_config.data.inputs``.

    Args:
        item: object whose ``element`` attribute holds the UTF-8 encoded
            bytes naming the file to load (without its suffix).

    Yields:
        io.BytesIO: a buffer holding the complete contents of the file.

    Raises:
        RuntimeError: if no data input, or more than one data input, is
            configured.
    """
    element = item.element.decode("utf-8")
    input_data_config = self._klio.config.job_config.data.inputs

    # Both configuration checks raise a RuntimeError so that a
    # misconfigured job actually crashes klio/beam rather than just
    # continuing to process elements.
    if not input_data_config:
        raise RuntimeError(
            "The `klio_audio.transforms.io.GcsLoadBinary` transform "
            "requires a data input to be configured in "
            "`klio-job.yaml::job_config.data.inputs`."
        )
    if len(input_data_config) > 1:
        raise RuntimeError(
            "The `klio_audio.transforms.io.GcsLoadBinary` transform "
            "does not support multiple configured inputs in "
            "`klio-job.yaml::job_config.data.inputs`."
        )

    input_data = input_data_config[0]

    # Normalise the suffix to start with a dot, but only when a suffix is
    # actually configured: an empty (or missing) suffix must leave the
    # element name untouched instead of producing "<element>.".
    file_suffix = input_data.file_suffix or ""
    if file_suffix and not file_suffix.startswith("."):
        file_suffix = "." + file_suffix

    filename = element + file_suffix
    input_path = os.path.join(input_data.location, filename)

    self._klio.logger.debug(
        "Downloading {} from {}".format(filename, input_data.location)
    )
    # The whole object is read into memory so the yielded buffer stays
    # usable after the source handle has been closed.
    with self.client.open(input_path, "rb") as source:
        out = io.BytesIO(source.read())
    self._klio.logger.debug("Downloaded {}".format(filename))

    yield out