def _serialize_klio_message()

in lib/src/klio/transforms/decorators.py [0:0]


def _serialize_klio_message(func_or_meth):
    func_name = getattr(func_or_meth, "__qualname__", func_or_meth.__name__)
    metrics_objs = __get_transform_metrics(func_name)

    @functools.wraps(func_or_meth)
    def method_wrapper(self, incoming_item, *args, **kwargs):
        wrapper = __serialize_klio_message
        wrapper_kwargs = {
            "ctx": self,
            "func": func_or_meth,
            "incoming_item": incoming_item,
            "metrics": metrics_objs,
        }
        if __is_dofn_process_method(self, func_wrapper):
            wrapper = __serialize_klio_message_generator
            wrapper_kwargs = {
                "self": self,
                "meth": func_or_meth,
                "incoming_item": incoming_item,
                "metrics": metrics_objs,
            }
        return wrapper(*args, **wrapper_kwargs, **kwargs)

    @functools.wraps(func_or_meth)
    def func_wrapper(klio_ns, incoming_item, *args, **kwargs):
        wrapper_kwargs = {
            "ctx": klio_ns,
            "func": func_or_meth,
            "incoming_item": incoming_item,
            "metrics": metrics_objs,
        }
        return __serialize_klio_message(*args, **wrapper_kwargs, **kwargs)

    if __is_method(func_or_meth):
        return method_wrapper
    return func_wrapper