def _try_split()

in sdks/python/apache_beam/runners/common.py [0:0]


  def _try_split(fraction,
      window_index, # type: Optional[int]
      stop_window_index, # type: Optional[int]
      windowed_value, # type: WindowedValue
      restriction,
      watermark_estimator_state,
      restriction_provider, # type: RestrictionProvider
      restriction_tracker, # type: RestrictionTracker
      watermark_estimator, # type: WatermarkEstimator
                 ):
    # type: (...) -> Optional[Tuple[Iterable[SplitResultPrimary], Iterable[SplitResultResidual], Optional[int]]]

    """Try to split returning a primaries, residuals and a new stop index.

    For non-window observing splittable DoFns we split the current restriction
    and assign the primary and residual to all the windows.

    For window observing splittable DoFns, we:
    1) return a split at a window boundary if the fraction lies outside of the
       current window.
    2) attempt to split the current restriction, if successful then return
       the primary and residual for the current window and an additional
       primary and residual for any fully processed and fully unprocessed
       windows.
    3) fall back to returning a split at the window boundary if possible

    Args:
      window_index: the current index of the window being processed or None
                    if the splittable DoFn is not window observing.
      stop_window_index: the current index to stop processing at or None
                         if the splittable DoFn is not window observing.
      windowed_value: the current windowed value
      restriction: the initial restriction when processing was started.
      watermark_estimator_state: the initial watermark estimator state when
                                 processing was started.
      restriction_provider: the DoFn's restriction provider
      restriction_tracker: the current restriction tracker
      watermark_estimator: the current watermark estimator

    Returns:
      A tuple containing (primaries, residuals, new_stop_index) or None if
      splitting was not possible. new_stop_index will only be set if the
      splittable DoFn is window observing otherwise it will be None.
    """
    def compute_whole_window_split(to_index, from_index):
      restriction_size = restriction_provider.restriction_size(
          windowed_value, restriction)
      # The primary and residual both share the same value only differing
      # by the set of windows they are in.
      value = ((windowed_value.value, (restriction, watermark_estimator_state)),
               restriction_size)
      primary_restriction = SplitResultPrimary(
          primary_value=WindowedValue(
              value,
              windowed_value.timestamp,
              windowed_value.windows[:to_index])) if to_index > 0 else None
      # Don't report any updated watermarks for the residual since they have
      # not processed any part of the restriction.
      residual_restriction = SplitResultResidual(
          residual_value=WindowedValue(
              value,
              windowed_value.timestamp,
              windowed_value.windows[from_index:stop_window_index]),
          current_watermark=None,
          deferred_timestamp=None) if from_index < stop_window_index else None
      return (primary_restriction, residual_restriction)

    primary_restrictions = []
    residual_restrictions = []

    window_observing = window_index is not None
    # If we are processing each window separately and we aren't on the last
    # window then compute whether the split lies within the current window
    # or a future window.
    if window_observing and window_index != stop_window_index - 1:
      progress = restriction_tracker.current_progress()
      if not progress:
        # Assume no work has been completed for the current window if progress
        # is unavailable.
        from apache_beam.io.iobase import RestrictionProgress
        progress = RestrictionProgress(completed=0, remaining=1)

      scaled_progress = PerWindowInvoker._scale_progress(
          progress, window_index, stop_window_index)
      # Compute the fraction of the remainder relative to the scaled progress.
      # If the value is greater than or equal to progress.remaining_work then we
      # should split at the closest window boundary.
      fraction_of_remainder = scaled_progress.remaining_work * fraction
      if fraction_of_remainder >= progress.remaining_work:
        # The fraction is outside of the current window and hence we will
        # split at the closest window boundary. Favor a split and return the
        # last window if we would have rounded up to the end of the window
        # based upon the fraction.
        new_stop_window_index = min(
            stop_window_index - 1,
            window_index + max(
                1,
                int(
                    round((
                        progress.completed_work +
                        scaled_progress.remaining_work * fraction) /
                          progress.total_work))))
        primary, residual = compute_whole_window_split(
            new_stop_window_index, new_stop_window_index)
        assert primary is not None
        assert residual is not None
        return ([primary], [residual], new_stop_window_index)
      else:
        # The fraction is within the current window being processed so compute
        # the updated fraction based upon the number of windows being processed.
        new_stop_window_index = window_index + 1
        fraction = fraction_of_remainder / progress.remaining_work
        # Attempt to split below, if we can't then we'll compute a split
        # using only window boundaries
    else:
      # We aren't splitting within multiple windows so we don't change our
      # stop index.
      new_stop_window_index = stop_window_index

    # Temporary workaround for [BEAM-7473]: get current_watermark before
    # split, in case watermark gets advanced before getting split results.
    # In worst case, current_watermark is always stale, which is ok.
    current_watermark = (watermark_estimator.current_watermark())
    current_estimator_state = (watermark_estimator.get_estimator_state())
    split = restriction_tracker.try_split(fraction)
    if split:
      primary, residual = split
      element = windowed_value.value
      primary_size = restriction_provider.restriction_size(
          windowed_value.value, primary)
      residual_size = restriction_provider.restriction_size(
          windowed_value.value, residual)
      # We use the watermark estimator state for the original process call
      # for the primary and the updated watermark estimator state for the
      # residual for the split.
      primary_split_value = ((element, (primary, watermark_estimator_state)),
                             primary_size)
      residual_split_value = ((element, (residual, current_estimator_state)),
                              residual_size)
      windows = (
          windowed_value.windows[window_index],
      ) if window_observing else windowed_value.windows
      primary_restrictions.append(
          SplitResultPrimary(
              primary_value=WindowedValue(
                  primary_split_value, windowed_value.timestamp, windows)))
      residual_restrictions.append(
          SplitResultResidual(
              residual_value=WindowedValue(
                  residual_split_value, windowed_value.timestamp, windows),
              current_watermark=current_watermark,
              deferred_timestamp=None))

      if window_observing:
        assert new_stop_window_index == window_index + 1
        primary, residual = compute_whole_window_split(
            window_index, window_index + 1)
        if primary:
          primary_restrictions.append(primary)
        if residual:
          residual_restrictions.append(residual)
      return (
          primary_restrictions, residual_restrictions, new_stop_window_index)
    elif new_stop_window_index and new_stop_window_index != stop_window_index:
      # If we failed to split but have a new stop index then return a split
      # at the window boundary.
      primary, residual = compute_whole_window_split(
          new_stop_window_index, new_stop_window_index)
      assert primary is not None
      assert residual is not None
      return ([primary], [residual], new_stop_window_index)
    else:
      return None