def process()

in audio/src/klio_audio/transforms/io.py [0:0]


    def process(self, item):
        element = item.element.decode("utf-8")
        input_data_config = self._klio.config.job_config.data.inputs

        # raise a runtime error so it actually crashes klio/beam rather than
        # just continue processing elements
        if len(input_data_config) == 0:
            raise RuntimeError(
                "The `klio_audio.transforms.io.GcsLoadBinary` transform "
                "requires a data input to be configured in "
                "`klio-job.yaml::job_config.data.inputs`."
            )

        # raise a runtime error so it actually crashes klio/beam rather than
        # just continue processing elements
        if len(input_data_config) > 1:
            raise RuntimeError(
                "The `klio_audio.transforms.io.GcsLoadBinary` transform "
                "does not support multiple configured inputs in "
                "`klio-job.yaml::job_config.data.inputs`."
            )
        input_data = input_data_config[0]

        file_suffix = input_data.file_suffix
        if not file_suffix.startswith("."):
            file_suffix = "." + file_suffix
        filename = element + file_suffix
        input_path = os.path.join(input_data.location, filename)

        self._klio.logger.debug(
            "Downloading {} from {}".format(filename, input_data.location)
        )
        with self.client.open(input_path, "rb") as source:
            out = io.BytesIO(source.read())
        self._klio.logger.debug("Downloaded {}".format(filename))
        yield out