in sdks/python/apache_beam/runners/common.py [0:0]
def __init__(self,
             output_processor,  # type: _OutputProcessor
             signature,  # type: DoFnSignature
             context,  # type: DoFnContext
             side_inputs,  # type: Iterable[sideinputs.SideInputMap]
             input_args,
             input_kwargs,
             user_state_context,  # type: Optional[userstate.UserStateContext]
             bundle_finalizer_param  # type: Optional[core._BundleFinalizerParam]
            ):
  """Pre-computes the positional/keyword arguments used to call process().

  Builds ``self.args_for_process``: a list holding the caller-supplied
  ``input_args`` interleaved with ``ArgPlaceholder`` markers at every
  position whose value is only known per element (element, key, window,
  timestamp, pane info, state, timer, bundle finalizer). The index and
  kind of each marker is recorded in ``self.placeholders``.

  Args:
    output_processor: Forwarded to the base-class constructor.
    signature: Forwarded to the base-class constructor; also the source of
      the process method, its argument names and its default values.
    context: Stored on the instance as ``self.context``.
    side_inputs: Stored on the instance; each one is asked whether it is
      globally windowed.
    input_args: Extra positional values for process(); a falsy value is
      treated as an empty list.
    input_kwargs: Extra keyword values for process(); a falsy value is
      treated as an empty dict.
    user_state_context: Stored on the instance as-is.
    bundle_finalizer_param: Stored on the instance as-is.

  Raises:
    ValueError: If an argument whose default is ``DoFn.SideInputParam`` has
      neither a remaining positional value nor an entry in ``input_kwargs``.
  """
  super(PerWindowInvoker, self).__init__(output_processor, signature)
  self.side_inputs = side_inputs
  self.context = context
  self.process_method = signature.process_method.method_value
  defaults = signature.process_method.defaults

  # Windowed handling is needed as soon as one side input is not globally
  # windowed, process() declares a WindowParam default, or the DoFn is
  # stateful.
  self.has_windowed_inputs = (
      any(not si.is_globally_windowed() for si in side_inputs) or
      (core.DoFn.WindowParam in defaults) or
      signature.is_stateful_dofn())
  self.user_state_context = user_state_context
  self.is_splittable = signature.is_splittable_dofn()
  self.threadsafe_restriction_tracker = None  # type: Optional[ThreadsafeRestrictionTracker]
  self.threadsafe_watermark_estimator = None  # type: Optional[ThreadsafeWatermarkEstimator]
  self.current_windowed_value = None  # type: Optional[WindowedValue]
  self.bundle_finalizer_param = bundle_finalizer_param
  self.is_key_param_required = False
  if self.is_splittable:
    # These attributes only exist on invokers of splittable DoFns.
    self.splitting_lock = threading.Lock()
    self.current_window_index = None
    self.stop_window_index = None

  # When every input lives in the global window, the remaining arguments
  # can be cached on the first element.
  self.cache_globally_windowed_args = not self.has_windowed_inputs

  input_args = input_args or []
  input_kwargs = input_kwargs or {}

  names = signature.process_method.args

  # Marker for a process() argument that is filled in per element. Not to
  # be confused with ArgumentPlaceHolder, which may be passed in input_args
  # and is a placeholder for side-inputs.
  class ArgPlaceholder(object):
    def __init__(self, placeholder):
      self.placeholder = placeholder

  if core.DoFn.ElementParam not in defaults:
    # The element is the implicit first positional argument, hence the -1.
    # TODO(BEAM-7867): Handle cases in which len(names) == len(defaults);
    # the count below goes negative there.
    num_leading = len(names) - len(defaults) - 1
    positional = (
        [ArgPlaceholder(core.DoFn.ElementParam)] + input_args[:num_leading])
  else:
    # The element is requested through a default value instead, so every
    # argument without a default is taken from input_args.
    num_leading = len(names) - len(defaults)
    positional = input_args[:num_leading]

  # Caller-supplied positional values not consumed by the leading slots.
  leftover = iter(input_args[num_leading:])

  def consume_leftover():
    """Appends the next leftover value; returns False if none remain."""
    try:
      positional.append(next(leftover))
    except StopIteration:
      return False
    return True

  # Walk the defaulted arguments in declaration order and decide, for each,
  # whether it becomes a per-element placeholder or takes a supplied value.
  for name, default in zip(names[-len(defaults):], defaults):
    if core.DoFn.ElementParam == default:
      positional.append(ArgPlaceholder(default))
    elif core.DoFn.KeyParam == default:
      self.is_key_param_required = True
      positional.append(ArgPlaceholder(default))
    elif (core.DoFn.WindowParam == default or
          core.DoFn.TimestampParam == default or
          core.DoFn.PaneInfoParam == default):
      positional.append(ArgPlaceholder(default))
    elif core.DoFn.SideInputParam == default:
      # With no positional value left, the side input has to be supplied
      # through a keyword argument.
      if not consume_leftover() and name not in input_kwargs:
        raise ValueError("Value for sideinput %s not provided" % name)
    elif (isinstance(default, core.DoFn.StateParam) or
          isinstance(default, core.DoFn.TimerParam) or
          (isinstance(default, type) and
           core.DoFn.BundleFinalizerParam == default)):
      positional.append(ArgPlaceholder(default))
    else:
      # Ordinary default: use a supplied positional value if one is left;
      # otherwise nothing is added for this argument.
      consume_leftover()

  # Whatever was not consumed above is passed through at the end.
  positional.extend(leftover)

  # Remember where the placeholders sit so they need not be searched for
  # again later.
  self.placeholders = [
      (position, arg.placeholder)
      for position, arg in enumerate(positional)
      if isinstance(arg, ArgPlaceholder)
  ]

  self.args_for_process = positional
  self.kwargs_for_process = input_kwargs