in lib/src/klio/transforms/decorators.py [0:0]
def __get_thread_limiter(max_thread_count, thread_limiter, func_name=None):
if max_thread_count is not None and thread_limiter is not None:
# raise a runtime error so it actually crashes klio/beam rather than
# just continue processing elements
raise RuntimeError(
"`max_thread_count` and `thread_limiter` are mutually exclusive "
"arguments."
)
if thread_limiter is not None:
if not isinstance(thread_limiter, kutils.ThreadLimiter):
# raise a runtime error so it actually crashes klio/beam rather
# than just continue processing elements
raise RuntimeError(
"'thread_limiter' must be an instance of `klio.utils."
"ThreadLimiter`."
)
if max_thread_count is not None:
is_int_enum = isinstance(max_thread_count, (int, kutils.ThreadLimit))
is_func = callable(max_thread_count)
if not any([is_int_enum, is_func]):
# raise a runtime error so it actually crashes klio/beam rather
# than just continue processing elements
raise RuntimeError(
"Invalid type for handle_klio's argument 'max_thread_count'. "
"Expected an `int`, a callable returning an `int`, or "
"`klio.utils.ThreadLimit`, got `%s`."
% type(max_thread_count).__name__
)
if isinstance(max_thread_count, int) and max_thread_count <= 0:
# raise a runtime error so it actually crashes klio/beam rather
# than just continue processing elements
raise RuntimeError(
"'max_thread_count' must be greater than 0. Set "
"'max_thread_count' to `None` or `klio.utils.ThreadLimiter."
"NONE` to turn off thread limitations."
)
if max_thread_count is None and thread_limiter is None:
max_thread_count = kutils.ThreadLimit.DEFAULT
if thread_limiter is None:
thread_limiter = kutils.ThreadLimiter(
max_thread_count=max_thread_count, name=func_name
)
return thread_limiter