in lib/src/klio/transforms/decorators.py [0:0]
def _handle_klio(*args, max_thread_count=None, thread_limiter=None, **kwargs):
    """Decorate a function or method so it is run through Klio's handling.

    Usable both bare (``@handle_klio``) and with parentheses
    (``@handle_klio(...)``).

    Args:
        *args: When the first positional argument is callable, it is
            treated as the object being decorated (bare usage).
        max_thread_count: Forwarded to ``__get_thread_limiter``.
        thread_limiter: Forwarded to ``__get_thread_limiter``.
        **kwargs: Accepted but not used by this function.

    Returns:
        The wrapped callable for bare usage, otherwise the decorator
        (``inner``) that will wrap the callable it is applied to.
    """

    def inner(func_or_meth):
        # NOTE: ``__name__`` is read unconditionally, so the decorated
        # object must define it even when ``__qualname__`` is present.
        short_name = func_or_meth.__name__
        label = getattr(func_or_meth, "__qualname__", short_name)
        limiter = __get_thread_limiter(
            max_thread_count, thread_limiter, label
        )

        # Enter the klio context manager once, at decoration time, rather
        # than for every processed item; the object it yields is kept
        # around for the wrappers defined below.
        with _klio_context() as klio_ctx:
            pass

        transform_metrics = __get_transform_metrics(label, klio_ctx)

        @functools.wraps(func_or_meth)
        def method_wrapper(self, *args, **kwargs):
            self._klio = klio_ctx

            # Hacky: a method named "expand" is taken to mean the class
            # is a Composite-type transform (rather than a DoFn with a
            # "process" method). A Composite transform handles a
            # pcoll / pipeline instead of individual elements, so it is
            # not given a KlioMessage - attaching the KlioContext above
            # is all it should need.
            if func_or_meth.__name__ == "expand":
                return func_or_meth(self, *args, **kwargs)

            item, extra_args = args[0], args[1:]
            shared_kwargs = {
                "incoming_item": item,
                "metrics": transform_metrics,
            }

            # Only the process method of a DoFn is a generator -
            # otherwise beam can't pickle a generator.
            if __is_dofn_process_method(self, func_or_meth):
                generator_kwargs = {
                    "self": self,
                    "meth": func_or_meth,
                    **shared_kwargs,
                }
                produced = __serialize_klio_message_generator(
                    *extra_args, **generator_kwargs, **kwargs
                )
                return ThreadLimitGenerator(limiter, produced)

            call_kwargs = {
                "ctx": self,
                "func": func_or_meth,
                **shared_kwargs,
            }
            with limiter:
                return __serialize_klio_message(
                    *extra_args, **call_kwargs, **kwargs
                )

        @functools.wraps(func_or_meth)
        def func_wrapper(incoming_item, *args, **kwargs):
            with limiter:
                call_kwargs = {
                    "ctx": klio_ctx,
                    "func": func_or_meth,
                    "incoming_item": incoming_item,
                    "metrics": transform_metrics,
                }
                return __serialize_klio_message(
                    *args, **call_kwargs, **kwargs
                )

        return method_wrapper if __is_method(func_or_meth) else func_wrapper

    # Support decorating without parens (i.e. `@handle_klio` instead of
    # `@handle_klio()`) when no args/kwargs are provided: in that case the
    # decorated callable arrives as the first positional argument.
    used_bare = bool(args) and callable(args[0])
    return inner(args[0]) if used_bare else inner