in lib/src/klio/transforms/decorators.py [0:0]
def _serialize_klio_message(func_or_meth):
func_name = getattr(func_or_meth, "__qualname__", func_or_meth.__name__)
metrics_objs = __get_transform_metrics(func_name)
@functools.wraps(func_or_meth)
def method_wrapper(self, incoming_item, *args, **kwargs):
wrapper = __serialize_klio_message
wrapper_kwargs = {
"ctx": self,
"func": func_or_meth,
"incoming_item": incoming_item,
"metrics": metrics_objs,
}
if __is_dofn_process_method(self, func_wrapper):
wrapper = __serialize_klio_message_generator
wrapper_kwargs = {
"self": self,
"meth": func_or_meth,
"incoming_item": incoming_item,
"metrics": metrics_objs,
}
return wrapper(*args, **wrapper_kwargs, **kwargs)
@functools.wraps(func_or_meth)
def func_wrapper(klio_ns, incoming_item, *args, **kwargs):
wrapper_kwargs = {
"ctx": klio_ns,
"func": func_or_meth,
"incoming_item": incoming_item,
"metrics": metrics_objs,
}
return __serialize_klio_message(*args, **wrapper_kwargs, **kwargs)
if __is_method(func_or_meth):
return method_wrapper
return func_wrapper