def __init__()

in sdks/python/apache_beam/runners/common.py [0:0]


  def __init__(self,
               output_processor,  # type: _OutputProcessor
               signature,  # type: DoFnSignature
               context,  # type: DoFnContext
               side_inputs,  # type: Iterable[sideinputs.SideInputMap]
               input_args,
               input_kwargs,
               user_state_context,  # type: Optional[userstate.UserStateContext]
               bundle_finalizer_param  # type: Optional[core._BundleFinalizerParam]
              ):
    """Sets up the invoker and precomputes the argument list for process().

    The positional argument list is assembled once here. Slots whose value is
    only known per element hold a placeholder object, and the index of every
    such slot is recorded in ``self.placeholders``.

    Args:
      output_processor: forwarded to the base-class initializer.
      signature: forwarded to the base-class initializer; also queried for
        the process method, its argument names and its default values.
      context: stored on the instance as ``self.context``.
      side_inputs: stored on the instance; each entry is asked whether it is
        globally windowed.
      input_args: extra positional values for process(). A falsy value is
        treated as an empty list.
      input_kwargs: extra keyword values for process(). A falsy value is
        treated as an empty dict.
      user_state_context: stored on the instance.
      bundle_finalizer_param: stored on the instance.

    Raises:
      ValueError: if a parameter whose default is ``DoFn.SideInputParam`` has
        no positional value left to consume and its name is not a key of
        ``input_kwargs``.
    """
    super(PerWindowInvoker, self).__init__(output_processor, signature)
    self.side_inputs = side_inputs
    self.context = context
    process_spec = signature.process_method
    self.process_method = process_spec.method_value
    default_arg_values = process_spec.defaults

    # Any one of these conditions is enough; they are evaluated lazily, in
    # this order.
    self.has_windowed_inputs = (
        any(not si.is_globally_windowed() for si in side_inputs) or
        core.DoFn.WindowParam in default_arg_values or
        signature.is_stateful_dofn())
    self.user_state_context = user_state_context
    self.is_splittable = signature.is_splittable_dofn()
    self.threadsafe_restriction_tracker = None  # type: Optional[ThreadsafeRestrictionTracker]
    self.threadsafe_watermark_estimator = None  # type: Optional[ThreadsafeWatermarkEstimator]
    self.current_windowed_value = None  # type: Optional[WindowedValue]
    self.bundle_finalizer_param = bundle_finalizer_param
    self.is_key_param_required = False
    if self.is_splittable:
      # These attributes only exist on invokers for splittable DoFns.
      self.splitting_lock = threading.Lock()
      self.current_window_index = None
      self.stop_window_index = None

    # When no input is windowed, the additional arguments may be cached on
    # the first element.
    self.cache_globally_windowed_args = not self.has_windowed_inputs

    input_args = input_args or []
    input_kwargs = input_kwargs or {}
    arg_names = process_spec.args
    num_defaults = len(default_arg_values)

    # Marks a slot of process() that has to be filled in later (element,
    # key, window, ...). This is distinct from ArgumentPlaceHolder, which may
    # appear inside input_args and stands for a side input.
    class ArgPlaceholder(object):
      def __init__(self, placeholder):
        self.placeholder = placeholder

    if core.DoFn.ElementParam in default_arg_values:
      # The element is requested through a default value, so it gets its
      # slot in the loop below rather than an implicit leading one.
      args_to_pick = len(arg_names) - num_defaults
      slots = input_args[:args_to_pick]
    else:
      # The element implicitly takes the first positional slot.
      # TODO(BEAM-7867): Handle cases in which len(arg_names) ==
      #   len(default_arg_values).
      args_to_pick = len(arg_names) - num_defaults - 1
      slots = ([ArgPlaceholder(core.DoFn.ElementParam)] +
               input_args[:args_to_pick])

    # Walk the parameters that carry a default value, pairing each name with
    # its default, and decide what goes into the corresponding slot.
    leftover_args = iter(input_args[args_to_pick:])
    defaulted_names = arg_names[-num_defaults:]
    for name, default in zip(defaulted_names, default_arg_values):
      if core.DoFn.ElementParam == default:
        slots.append(ArgPlaceholder(default))
      elif core.DoFn.KeyParam == default:
        self.is_key_param_required = True
        slots.append(ArgPlaceholder(default))
      elif (core.DoFn.WindowParam == default or
            core.DoFn.TimestampParam == default or
            core.DoFn.PaneInfoParam == default):
        slots.append(ArgPlaceholder(default))
      elif core.DoFn.SideInputParam == default:
        # A side input must be supplied either positionally or by keyword.
        try:
          supplied = next(leftover_args)
        except StopIteration:
          if name not in input_kwargs:
            raise ValueError("Value for sideinput %s not provided" % name)
        else:
          slots.append(supplied)
      elif (isinstance(default, (core.DoFn.StateParam, core.DoFn.TimerParam))
            or (isinstance(default, type) and
                core.DoFn.BundleFinalizerParam == default)):
        slots.append(ArgPlaceholder(default))
      else:
        # Ordinary default: consume a positional value if one is left. If
        # none is left, no slot is added for this parameter.
        try:
          supplied = next(leftover_args)
        except StopIteration:
          pass
        else:
          slots.append(supplied)
    # Whatever positional values were not consumed above go at the end.
    slots.extend(leftover_args)

    # Remember where the placeholders sit so they need not be searched for
    # again.
    self.placeholders = [(index, slot.placeholder)
                         for index, slot in enumerate(slots)
                         if isinstance(slot, ArgPlaceholder)]

    self.args_for_process = slots
    self.kwargs_for_process = input_kwargs