def _invoke_process_per_window()

in sdks/python/apache_beam/runners/common.py [0:0]


  def _invoke_process_per_window(self,
                                 windowed_value,  # type: WindowedValue
                                 additional_args,
                                 additional_kwargs,
                                ):
    # type: (...) -> Optional[SplitResultResidual]
    if self.has_windowed_inputs:
      window, = windowed_value.windows
      side_inputs = [si[window] for si in self.side_inputs]
      side_inputs.extend(additional_args)
      args_for_process, kwargs_for_process = util.insert_values_in_args(
          self.args_for_process, self.kwargs_for_process,
          side_inputs)
    elif self.cache_globally_windowed_args:
      # Attempt to cache additional args if all inputs are globally
      # windowed inputs when processing the first element.
      self.cache_globally_windowed_args = False

      # Fill in sideInputs if they are globally windowed
      global_window = GlobalWindow()
      self.args_for_process, self.kwargs_for_process = (
          util.insert_values_in_args(
              self.args_for_process, self.kwargs_for_process,
              [si[global_window] for si in self.side_inputs]))
      args_for_process, kwargs_for_process = (
          self.args_for_process, self.kwargs_for_process)
    else:
      args_for_process, kwargs_for_process = (
          self.args_for_process, self.kwargs_for_process)

    # Extract key in the case of a stateful DoFn. Note that in the case of a
    # stateful DoFn, we set during __init__ self.has_windowed_inputs to be
    # True. Therefore, windows will be exploded coming into this method, and
    # we can rely on the window variable being set above.
    if self.user_state_context or self.is_key_param_required:
      try:
        key, unused_value = windowed_value.value
      except (TypeError, ValueError):
        raise ValueError((
            'Input value to a stateful DoFn or KeyParam must be a KV tuple; '
            'instead, got \'%s\'.') % (windowed_value.value, ))

    for i, p in self.placeholders:
      if core.DoFn.ElementParam == p:
        args_for_process[i] = windowed_value.value
      elif core.DoFn.KeyParam == p:
        args_for_process[i] = key
      elif core.DoFn.WindowParam == p:
        args_for_process[i] = window
      elif core.DoFn.TimestampParam == p:
        args_for_process[i] = windowed_value.timestamp
      elif core.DoFn.PaneInfoParam == p:
        args_for_process[i] = windowed_value.pane_info
      elif isinstance(p, core.DoFn.StateParam):
        assert self.user_state_context is not None
        args_for_process[i] = (
            self.user_state_context.get_state(p.state_spec, key, window))
      elif isinstance(p, core.DoFn.TimerParam):
        assert self.user_state_context is not None
        args_for_process[i] = (
            self.user_state_context.get_timer(
                p.timer_spec,
                key,
                window,
                windowed_value.timestamp,
                windowed_value.pane_info))
      elif core.DoFn.BundleFinalizerParam == p:
        args_for_process[i] = self.bundle_finalizer_param

    if additional_kwargs:
      if kwargs_for_process is None:
        kwargs_for_process = additional_kwargs
      else:
        for key in additional_kwargs:
          kwargs_for_process[key] = additional_kwargs[key]

    if kwargs_for_process:
      self.output_processor.process_outputs(
          windowed_value,
          self.process_method(*args_for_process, **kwargs_for_process),
          self.threadsafe_watermark_estimator)
    else:
      self.output_processor.process_outputs(
          windowed_value,
          self.process_method(*args_for_process),
          self.threadsafe_watermark_estimator)

    if self.is_splittable:
      assert self.threadsafe_restriction_tracker is not None
      self.threadsafe_restriction_tracker.check_done()
      deferred_status = self.threadsafe_restriction_tracker.deferred_status()
      if deferred_status:
        deferred_restriction, deferred_timestamp = deferred_status
        element = windowed_value.value
        size = self.signature.get_restriction_provider().restriction_size(
            element, deferred_restriction)
        current_watermark = (
            self.threadsafe_watermark_estimator.current_watermark())
        estimator_state = (
            self.threadsafe_watermark_estimator.get_estimator_state())
        residual_value = ((element, (deferred_restriction, estimator_state)),
                          size)
        return SplitResultResidual(
            residual_value=windowed_value.with_value(residual_value),
            current_watermark=current_watermark,
            deferred_timestamp=deferred_timestamp)
    return None