in audio/src/klio_audio/decorators.py [0:0]
def handle_binary(*decorator_args, **decorator_kwargs):
    """Decorator to handle the required loading/unloading of binary data.

    .. caution::

        The ``@handle_binary`` decorator **must** be used in conjunction with
        the :func:`@handle_klio <klio.transforms.decorators.handle_klio>`
        decorator. As well, ``@handle_binary`` **must** be placed **below**
        ``@handle_klio`` (i.e. ``@handle_klio`` wraps the callable produced
        by ``@handle_binary``), as shown in the examples below.

    Example usage:

    .. code-block:: python

        class MelSpectrogram(beam.DoFn):
            @handle_klio
            @handle_binary
            def process(self, item):
                self._klio.logger.info(
                    f"Generating specgram for {item.element}"
                )
                audio = item.payload
                yield librosa.feature.melspectrogram(y=audio, sr=22050)

        @handle_klio
        @handle_binary(skip_dump=True)
        def save_plt_as_png(ctx, item):
            fig = item.payload
            output = os.path.join(".", item.element.decode("utf-8") + ".png")
            fig.savefig(output, format="png", transparent=True, pad_inches=0)
            ctx.logger.info(f"Saved spectrogram: {output}")
            return output

        class DownloadAudio(beam.DoFn):
            def setup(self):
                self.client = SomeClient()

            @handle_klio
            @handle_binary(skip_load=True, save_with_numpy=True)
            def process(self, item):
                self._klio.logger.info(f"Downloading {item.element}")
                filename = item.payload.decode("utf-8")
                location = self._klio.config.job_config.data.inputs[0].location
                source_path = os.path.join(location, filename)
                with self.client.open(source_path, "rb") as source:
                    out = io.BytesIO(source.read())
                self._klio.logger.info(f"Downloaded {item.element} to memory")
                yield out

    Args:
        skip_load (bool): Skip loading the ``KlioMessage`` payload via pickle.
            Set this to ``True`` if the incoming ``KlioMessage`` payload is not
            binary data, or otherwise has not been pickled to bytes.
            Default: ``False``
        skip_dump (bool): Skip dumping the ``KlioMessage`` payload via pickle.
            Set this to ``True`` if the outgoing ``KlioMessage`` payload is not
            binary data, or otherwise should not be pickled to bytes.
            Default: ``False``
        load_with_numpy (bool): Use :func:`numpy.load` instead of
            :func:`pickle.loads` to load arrays or pickled numpy objects. This
            is less performant than standard pickling, but uses less memory.
            Default: ``False``
        save_with_numpy (bool): Use :func:`numpy.save` instead of
            :func:`pickle.dumps` to save arrays as ``bytes``. This is less
            performant than standard pickling, but uses less memory.
            Default: ``False``

    Any other keyword argument is ignored and triggers a ``UserWarning``.
    """
    skip_load = decorator_kwargs.pop("skip_load", False)
    skip_dump = decorator_kwargs.pop("skip_dump", False)
    load_with_numpy = decorator_kwargs.pop("load_with_numpy", False)
    save_with_numpy = decorator_kwargs.pop("save_with_numpy", False)
    if decorator_kwargs:
        # An unrecognized option (e.g. a typo like ``skip_loads``) would
        # otherwise be dropped silently and its default used. Warn rather
        # than raise so that existing callers keep working.
        import warnings

        warnings.warn(
            "handle_binary() got unexpected keyword argument(s): "
            + ", ".join(sorted(decorator_kwargs)),
            stacklevel=2,
        )

    # Built once at decoration time and shared by every call of the
    # decorated callable.
    serializer = _BinarySerializer(
        skip_load, skip_dump, load_with_numpy, save_with_numpy
    )

    def inner(func_or_meth):
        @functools.wraps(func_or_meth)
        def method_wrapper(self, kmsg, *args, **kwargs):
            # The wrapper is picked per call because the check needs the
            # bound instance (``self``).
            wrapper = __handle_binary
            # Only the process method of a DoFn is a generator - otherwise
            # beam can't pickle a generator
            if __is_dofn_process_method(self, func_or_meth):
                wrapper = __handle_binary_generator
            return wrapper(
                self, func_or_meth, kmsg, serializer, *args, **kwargs
            )

        @functools.wraps(func_or_meth)
        def func_wrapper(ctx, kmsg, *args, **kwargs):
            return __handle_binary(
                ctx, func_or_meth, kmsg, serializer, *args, **kwargs
            )

        # ``__is_method`` decides whether the callable is wrapped as a method
        # (first argument ``self``) or a plain function (first argument
        # ``ctx``).
        if __is_method(func_or_meth):
            return method_wrapper
        return func_wrapper

    # allows @handle_binary to be used without parens (i.e. no need to do
    # `@handle_binary()`) when there are no args/kwargs provided; Python then
    # passes the decorated callable as the sole positional argument.
    if decorator_args:
        return inner(decorator_args[0])
    return inner