in sdks/python/apache_beam/runners/common.py [0:0]
def _try_split(fraction,
               window_index,  # type: Optional[int]
               stop_window_index,  # type: Optional[int]
               windowed_value,  # type: WindowedValue
               restriction,
               watermark_estimator_state,
               restriction_provider,  # type: RestrictionProvider
               restriction_tracker,  # type: RestrictionTracker
               watermark_estimator,  # type: WatermarkEstimator
               ):
  # type: (...) -> Optional[Tuple[Iterable[SplitResultPrimary], Iterable[SplitResultResidual], Optional[int]]]

  """Try to split, returning primaries, residuals and a new stop index.

  For non-window observing splittable DoFns we split the current restriction
  and assign the primary and residual to all the windows.

  For window observing splittable DoFns, we:
  1) return a split at a window boundary if the fraction lies outside of the
     current window.
  2) attempt to split the current restriction, if successful then return
     the primary and residual for the current window and an additional
     primary and residual for any fully processed and fully unprocessed
     windows.
  3) fall back to returning a split at the window boundary if possible.

  Args:
    fraction: the requested split point expressed as a fraction of the
      remaining work. When window observing, it is relative to the work left
      across all windows up to stop_window_index. It is rescaled to the
      current window before being handed to restriction_tracker.try_split.
    window_index: the current index of the window being processed or None
      if the splittable DoFn is not window observing.
    stop_window_index: the current index to stop processing at or None
      if the splittable DoFn is not window observing.
    windowed_value: the current windowed value
    restriction: the initial restriction when processing was started.
    watermark_estimator_state: the initial watermark estimator state when
      processing was started.
    restriction_provider: the DoFn's restriction provider
    restriction_tracker: the current restriction tracker
    watermark_estimator: the current watermark estimator

  Returns:
    A tuple containing (primaries, residuals, new_stop_index) or None if
    splitting was not possible. new_stop_index will only be set if the
    splittable DoFn is window observing otherwise it will be None.
  """
  element = windowed_value.value

  def compute_whole_window_split(to_index, from_index):
    """Split at window boundaries, leaving the restriction itself untouched.

    The primary covers windows[:to_index] and the residual covers
    windows[from_index:stop_window_index]. Either is None when its window
    range would be empty.
    """
    # restriction_size takes the element (not its WindowedValue wrapper),
    # matching how the primary/residual sizes are computed further below.
    restriction_size = restriction_provider.restriction_size(
        element, restriction)
    # The primary and residual both share the same value only differing
    # by the set of windows they are in.
    value = ((element, (restriction, watermark_estimator_state)),
             restriction_size)
    primary_restriction = SplitResultPrimary(
        primary_value=WindowedValue(
            value,
            windowed_value.timestamp,
            windowed_value.windows[:to_index])) if to_index > 0 else None
    # Don't report any updated watermarks for the residual since they have
    # not processed any part of the restriction.
    residual_restriction = SplitResultResidual(
        residual_value=WindowedValue(
            value,
            windowed_value.timestamp,
            windowed_value.windows[from_index:stop_window_index]),
        current_watermark=None,
        deferred_timestamp=None) if from_index < stop_window_index else None
    return (primary_restriction, residual_restriction)

  primary_restrictions = []
  residual_restrictions = []

  window_observing = window_index is not None
  # If we are processing each window separately and we aren't on the last
  # window then compute whether the split lies within the current window
  # or a future window.
  if window_observing and window_index != stop_window_index - 1:
    progress = restriction_tracker.current_progress()
    if not progress:
      # Assume no work has been completed for the current window if progress
      # is unavailable.
      from apache_beam.io.iobase import RestrictionProgress
      progress = RestrictionProgress(completed=0, remaining=1)

    scaled_progress = PerWindowInvoker._scale_progress(
        progress, window_index, stop_window_index)
    # Compute the fraction of the remainder relative to the scaled progress.
    # If the value is greater than or equal to progress.remaining_work then we
    # should split at the closest window boundary.
    fraction_of_remainder = scaled_progress.remaining_work * fraction
    if fraction_of_remainder >= progress.remaining_work:
      # The fraction is outside of the current window and hence we will
      # split at the closest window boundary. Favor a split and return the
      # last window if we would have rounded up to the end of the window
      # based upon the fraction.
      total_work = progress.total_work
      if total_work:
        # Distance of the requested split point from the start of the
        # current window, measured in whole windows (one window's worth of
        # work is total_work), rounded to the nearest boundary.
        windows_from_current = int(
            round((progress.completed_work + fraction_of_remainder) /
                  total_work))
      else:
        # A tracker reporting no work at all gives no basis for picking a
        # boundary; split right after the current window instead of dividing
        # by zero.
        windows_from_current = 1
      new_stop_window_index = min(
          stop_window_index - 1, window_index + max(1, windows_from_current))
      primary, residual = compute_whole_window_split(
          new_stop_window_index, new_stop_window_index)
      assert primary is not None
      assert residual is not None
      return ([primary], [residual], new_stop_window_index)

    # The fraction is within the current window being processed so compute
    # the updated fraction based upon the number of windows being processed.
    # Attempt to split below, if we can't then we'll compute a split
    # using only window boundaries.
    new_stop_window_index = window_index + 1
    fraction = fraction_of_remainder / progress.remaining_work
  else:
    # We aren't splitting within multiple windows so we don't change our
    # stop index.
    new_stop_window_index = stop_window_index

  # Temporary workaround for [BEAM-7473]: get current_watermark before
  # split, in case watermark gets advanced before getting split results.
  # In worst case, current_watermark is always stale, which is ok.
  current_watermark = watermark_estimator.current_watermark()
  current_estimator_state = watermark_estimator.get_estimator_state()
  split = restriction_tracker.try_split(fraction)
  if split:
    primary, residual = split
    primary_size = restriction_provider.restriction_size(element, primary)
    residual_size = restriction_provider.restriction_size(element, residual)
    # We use the watermark estimator state for the original process call
    # for the primary and the updated watermark estimator state for the
    # residual for the split.
    primary_split_value = ((element, (primary, watermark_estimator_state)),
                           primary_size)
    residual_split_value = ((element, (residual, current_estimator_state)),
                            residual_size)
    # A window observing DoFn only splits the window currently in progress;
    # the other windows are handled by the whole-window split below.
    windows = (
        windowed_value.windows[window_index],
    ) if window_observing else windowed_value.windows
    primary_restrictions.append(
        SplitResultPrimary(
            primary_value=WindowedValue(
                primary_split_value, windowed_value.timestamp, windows)))
    residual_restrictions.append(
        SplitResultResidual(
            residual_value=WindowedValue(
                residual_split_value, windowed_value.timestamp, windows),
            current_watermark=current_watermark,
            deferred_timestamp=None))

    if window_observing:
      assert new_stop_window_index == window_index + 1
      # Fully processed windows (before window_index) join the primaries and
      # fully unprocessed windows (after window_index) join the residuals.
      primary, residual = compute_whole_window_split(
          window_index, window_index + 1)
      if primary is not None:
        primary_restrictions.append(primary)
      if residual is not None:
        residual_restrictions.append(residual)
    return (primary_restrictions, residual_restrictions, new_stop_window_index)

  if (new_stop_window_index is not None and
      new_stop_window_index != stop_window_index):
    # If we failed to split but have a new stop index then return a split
    # at the window boundary.
    primary, residual = compute_whole_window_split(
        new_stop_window_index, new_stop_window_index)
    assert primary is not None
    assert residual is not None
    return ([primary], [residual], new_stop_window_index)

  return None