in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java [393:438]
/**
 * Fetches work in a loop and submits each work item for execution until {@code running}
 * is cleared.
 *
 * <p>Each round first blocks on {@code memoryMonitor.waitForResources("GetWork")}, then polls
 * {@code getWork()} with a doubling backoff until a response with at least one entry arrives
 * (or {@code running} turns false). Every work item of every known computation is wrapped in a
 * {@code Work} that calls {@code process(...)}, and is handed to {@code workUnitExecutor} only
 * if {@code activateWork} accepts it.
 */
private void dispatchLoop() {
  LOG.info("Dispatch starting");
  while (running.get()) {
    memoryMonitor.waitForResources("GetWork");

    // Poll until the response carries work. The delay passed to sleep() starts at 1 and doubles
    // after every empty response, capped at 1000 (units are whatever sleep() expects).
    int delay = 1;
    Windmill.GetWorkResponse response = getWork();
    while (response.getWorkCount() <= 0) {
      sleep(delay);
      delay = Math.min(1000, delay * 2);
      if (!running.get()) {
        // Shutting down: fall through with the (empty) response so nothing is dispatched.
        break;
      }
      response = getWork();
    }

    for (final Windmill.ComputationWorkItems batch : response.getWorkList()) {
      final String computation = batch.getComputationId();

      // Unknown computation: try to fetch its config once before looking it up.
      if (!instructionMap.containsKey(computation)) {
        getConfig(computation);
      }
      final MapTask mapTask = instructionMap.get(computation);
      if (mapTask == null) {
        // Still unknown after the config fetch; skip this computation's work items.
        LOG.warn(
            "Received work for unknown computation: {}. Known computations are {}",
            computation, instructionMap.keySet());
        continue;
      }

      // Presumably converts a microsecond watermark to the milliseconds Instant expects.
      // NOTE(review): integer division truncates toward zero, so a negative watermark rounds
      // up rather than down — confirm that is intended.
      final Instant inputDataWatermark = new Instant(batch.getInputDataWatermark() / 1000);

      // NOTE(review): no null check here; this relies on activeWorkMap having an entry for
      // every computation present in instructionMap — verify where the maps are populated.
      ActiveWorkForComputation tracker = activeWorkMap.get(computation);

      for (final Windmill.WorkItem item : batch.getWorkList()) {
        Work task = new Work(item.getWorkToken()) {
          @Override
          public void run() {
            process(computation, mapTask, inputDataWatermark, item);
          }
        };
        // Only submit the task when the tracker accepts it for this key; otherwise it is not
        // executed from here.
        if (!tracker.activateWork(item.getKey(), task)) {
          continue;
        }
        workUnitExecutor.execute(task);
      }
    }
  }
  LOG.info("Dispatch done");
}