protected void runReadLoop()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java [164:220]


  protected void runReadLoop() throws Exception {
    Receiver receiver = receivers[0];
    if (receiver == null) {
      // No consumer of this data; don't do anything.
      return;
    }

    try (StateSampler.ScopedState process = stateSampler.scopedState(processState)) {
      assert process != null;
      synchronized (initializationStateLock) {
        readerIterator = reader.iterator();
      }

      // TODO: Consider using the ExecutorService from PipelineOptions instead.
      Thread updateRequester = null;
      if (progressUpdatePeriodMs > 0) {
        updateRequester = new Thread() {
          @Override
          public void run() {
            while (true) {
              isProgressUpdateRequested.set(true);
              try {
                Thread.sleep(progressUpdatePeriodMs);
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        };
        updateRequester.start();
      }

      try {
        // Force a progress update at the beginning and at the end.
        setProgressFromIterator();
        while (true) {
          Object value;
          if (!readerIterator.hasNext()) {
            break;
          }
          value = readerIterator.next();

          if (isProgressUpdateRequested.getAndSet(false) ||
              progressUpdatePeriodMs == UPDATE_ON_EACH_ITERATION) {
            setProgressFromIterator();
          }
          receiver.process(value);
        }
        setProgressFromIterator();
      } finally {
        if (updateRequester != null) {
          updateRequester.interrupt();
          updateRequester.join();
        }
      }
    }
  }