in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext.java [206:268]
/**
 * Flushes the state of every step context and, when an unbounded reader is active, records that
 * reader's checkpoint, watermark and backlog in the output and stores the reader in the reader
 * cache.
 *
 * <p>For the active reader this method:
 * <ul>
 *   <li>adds a randomly chosen finalize id to the source state update, and maps that id to a
 *       callback that calls {@code finalizeCheckpoint()} on the checkpoint mark taken here;
 *   <li>encodes the checkpoint mark into the source state with the source's checkpoint mark
 *       coder, if that coder is non-null;
 *   <li>sets the source watermark, converted from milliseconds to microseconds;
 *   <li>adds a cumulative SUM counter named {@code dataflow_backlog_size-<stageName>} when a
 *       backlog size is known. The total backlog is queried only when the split backlog is
 *       unknown and this is the first unbounded source split;
 *   <li>puts the reader into {@code readerCache} under the serialized key, paired with the
 *       work's cache token.
 * </ul>
 *
 * @return a map from finalize id to the callback registered for that id; empty when there is no
 *     active reader
 * @throws RuntimeException wrapping the {@link IOException} if encoding the checkpoint mark fails
 */
public Map<Long, Runnable> flushState() {
  Map<Long, Runnable> finalizeCallbacks = new HashMap<>();

  for (ExecutionContext.StepContext step : getAllStepContexts()) {
    StepContext streamingStep = (StepContext) step;
    streamingStep.flushState();
  }

  // Without an active reader there is no source state, watermark or backlog to report.
  if (activeReader == null) {
    return finalizeCallbacks;
  }

  Windmill.SourceState.Builder sourceState = outputBuilder.getSourceStateUpdatesBuilder();
  final UnboundedSource.CheckpointMark mark = activeReader.getCheckpointMark();
  final Instant readerWatermark = activeReader.getWatermark();

  // The same id goes into the source state update and keys the returned callback map, so the
  // caller can look up the finalizer for this checkpoint by id.
  long finalizeId = ThreadLocalRandom.current().nextLong();
  sourceState.addFinalizeIds(finalizeId);
  Runnable finalizer =
      new Runnable() {
        @Override
        public void run() {
          try {
            mark.finalizeCheckpoint();
          } catch (IOException e) {
            throw new RuntimeException("Exception while finalizing checkpoint", e);
          }
        }
      };
  finalizeCallbacks.put(finalizeId, finalizer);

  // The checkpoint mark comes from this reader, so it is treated as the current source's
  // checkpoint mark type.
  UnboundedSource<?, UnboundedSource.CheckpointMark> source =
      (UnboundedSource<?, UnboundedSource.CheckpointMark>) activeReader.getCurrentSource();
  Coder<UnboundedSource.CheckpointMark> markCoder = source.getCheckpointMarkCoder();
  if (markCoder != null) {
    ByteString.Output encoded = ByteString.newOutput();
    try {
      markCoder.encode(mark, encoded, Coder.Context.OUTER);
    } catch (IOException e) {
      throw new RuntimeException("Exception while encoding checkpoint", e);
    }
    sourceState.setState(encoded.toByteString());
  }

  long watermarkMicros = TimeUnit.MILLISECONDS.toMicros(readerWatermark.getMillis());
  outputBuilder.setSourceWatermark(watermarkMicros);

  long splitBacklogBytes = activeReader.getSplitBacklogBytes();
  boolean splitBacklogUnknown =
      splitBacklogBytes == UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
  // Fall back to the total backlog only on the first split. Presumably this keeps the SUM
  // counter from counting the total once per split (NOTE(review): confirm intent).
  long backlogBytes =
      splitBacklogUnknown && CustomSources.isFirstUnboundedSourceSplit(getSerializedKey())
          ? activeReader.getTotalBacklogBytes()
          : splitBacklogBytes;
  if (backlogBytes != UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN) {
    Windmill.Counter backlogCounter =
        Windmill.Counter.newBuilder()
            .setName("dataflow_backlog_size-" + stageName)
            .setKind(Windmill.Counter.Kind.SUM)
            .setIntScalar(backlogBytes)
            .setCumulative(true)
            .build();
    outputBuilder.addCounterUpdates(backlogCounter);
  }

  readerCache.put(
      getSerializedKey(), new ReaderCacheEntry(activeReader, getWork().getCacheToken()));

  return finalizeCallbacks;
}