def _handle_klio()

in lib/src/klio/transforms/decorators.py [0:0]


def _handle_klio(*args, max_thread_count=None, thread_limiter=None, **kwargs):
    def inner(func_or_meth):
        func_name = getattr(
            func_or_meth, "__qualname__", func_or_meth.__name__
        )
        thd_limiter = __get_thread_limiter(
            max_thread_count, thread_limiter, func_name
        )
        # grab klio context outside of the method/func wrappers so the
        # context manager isn't called for every time an item is processed
        with _klio_context() as ctx:
            kctx = ctx

        metrics_objs = __get_transform_metrics(func_name, kctx)

        @functools.wraps(func_or_meth)
        def method_wrapper(self, *args, **kwargs):
            setattr(self, "_klio", kctx)

            # SO. HACKY. We check to see if this method is named "expand"
            # to designate  if the class is a Composite-type transform
            # (rather than a DoFn with a "process" method).
            # A Composite transform handles a pcoll / pipeline,
            # not the individual elements, and therefore doesn't need
            # to be given a KlioMessage. It should only need the KlioContext
            # attached.
            if func_or_meth.__name__ == "expand":
                return func_or_meth(self, *args, **kwargs)

            incoming_item = args[0]
            args = args[1:]

            # Only the process method of a DoFn is a generator - otherwise
            # beam can't pickle a generator
            if __is_dofn_process_method(self, func_or_meth):
                wrapper = __serialize_klio_message_generator
                wrapper_kwargs = {
                    "self": self,
                    "meth": func_or_meth,
                    "incoming_item": incoming_item,
                    "metrics": metrics_objs,
                }
                result = wrapper(*args, **wrapper_kwargs, **kwargs)
                return ThreadLimitGenerator(thd_limiter, result)
            else:
                wrapper = __serialize_klio_message
                wrapper_kwargs = {
                    "ctx": self,
                    "func": func_or_meth,
                    "incoming_item": incoming_item,
                    "metrics": metrics_objs,
                }
                with thd_limiter:
                    return wrapper(*args, **wrapper_kwargs, **kwargs)

        @functools.wraps(func_or_meth)
        def func_wrapper(incoming_item, *args, **kwargs):
            with thd_limiter:
                wrapper_kwargs = {
                    "ctx": kctx,
                    "func": func_or_meth,
                    "incoming_item": incoming_item,
                    "metrics": metrics_objs,
                }
                return __serialize_klio_message(
                    *args, **wrapper_kwargs, **kwargs
                )

        if __is_method(func_or_meth):
            return method_wrapper
        return func_wrapper

    # allows @handle_klio to be used without parens (i.e. no need to do
    # `@handle_klio()`) when there are no args/kwargs provided
    if args and callable(args[0]):
        return inner(args[0])
    return inner