in sdks/python/apache_beam/runners/common.py [0:0]
def _invoke_process_per_window(self,
                               windowed_value,  # type: WindowedValue
                               additional_args,
                               additional_kwargs,
                              ):
  # type: (...) -> Optional[SplitResultResidual]

  """Calls ``self.process_method`` once for ``windowed_value``.

  The positional/keyword arguments are assembled from the invoker's
  ``args_for_process`` / ``kwargs_for_process`` templates, the side inputs,
  the per-call ``additional_args`` / ``additional_kwargs`` and the values
  substituted for each entry of ``self.placeholders``. Whatever the process
  method returns is handed to ``self.output_processor.process_outputs``.

  Args:
    windowed_value: the value to process. When ``self.has_windowed_inputs``
      is set it must carry exactly one window (it is unpacked with
      ``window, = windowed_value.windows``).
    additional_args: extra positional values appended after the side inputs.
      Only consulted on the ``has_windowed_inputs`` path.
    additional_kwargs: extra keyword arguments for this call only; on key
      collisions they override the entries of ``kwargs_for_process``.

  Returns:
    A ``SplitResultResidual`` when ``self.is_splittable`` is set and the
    restriction tracker reports a deferred status; ``None`` otherwise.

  Raises:
    ValueError: if a key is required (``self.user_state_context`` or
      ``self.is_key_param_required`` is truthy) and ``windowed_value.value``
      cannot be unpacked into exactly two items.
  """
  if self.has_windowed_inputs:
    window, = windowed_value.windows
    side_inputs = [si[window] for si in self.side_inputs]
    side_inputs.extend(additional_args)
    args_for_process, kwargs_for_process = util.insert_values_in_args(
        self.args_for_process, self.kwargs_for_process,
        side_inputs)
  elif self.cache_globally_windowed_args:
    # Attempt to cache additional args if all inputs are globally
    # windowed inputs when processing the first element.
    self.cache_globally_windowed_args = False

    # Fill in sideInputs if they are globally windowed
    global_window = GlobalWindow()
    self.args_for_process, self.kwargs_for_process = (
        util.insert_values_in_args(
            self.args_for_process, self.kwargs_for_process,
            [si[global_window] for si in self.side_inputs]))
    args_for_process, kwargs_for_process = (
        self.args_for_process, self.kwargs_for_process)
  else:
    args_for_process, kwargs_for_process = (
        self.args_for_process, self.kwargs_for_process)

  # Extract key in the case of a stateful DoFn. Note that in the case of a
  # stateful DoFn, we set during __init__ self.has_windowed_inputs to be
  # True. Therefore, windows will be exploded coming into this method, and
  # we can rely on the window variable being set above.
  if self.user_state_context or self.is_key_param_required:
    try:
      key, unused_value = windowed_value.value
    except (TypeError, ValueError):
      raise ValueError((
          'Input value to a stateful DoFn or KeyParam must be a KV tuple; '
          'instead, got \'%s\'.') % (windowed_value.value, ))

  # Overwrite each placeholder slot with the value for this element. Every
  # slot listed in self.placeholders is rewritten on every call, so writing
  # into the (possibly cached) argument list in place is deliberate.
  for i, p in self.placeholders:
    if core.DoFn.ElementParam == p:
      args_for_process[i] = windowed_value.value
    elif core.DoFn.KeyParam == p:
      args_for_process[i] = key
    elif core.DoFn.WindowParam == p:
      args_for_process[i] = window
    elif core.DoFn.TimestampParam == p:
      args_for_process[i] = windowed_value.timestamp
    elif core.DoFn.PaneInfoParam == p:
      args_for_process[i] = windowed_value.pane_info
    elif isinstance(p, core.DoFn.StateParam):
      assert self.user_state_context is not None
      args_for_process[i] = (
          self.user_state_context.get_state(p.state_spec, key, window))
    elif isinstance(p, core.DoFn.TimerParam):
      assert self.user_state_context is not None
      args_for_process[i] = (
          self.user_state_context.get_timer(
              p.timer_spec,
              key,
              window,
              windowed_value.timestamp,
              windowed_value.pane_info))
    elif core.DoFn.BundleFinalizerParam == p:
      args_for_process[i] = self.bundle_finalizer_param

  if additional_kwargs:
    if kwargs_for_process is None:
      kwargs_for_process = additional_kwargs
    else:
      # Merge into a copy. On the non-windowed paths kwargs_for_process is
      # the very dict stored on self, so updating it in place would leak
      # this call's additional kwargs into every later invocation.
      kwargs_for_process = dict(kwargs_for_process)
      kwargs_for_process.update(additional_kwargs)

  if kwargs_for_process:
    self.output_processor.process_outputs(
        windowed_value,
        self.process_method(*args_for_process, **kwargs_for_process),
        self.threadsafe_watermark_estimator)
  else:
    self.output_processor.process_outputs(
        windowed_value,
        self.process_method(*args_for_process),
        self.threadsafe_watermark_estimator)

  if self.is_splittable:
    assert self.threadsafe_restriction_tracker is not None
    self.threadsafe_restriction_tracker.check_done()
    deferred_status = self.threadsafe_restriction_tracker.deferred_status()
    if deferred_status:
      # Package the unprocessed remainder as
      # ((element, (restriction, estimator_state)), size) so it can be
      # returned to the caller as a residual.
      deferred_restriction, deferred_timestamp = deferred_status
      element = windowed_value.value
      size = self.signature.get_restriction_provider().restriction_size(
          element, deferred_restriction)
      current_watermark = (
          self.threadsafe_watermark_estimator.current_watermark())
      estimator_state = (
          self.threadsafe_watermark_estimator.get_estimator_state())
      residual_value = ((element, (deferred_restriction, estimator_state)),
                        size)
      return SplitResultResidual(
          residual_value=windowed_value.with_value(residual_value),
          current_watermark=current_watermark,
          deferred_timestamp=deferred_timestamp)
  return None