in lib/src/klio/transforms/decorators.py [0:0]
def _timeout(seconds=None, exception=None, exception_message=None):
try:
seconds = float(seconds)
except ValueError:
# raise a runtime error so it actually crashes klio/beam rather than
# just continue processing elements
raise RuntimeError(
"Invalid type for timeout 'seconds'. Expected a `float` or an "
"`int`, got `%s`." % type(seconds).__name__
)
if not seconds > 0:
# raise a runtime error so it actually crashes klio/beam rather than
# just continue processing elements
raise RuntimeError(
"Invalid value '%d' for timeout. Must be a positive number."
% seconds
)
def inner(func_or_meth):
with _klio_context() as kctx:
timeout_wrapper = ktimeout.KlioTimeoutWrapper(
function=func_or_meth,
seconds=seconds,
timeout_exception=exception,
exception_message=exception_message,
klio_context=kctx,
)
# Unfortunately these two wrappers can't be abstracted into
# one wrapper - the `self` arg apparently can not be abstracted
@functools.wraps(func_or_meth)
def method_wrapper(self, kmsg, *args, **kwargs):
args = (self, kmsg) + args
return timeout_wrapper(*args, **kwargs)
@functools.wraps(func_or_meth)
def func_wrapper(ctx, kmsg, *args, **kwargs):
args = (ctx, kmsg) + args
return timeout_wrapper(*args, **kwargs)
if __is_method(func_or_meth):
return method_wrapper
return func_wrapper
return inner