private void process()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java [450:628]


  private void process(
      final String computation,
      final MapTask mapTask,
      final Instant inputDataWatermark,
      final Windmill.WorkItem work) {
    LOG.debug("Starting processing for {}:\n{}", computation, work);

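    // All output for this work item (messages, timers, state updates, counters)
    // is accumulated into a single commit request, identified by the key and
    // work token.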
    Windmill.WorkItemCommitRequest.Builder outputBuilder =
        Windmill.WorkItemCommitRequest.newBuilder()
        .setKey(work.getKey())
        .setWorkToken(work.getWorkToken());

    StreamingModeExecutionContext context = null;
    MapTaskExecutor worker = null;

    try {
      DataflowWorkerLoggingMDC.setWorkId(
          work.getKey().toStringUtf8() + "-" + Long.toString(work.getWorkToken()));
      DataflowWorkerLoggingMDC.setStageName(computation);
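      // Reuse a pooled executor and context for this computation if one is
      // available; otherwise build a fresh pair below.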
      WorkerAndContext workerAndContext = mapTaskExecutors.get(computation).poll();
      if (workerAndContext == null) {
        CounterSet counters = new CounterSet();
        context = new StreamingModeExecutionContext(
            mapTask.getSystemName(), readerCache.get(computation), stateNameMap);
        StateSampler sampler =
            new StateSampler(mapTask.getStageName() + "-", counters.getAddCounterMutator());
        // State samplers are long-lived in streaming mode, so generate a unique
        // id to use as the item_id for the userCodeTimeTracker.
        int stateSamplerId = nextStateSamplerId.incrementAndGet();
        sampler.addSamplingCallback(
            new UserCodeTimeTracker.StateSamplerCallback(
                userCodeTimeTracker, stateSamplerId));
        // This "work" is never marked finished; it lives as long as the sampler.
        userCodeTimeTracker.workStarted(
            sampler.getPrefix(), stateSamplerId, counters.getAddCounterMutator());
        worker = MapTaskExecutorFactory.create(options, mapTask, context, counters, sampler);
        ReadOperation readOperation = worker.getReadOperation();
        // Disable progress updates, since their results are unused in streaming
        // mode and enabling them would start an extra thread.
        readOperation.setProgressUpdatePeriodMs(ReadOperation.DONT_UPDATE_PERIODICALLY);
        Preconditions.checkState(
            worker.supportsRestart(),
            "Streaming runner requires that all operations support restart.");

        // If using a custom source, count bytes read for autoscaling.
        ParallelInstruction read = mapTask.getInstructions().get(0);
        if (CustomSources.class.getName().equals(
                read.getRead().getSource().getSpec().get("@type"))) {
          readOperation.receivers[0].addOutputCounter(
              new OutputObjectAndByteCounter(
                  new MapTaskExecutorFactory.ElementByteSizeObservableCoder<>(
                      Serializer.deserialize(read.getOutputs().get(0).getCodec(), Coder.class)),
                  worker.getOutputCounters().getAddCounterMutator())
                  .setSamplingPeriod(100)
                  .countBytes("dataflow_input_size-" + mapTask.getSystemName()));
        }
      } else {
        worker = workerAndContext.getWorker();
        context = workerAndContext.getContext();
      }

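      // State reads for this work item are scoped to its key and work token.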
      WindmillStateReader stateReader = new WindmillStateReader(
          metricTrackingWindmillServer, computation, work.getKey(), work.getWorkToken());
      StateFetcher localStateFetcher = stateFetcher.byteTrackingView();
      context.start(work, inputDataWatermark, stateReader, localStateFetcher, outputBuilder);

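      // Asynchronously run any source checkpoint finalization callbacks that
      // Windmill has reported as ready since their commits were registered.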
      for (Long callbackId : context.getReadyCommitCallbackIds()) {
        final Runnable callback = commitCallbacks.remove(callbackId);
        if (callback != null) {
          workUnitExecutor.forceExecute(new Runnable() {
              @Override
              public void run() {
                try {
                  callback.run();
                } catch (Throwable t) {
                  LOG.error("Source checkpoint finalization failed:", t);
                }
              }
            });
        }
      }

      // Blocks while executing work.
      worker.execute();

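      // Flush any remaining state changes into the commit request and save the
      // returned finalization callbacks until Windmill reports them ready
      // (see getReadyCommitCallbackIds above).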
      commitCallbacks.putAll(context.flushState());

      // Compute shuffle and state byte statistics after the work is completely done, but before
      // counters are added to the outputBuilder.
      long shuffleBytesRead = 0;
      for (Windmill.InputMessageBundle bundle : work.getMessageBundlesList()) {
        for (Windmill.Message message : bundle.getMessagesList()) {
          shuffleBytesRead += message.getSerializedSize();
        }
      }
      long stateBytesRead = stateReader.getBytesRead() + localStateFetcher.getBytesRead();
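      // Measure state bytes written by serializing the pending commit with its
      // output messages cleared, so data outputs are not counted as state.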
      long stateBytesWritten = Windmill.WorkItemCommitRequest.newBuilder(outputBuilder.build())
                                   .clearOutputMessages()
                                   .build()
                                   .getSerializedSize();
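      // Report the shuffle and state byte totals as SUM counters alongside the
      // worker's output counters.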
      CounterSet counters = worker.getOutputCounters();
      counters
          .getAddCounterMutator()
          .addCounter(Counter.longs("WindmillShuffleBytesRead", Counter.AggregationKind.SUM))
          .addValue(shuffleBytesRead);
      counters
          .getAddCounterMutator()
          .addCounter(Counter.longs("WindmillStateBytesRead", Counter.AggregationKind.SUM))
          .addValue(stateBytesRead);
      counters
          .getAddCounterMutator()
          .addCounter(Counter.longs("WindmillStateBytesWritten", Counter.AggregationKind.SUM))
          .addValue(stateBytesWritten);

      buildCounters(counters, outputBuilder);

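      // Return the executor and context to the pool, and clear our references
      // so the catch block below does not close a recycled worker.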
      mapTaskExecutors.get(computation).offer(new WorkerAndContext(worker, context));
      worker = null;
      context = null;

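      // Queue the finished commit request; scheduleCommit() arranges for it to
      // be sent to Windmill.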
      Windmill.WorkItemCommitRequest output = outputBuilder.build();
      outputMap.get(computation).add(output);
      scheduleCommit();

      LOG.debug("Processing done for work token: {}", work.getWorkToken());
    } catch (Throwable t) {
      if (worker != null) {
        try {
          worker.close();
        } catch (Exception e) {
          LOG.warn("Failed to close worker: ", e);
        } finally {
          // Release references to potentially large objects early.
          worker = null;
          context = null;
        }
      }

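      // Unwrap UserCodeException so the root cause drives the handling below.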
      t = t instanceof UserCodeException ? t.getCause() : t;

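      // Out-of-memory errors are not retried locally: report the failure and
      // exit, on the expectation that the worker is restarted with a fresh heap.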
      if (isOutOfMemoryError(t)) {
        reportFailure(computation, work, t);
        LOG.error("Received OutOfMemoryError, crashing. Error was: ", t);
        System.exit(1);
      } else if (isKeyTokenInvalidException(t)) {
        LOG.debug(
            "Execution of work for {} for key {} failed due to token expiration; "
            + "will not retry locally.",
            computation, work.getKey().toStringUtf8());
        activeWorkMap.get(computation).completeWork(work.getKey());
      } else {
        LOG.error(
            "Execution of work for {} for key {} failed, retrying.",
            computation,
            work.getKey().toStringUtf8(),
            t);
        lastException.set(t);
        LOG.debug("Failed work: {}", work);
        if (reportFailure(computation, work, t)) {
          // Retry after a delay, and at the end of the queue, to avoid a tight loop.
          sleep(10000);
          workUnitExecutor.forceExecute(
              new Runnable() {
                @Override
                public void run() {
                  process(computation, mapTask, inputDataWatermark, work);
                }
              });
        } else {
          // If we failed to report the error, the work item is invalid and
          // should not be retried internally; it will be retried at a higher level.
          LOG.debug("Aborting processing due to exception reporting failure");
          activeWorkMap.get(computation).completeWork(work.getKey());
        }
      }
    } finally {
      DataflowWorkerLoggingMDC.setWorkId(null);
      DataflowWorkerLoggingMDC.setStageName(null);
    }
  }