in runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java [181:338]
public void process(ProcessContext c, BoundedWindow w) {
WatermarkEstimatorStateT initialWatermarkEstimatorState =
(WatermarkEstimatorStateT)
invoker.invokeGetInitialWatermarkEstimatorState(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return c.element().getKey();
}
@Override
public Object restriction() {
return c.element().getValue();
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return w;
}
@Override
public String getErrorContext() {
return NaiveProcessFn.class.getSimpleName()
+ ".invokeGetInitialWatermarkEstimatorState";
}
});
RestrictionT restriction = c.element().getValue();
WatermarkEstimatorStateT watermarkEstimatorState = initialWatermarkEstimatorState;
while (true) {
RestrictionT currentRestriction = restriction;
WatermarkEstimatorStateT currentWatermarkEstimatorState = watermarkEstimatorState;
RestrictionTracker<RestrictionT, PositionT> tracker =
RestrictionTrackers.observe(
invoker.invokeNewTracker(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return c.element().getKey();
}
@Override
public RestrictionT restriction() {
return currentRestriction;
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return w;
}
@Override
public String getErrorContext() {
return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
}
}),
new ClaimObserver<PositionT>() {
@Override
public void onClaimed(PositionT position) {}
@Override
public void onClaimFailed(PositionT position) {}
});
WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
invoker.invokeNewWatermarkEstimator(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return c.element().getKey();
}
@Override
public RestrictionT restriction() {
return currentRestriction;
}
@Override
public WatermarkEstimatorStateT watermarkEstimatorState() {
return currentWatermarkEstimatorState;
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return w;
}
@Override
public String getErrorContext() {
return NaiveProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator";
}
});
ProcessContinuation continuation =
invoker.invokeProcessElement(
new NestedProcessContext<>(
fn, c, c.element().getKey(), w, tracker, watermarkEstimator));
if (continuation.shouldResume()) {
// Fetch the watermark before splitting to ensure that the watermark applies to both
// the primary and the residual.
watermarkEstimatorState = watermarkEstimator.getState();
SplitResult<RestrictionT> split = tracker.trySplit(0);
if (split == null) {
break;
}
restriction = split.getResidual();
Uninterruptibles.sleepUninterruptibly(
continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
} else {
break;
}
}
}