private void dispatchLoop()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java [393:438]


  /**
   * Pulls work via {@code getWork()} and hands each received work item to
   * {@code workUnitExecutor}, for as long as {@code running} is set.
   *
   * <p>Each pass first calls {@code memoryMonitor.waitForResources("GetWork")}, then polls
   * {@code getWork()} until a response reporting at least one work entry arrives. Between
   * empty polls it calls {@code sleep} with a delay that starts at 1 and doubles up to a cap
   * of 1000 (presumably milliseconds; confirm against {@code sleep}).
   */
  private void dispatchLoop() {
    LOG.info("Dispatch starting");
    while (running.get()) {
      memoryMonitor.waitForResources("GetWork");

      // Poll until some work shows up or the worker is asked to stop.
      Windmill.GetWorkResponse response = getWork();
      int pollDelay = 1;
      while (response.getWorkCount() <= 0) {
        sleep(pollDelay);
        pollDelay = Math.min(1000, pollDelay * 2);
        if (!running.get()) {
          // Stopped while polling: fall through with the last response, which reported no work.
          break;
        }
        response = getWork();
      }

      for (final Windmill.ComputationWorkItems computationItems : response.getWorkList()) {
        final String computationId = computationItems.getComputationId();

        // Unknown computation: ask for its config, then look it up again below.
        if (!instructionMap.containsKey(computationId)) {
          getConfig(computationId);
        }
        final MapTask task = instructionMap.get(computationId);
        if (task == null) {
          // Still unknown after the config request; skip this computation's work.
          LOG.warn(
              "Received work for unknown computation: {}. Known computations are {}",
              computationId, instructionMap.keySet());
          continue;
        }

        // The raw watermark is divided by 1000 before being wrapped in an Instant
        // (presumably microseconds to milliseconds; confirm against the Windmill API).
        final long rawWatermark = computationItems.getInputDataWatermark();
        final Instant watermark = new Instant(rawWatermark / 1000);
        final ActiveWorkForComputation activeForComputation = activeWorkMap.get(computationId);

        for (final Windmill.WorkItem item : computationItems.getWorkList()) {
          final Work unit = new Work(item.getWorkToken()) {
            @Override
            public void run() {
              process(computationId, task, watermark, item);
            }
          };
          // Only submit the unit when activateWork accepts it for this key.
          final boolean accepted = activeForComputation.activateWork(item.getKey(), unit);
          if (accepted) {
            workUnitExecutor.execute(unit);
          }
        }
      }
    }
    LOG.info("Dispatch done");
  }