def handle_binary()

in audio/src/klio_audio/decorators.py [0:0]


def handle_binary(*decorator_args, **decorator_kwargs):
    """Decorator that loads and dumps binary ``KlioMessage`` payloads.

    May be applied bare (``@handle_binary``) or with keyword options
    (``@handle_binary(skip_dump=True)``).

    .. caution::

        ``@handle_binary`` **must** be paired with the
        :func:`@handle_klio <klio.transforms.decorators.handle_klio>`
        decorator, and it **must** be applied **after** ``@handle_klio``
        (i.e. listed beneath it).

    Example usage:

    .. code-block:: python

        class MelSpectrogram(beam.DoFn):
            @handle_klio
            @handle_binary
            def process(self, item):
                self._klio.logger.info(
                    f"Generating specgram for {item.element}"
                )
                audio = item.payload
                yield librosa.feature.melspectrogram(y=audio, sr=22050)


        @handle_klio
        @handle_binary(skip_dump=True)
        def save_plt_as_png(ctx, item):
            fig = item.payload
            output = os.path.join(".", item.element.decode("utf-8") + ".png")
            plt.savefig(output, format="png", transparent=True, pad_inches=0)
            ctx.logger.info(f"Saved spectrogram: {output}")
            return output


        class DownloadAudio(beam.DoFn):
            def setup(self):
                self.client = SomeClient()

            @handle_klio
            @handle_binary(skip_load=True, save_with_numpy=True)
            def process(self, item):
                self._klio.logger.info(f"Downloading {item.element}")
                filename = item.payload.decode("utf-8")
                location = self._klio.config.job_config.data.inputs[0].location
                source_path = os.path.join(location, filename)
                with self.client.open(source_path, "rb") as source:
                    out = io.BytesIO(source.read())
                self._klio.logger.info(f"Downloaded {item.element} to memory")
                yield out

    Args:
        skip_load (bool): Do not load the incoming ``KlioMessage`` payload
            via pickle. Use ``True`` when the incoming payload is not binary
            data, or otherwise has not been pickled to bytes.
            Default: ``False``
        skip_dump (bool): Do not dump the outgoing ``KlioMessage`` payload
            via pickle. Use ``True`` when the outgoing payload is not binary
            data, or otherwise should not be pickled to bytes.
            Default: ``False``
        load_with_numpy (bool): Load arrays or pickled numpy objects with
            :func:`numpy.load` rather than :func:`pickle.loads`. This is
            less performant than standard pickling, but uses less memory.
            Default: ``False``.
        save_with_numpy (bool): Save arrays as ``bytes`` with
            :func:`numpy.save` rather than :func:`pickle.dumps`. This is
            less performant than standard pickling, but uses less memory.
            Default: ``False``
    """
    # Collect the recognised options in the positional order expected by
    # ``_BinarySerializer``; any option not supplied defaults to ``False``.
    option_names = (
        "skip_load",
        "skip_dump",
        "load_with_numpy",
        "save_with_numpy",
    )
    option_values = [
        decorator_kwargs.pop(option, False) for option in option_names
    ]
    binary_serializer = _BinarySerializer(*option_values)

    def inner(func_or_meth):
        # Plain functions receive the klio context as their first argument
        # and always go through the non-generator handler.
        if not __is_method(func_or_meth):

            @functools.wraps(func_or_meth)
            def func_wrapper(ctx, kmsg, *args, **kwargs):
                return __handle_binary(
                    ctx, func_or_meth, kmsg, binary_serializer, *args, **kwargs
                )

            return func_wrapper

        @functools.wraps(func_or_meth)
        def method_wrapper(self, kmsg, *args, **kwargs):
            # Only a DoFn's process method is treated as a generator; the
            # other methods are not, since beam can't pickle a generator.
            handler = (
                __handle_binary_generator
                if __is_dofn_process_method(self, func_or_meth)
                else __handle_binary
            )
            return handler(
                self, func_or_meth, kmsg, binary_serializer, *args, **kwargs
            )

        return method_wrapper

    # Bare usage (``@handle_binary`` without parentheses) passes the
    # decorated callable as the sole positional argument, so wrap it right
    # away; otherwise hand back the decorator for Python to apply.
    return inner(decorator_args[0]) if decorator_args else inner