def _timeout()

in lib/src/klio/transforms/decorators.py [0:0]


def _timeout(seconds=None, exception=None, exception_message=None):
    """Build a decorator that routes calls through a ``KlioTimeoutWrapper``.

    ``seconds`` is validated eagerly, when the decorator is created, so a
    misconfigured timeout fails immediately instead of on the first call.

    Args:
        seconds: Timeout length. Anything ``float()`` accepts (``int``,
            ``float``, numeric string) as long as it is greater than zero.
        exception: Passed to ``KlioTimeoutWrapper`` as
            ``timeout_exception``.
        exception_message: Passed to ``KlioTimeoutWrapper`` as
            ``exception_message``.

    Returns:
        A decorator taking a function or method and returning a wrapper
        with the signature ``(self_or_ctx, kmsg, *args, **kwargs)`` that
        delegates to the timeout wrapper.

    Raises:
        RuntimeError: If ``seconds`` cannot be converted to a float, or if
            the converted value is not greater than zero (this includes
            NaN).
    """
    try:
        timeout_seconds = float(seconds)
    except (TypeError, ValueError) as err:
        # float() signals wrong *types* (None, list, ...) with TypeError and
        # unparsable strings with ValueError; both are invalid input here.
        # raise a runtime error so it actually crashes klio/beam rather than
        # just continue processing elements
        raise RuntimeError(
            "Invalid type for timeout 'seconds'. Expected a `float` or an "
            "`int`, got `%s`." % type(seconds).__name__
        ) from err

    # `not x > 0` (rather than `x <= 0`) so that NaN is rejected as well.
    if not timeout_seconds > 0:
        # Report the caller's original value with %s: %d would truncate
        # fractions (-0.5 -> '0') and cannot format NaN at all.
        # raise a runtime error so it actually crashes klio/beam rather than
        # just continue processing elements
        raise RuntimeError(
            "Invalid value '%s' for timeout. Must be a positive number."
            % (seconds,)
        )

    def inner(func_or_meth):
        # The timeout wrapper is built once, at decoration time, and is
        # shared by every call of the decorated callable.
        with _klio_context() as kctx:
            timeout_wrapper = ktimeout.KlioTimeoutWrapper(
                function=func_or_meth,
                seconds=timeout_seconds,
                timeout_exception=exception,
                exception_message=exception_message,
                klio_context=kctx,
            )

        # Unfortunately these two wrappers can't be abstracted into
        # one wrapper - the `self` arg apparently can not be abstracted
        @functools.wraps(func_or_meth)
        def method_wrapper(self, kmsg, *args, **kwargs):
            args = (self, kmsg) + args
            return timeout_wrapper(*args, **kwargs)

        @functools.wraps(func_or_meth)
        def func_wrapper(ctx, kmsg, *args, **kwargs):
            args = (ctx, kmsg) + args
            return timeout_wrapper(*args, **kwargs)

        if __is_method(func_or_meth):
            return method_wrapper
        return func_wrapper

    return inner