in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java [164:220]
/**
 * Drains the reader: pulls every element from {@code reader.iterator()} and passes it to the
 * first receiver, calling {@code setProgressFromIterator()} before the first element, after the
 * last one, and in between whenever an update has been requested or
 * {@code progressUpdatePeriodMs} equals {@code UPDATE_ON_EACH_ITERATION}.
 *
 * <p>When {@code progressUpdatePeriodMs} is positive, a helper thread sets
 * {@code isProgressUpdateRequested} once per period while the loop runs; the helper is
 * interrupted and joined in a {@code finally} block once the loop ends or fails.
 *
 * <p>Returns immediately, without creating an iterator, when the first receiver is {@code null}.
 *
 * @throws Exception whatever the iterator, the receiver or the progress update throws; also an
 *     {@link InterruptedException} if this thread is interrupted while joining the helper thread
 */
protected void runReadLoop() throws Exception {
  Receiver sink = receivers[0];
  if (sink == null) {
    // Nothing consumes the output, so there is no point in reading.
    return;
  }

  try (StateSampler.ScopedState scope = stateSampler.scopedState(processState)) {
    assert scope != null;
    synchronized (initializationStateLock) {
      readerIterator = reader.iterator();
    }

    // TODO: Consider using the ExecutorService from PipelineOptions instead.
    Thread progressTicker = null;
    if (progressUpdatePeriodMs > 0) {
      progressTicker = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            // Raise the flag right away, then once more after every sleep period.
            while (true) {
              isProgressUpdateRequested.set(true);
              Thread.sleep(progressUpdatePeriodMs);
            }
          } catch (InterruptedException ignored) {
            // Interruption is the stop signal sent by the read loop below; just exit.
          }
        }
      });
      progressTicker.start();
    }

    try {
      // Progress is always refreshed once before the first element and once after the last.
      setProgressFromIterator();
      while (readerIterator.hasNext()) {
        Object element = readerIterator.next();
        // Always consume the request flag, even when every iteration triggers an update.
        boolean updateRequested = isProgressUpdateRequested.getAndSet(false);
        if (updateRequested || progressUpdatePeriodMs == UPDATE_ON_EACH_ITERATION) {
          setProgressFromIterator();
        }
        sink.process(element);
      }
      setProgressFromIterator();
    } finally {
      // Stop the helper thread on both the normal and the exceptional path.
      if (progressTicker != null) {
        progressTicker.interrupt();
        progressTicker.join();
      }
    }
  }
}