in runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java [350:549]
/**
 * Runs one processing step for a single element/restriction pair and either finishes it or
 * checkpoints it for a later resumption.
 *
 * <p>The incoming work item ({@code c.element()}) is inspected for a timer. When it carries no
 * timer this is the "seed" call: the element and restriction are taken from the work item itself
 * and the element is written to state. When it carries a timer, the element, restriction and
 * watermark estimator state are read back from state under the timer's namespace.
 *
 * <p>After the user's process-element logic has been invoked, one of two things happens:
 *
 * <ul>
 *   <li>No residual restriction was returned: the element, restriction, watermark estimator
 *       state and watermark hold cells are all cleared and the method returns.
 *   <li>A residual restriction was returned: the residual and the future watermark estimator
 *       state are written to state, a watermark hold is added, and a processing-time timer is
 *       set in the same state namespace so that this method is called again.
 * </ul>
 *
 * @param c the process context; its element supplies the key, the timers and the elements of the
 *     work item, and it also supplies the timestamp, pane and pipeline options handed to the
 *     invoker argument providers
 * @param boundedWindow the window returned by {@code window()} of every argument provider built
 *     in this method
 */
public void processElement(final ProcessContext c, BoundedWindow boundedWindow) {
// State and timers are looked up per key of the incoming work item.
byte[] key = c.element().key();
StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
// Initialize state (element and restriction) depending on whether this is the seed call.
// The seed call is the first call for this element, which actually has the element.
// Subsequent calls are timer firings and the element has to be retrieved from the state.
// getOnlyElement(..., null) yields null when the work item has no timers at all.
TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
boolean isSeedCall = (timer == null);
// Namespace under which all four state cells below are addressed. The timer set at the end of
// this method uses the same namespace, which is how a timer firing finds the state again.
StateNamespace stateNamespace;
if (isSeedCall) {
// Seed call: derive the namespace from the element's window. getOnlyElement is used for both
// the element and its window, so exactly one of each is expected here.
WindowedValue<KV<InputT, RestrictionT>> windowedValue =
Iterables.getOnlyElement(c.element().elementsIterable());
BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
stateNamespace =
StateNamespaces.window(
// Unchecked cast of the windowing strategy's window coder to Coder<BoundedWindow>.
(Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window);
} else {
// Timer firing: reuse the namespace recorded in the timer.
stateNamespace = timer.getNamespace();
}
// Resolve the state cells for this key and namespace: the element, the current restriction,
// the watermark estimator state, and the output watermark hold.
ValueState<WindowedValue<InputT>> elementState =
stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
stateInternals.state(stateNamespace, restrictionTag);
ValueState<WatermarkEstimatorStateT> watermarkEstimatorState =
stateInternals.state(stateNamespace, watermarkEstimatorStateTag);
WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag);
// Both locals are assigned exactly once on each branch below; the anonymous argument providers
// in this method capture them.
KV<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
WatermarkEstimatorStateT watermarkEstimatorStateT;
if (isSeedCall) {
// The incoming value is KV<element, restriction>: split it into the windowed element (KV key)
// and the restriction (KV value).
WindowedValue<KV<InputT, RestrictionT>> windowedValue =
Iterables.getOnlyElement(c.element().elementsIterable());
WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().getKey());
// Only the element is persisted at this point. The restriction and the watermark estimator
// state are written further down, and only if processing returns a residual restriction.
elementState.write(element);
elementAndRestriction = KV.of(element, windowedValue.getValue().getValue());
// Ask the invoker for the initial watermark estimator state for this element/restriction.
watermarkEstimatorStateT =
invoker.invokeGetInitialWatermarkEstimatorState(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getValue();
}
@Override
public Object restriction() {
return elementAndRestriction.getValue();
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return boundedWindow;
}
@Override
public String getErrorContext() {
return ProcessFn.class.getSimpleName()
+ ".invokeGetInitialWatermarkEstimatorState";
}
});
} else {
// This is not the first ProcessElement call for this element/restriction - rather,
// this is a timer firing, so we need to fetch the element and restriction from state.
// readLater() is called on all three cells before the first read()
// (presumably so the state backend can fetch them together - confirm against the state API).
elementState.readLater();
restrictionState.readLater();
watermarkEstimatorState.readLater();
elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
watermarkEstimatorStateT = watermarkEstimatorState.read();
}
// Build the watermark estimator from the (initial or restored) estimator state. This is the
// only argument provider in this method that overrides watermarkEstimatorState().
final WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
invoker.invokeNewWatermarkEstimator(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getValue();
}
@Override
public Object restriction() {
return elementAndRestriction.getValue();
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return boundedWindow;
}
@Override
public Object watermarkEstimatorState() {
return watermarkEstimatorStateT;
}
@Override
public String getErrorContext() {
return ProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator";
}
});
// Build the restriction tracker for the current restriction.
final RestrictionTracker<RestrictionT, PositionT> tracker =
invoker.invokeNewTracker(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getValue();
}
@Override
public Object restriction() {
return elementAndRestriction.getValue();
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return boundedWindow;
}
@Override
public String getErrorContext() {
return ProcessFn.class.getSimpleName() + ".invokeNewTracker";
}
});
// Run the process-element logic on the windowed element with the tracker and estimator. The
// result carries the residual restriction, the future estimator state, the future output
// watermark and the continuation, all of which are consumed below.
SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
.Result
result =
processElementInvoker.invokeProcessElement(
invoker, elementAndRestriction.getKey(), tracker, watermarkEstimator);
// Save state for resuming.
if (result.getResidualRestriction() == null) {
// All work for this element/restriction is completed. Clear state and release hold.
elementState.clear();
restrictionState.clear();
watermarkEstimatorState.clear();
holdState.clear();
return;
}
// There is remaining work: checkpoint the residual restriction and the estimator state so the
// timer-firing branch above can restore them.
restrictionState.write(result.getResidualRestriction());
watermarkEstimatorState.write(result.getFutureWatermarkEstimatorState());
@Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
if (futureOutputWatermark == null) {
// No future output watermark was reported: fall back to the element's own timestamp.
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
// Resume after the delay requested by the continuation, measured from the current processing
// time.
Instant wakeupTime =
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
// Add the watermark hold before the timer is set.
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
// wakeupTime is passed for both time arguments of TimerData.of (presumably the firing timestamp
// and the output timestamp - confirm against TimerData.of).
timerInternals.setTimer(
TimerInternals.TimerData.of(
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
}