in lib/src/klio/transforms/_utils.py [0:0]
def is_original_process_func(clsdict, bases, base_class=None):
"""Only wrap the original `process` function.
Without these (minimal) checks, the `process` function would be
wrapped at least twice (the original `process` function from the
user's DoFn, and our wrapped/decorated one), essentially causing
any call to `process` (and the decorator) to be called at least
twice.
Args:
clsdict (dict): dictionary of items for the class being
instantiated.
bases (tuple(class)): base class(es) of the class being
instantiated.
Returns:
(bool) whether or not to wrap the `process` method of the class
being instantiated.
"""
if "process" not in clsdict:
return False
# ignore classes that don't inherit from our base class
base_cls_names = [b.__name__ for b in bases]
if base_class and base_class not in base_cls_names:
return False
# if the value of clsdict["process"] is not a meth/func
if not callable(clsdict["process"]):
return False
# if the value of clsdict["process"] is already "new_process"
if getattr(clsdict["process"], "__name__") != "process":
return False
return True