public Map<Long, Runnable> flushState()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext.java [206:268]


  public Map<Long, Runnable> flushState() {
    Map<Long, Runnable> callbacks = new HashMap<>();

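    // Flush the state accumulated by each step in this stage.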
    for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
      ((StepContext) stepContext).flushState();
    }


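    // If this work item read from an unbounded custom source, checkpoint the reader
    // and report its progress to Windmill.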
    if (activeReader != null) {
      Windmill.SourceState.Builder sourceStateBuilder =
          outputBuilder.getSourceStateUpdatesBuilder();
      final UnboundedSource.CheckpointMark checkpointMark = activeReader.getCheckpointMark();
      final Instant watermark = activeReader.getWatermark();
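      // Register a random finalize id in the source state update and map it to a
      // callback that finalizes the checkpoint mark.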
      long id = ThreadLocalRandom.current().nextLong();
      sourceStateBuilder.addFinalizeIds(id);
      callbacks.put(
          id,
          new Runnable() {
            @Override
            public void run() {
              try {
                checkpointMark.finalizeCheckpoint();
              } catch (IOException e) {
                throw new RuntimeException("Exception while finalizing checkpoint", e);
              }
            }
          });

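      // If the source provides a checkpoint coder, encode the checkpoint mark into
      // the source state update.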
      Coder<UnboundedSource.CheckpointMark> checkpointCoder =
          ((UnboundedSource<?, UnboundedSource.CheckpointMark>) activeReader.getCurrentSource())
              .getCheckpointMarkCoder();
      if (checkpointCoder != null) {
        ByteString.Output stream = ByteString.newOutput();
        try {
          checkpointCoder.encode(checkpointMark, stream, Coder.Context.OUTER);
        } catch (IOException e) {
          throw new RuntimeException("Exception while encoding checkpoint", e);
        }
        sourceStateBuilder.setState(stream.toByteString());
      }
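      // Report the reader's watermark, converted to microseconds, with this commit.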
      outputBuilder.setSourceWatermark(TimeUnit.MILLISECONDS.toMicros(watermark.getMillis()));

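      // Report the source backlog, if known, as a cumulative counter.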
      long backlogBytes = activeReader.getSplitBacklogBytes();
      if (backlogBytes == UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN
          && CustomSources.isFirstUnboundedSourceSplit(getSerializedKey())) {
        // Only call getTotalBacklogBytes() on the first split.
        backlogBytes = activeReader.getTotalBacklogBytes();
      }
      if (backlogBytes != UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN) {
        outputBuilder.addCounterUpdates(
            Windmill.Counter.newBuilder()
            .setName("dataflow_backlog_size-" + stageName)
            .setKind(Windmill.Counter.Kind.SUM)
            .setIntScalar(backlogBytes)
            .setCumulative(true)
            .build());
      }

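      // Cache the reader under the serialized key, tagged with the work item's cache
      // token, so later work items for the same key can reuse it.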
      readerCache.put(
          getSerializedKey(), new ReaderCacheEntry(activeReader, getWork().getCacheToken()));
    }
    return callbacks;
  }
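
The returned map is keyed by the finalize ids written into the commit's source state update. A caller is expected to hold these callbacks and run each one only after Windmill has durably committed the corresponding id; running a callback finalizes the matching UnboundedSource.CheckpointMark. Below is a minimal sketch of that bookkeeping, assuming the caller tracks callbacks by finalize id; the class and method names are hypothetical and not part of the SDK.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; names are illustrative, not Dataflow SDK API.
class CheckpointFinalizer {
  private final Map<Long, Runnable> outstanding = new ConcurrentHashMap<>();

  // Called after flushState(): hold each callback until its finalize id is acknowledged.
  void register(Map<Long, Runnable> callbacks) {
    outstanding.putAll(callbacks);
  }

  // Called when Windmill reports a set of finalize ids as committed.
  void finalizeAcknowledged(Iterable<Long> finalizeIds) {
    for (long id : finalizeIds) {
      Runnable callback = outstanding.remove(id);
      if (callback != null) {
        callback.run();  // Finalizes the corresponding CheckpointMark.
      }
    }
  }
}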