private void process()

in runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java [1248:1561]


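  /**
   * Processes a single Windmill work item for the given computation: runs the stage's MapTask
   * over the item, enqueues the resulting commit, and on failure either retries locally or
   * abandons the work so Windmill can redeliver it.
   */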
  private void process(
      final SdkWorkerHarness worker,
      final ComputationState computationState,
      final Instant inputDataWatermark,
      final @Nullable Instant outputDataWatermark,
      final @Nullable Instant synchronizedProcessingTime,
      final Work work) {
    final Windmill.WorkItem workItem = work.getWorkItem();
    final String computationId = computationState.getComputationId();
    final ByteString key = workItem.getKey();
    work.setState(State.PROCESSING);
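    // Tag log output for this work item with "<shardingKey>-<workToken>" (both in hex) via the
    // logging MDC; the id is cleared again in the finally block below.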
    {
      StringBuilder workIdBuilder = new StringBuilder(33);
      workIdBuilder.append(Long.toHexString(workItem.getShardingKey()));
      workIdBuilder.append('-');
      workIdBuilder.append(Long.toHexString(workItem.getWorkToken()));
      DataflowWorkerLoggingMDC.setWorkId(workIdBuilder.toString());
    }

    DataflowWorkerLoggingMDC.setStageName(computationId);
    LOG.debug("Starting processing for {}:\n{}", computationId, work);

    Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem);

    // Before any processing starts, call any pending OnCommit callbacks.  Nothing that requires
    // cleanup should be done before this, since we might exit early here.
    callFinalizeCallbacks(workItem);
    if (workItem.getSourceState().getOnlyFinalize()) {
      outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
      work.setState(State.COMMIT_QUEUED);
      commitQueue.put(new Commit(outputBuilder.build(), computationState, work));
      return;
    }

    long processingStartTimeNanos = System.nanoTime();

    final MapTask mapTask = computationState.getMapTask();

    StageInfo stageInfo =
        stageInfoMap.computeIfAbsent(
            mapTask.getStageName(), s -> new StageInfo(s, mapTask.getSystemName(), this));

    ExecutionState executionState = null;

    try {
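      // Reuse a pooled ExecutionState for this computation and SDK harness if one is available;
      // otherwise build a fresh one from the MapTask network below.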
      executionState = computationState.getExecutionStateQueue(worker).poll();
      if (executionState == null) {
        MutableNetwork<Node, Edge> mapTaskNetwork = mapTaskToNetwork.apply(mapTask);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Network as Graphviz .dot: {}", Networks.toDot(mapTaskNetwork));
        }
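        // Locate the single Read instruction node and its only output node; the output node's
        // codec is used below to extract the key coder for this stage.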
        ParallelInstructionNode readNode =
            (ParallelInstructionNode)
                Iterables.find(
                    mapTaskNetwork.nodes(),
                    node ->
                        node instanceof ParallelInstructionNode
                            && ((ParallelInstructionNode) node).getParallelInstruction().getRead()
                                != null);
        InstructionOutputNode readOutputNode =
            (InstructionOutputNode) Iterables.getOnlyElement(mapTaskNetwork.successors(readNode));
        DataflowExecutionContext.DataflowExecutionStateTracker executionStateTracker =
            new DataflowExecutionContext.DataflowExecutionStateTracker(
                ExecutionStateSampler.instance(),
                stageInfo.executionStateRegistry.getState(
                    NameContext.forStage(mapTask.getStageName()),
                    "other",
                    null,
                    ScopedProfiler.INSTANCE.emptyScope()),
                stageInfo.deltaCounters,
                options,
                computationId);
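        // The streaming execution context wires the reader cache, state-family mapping, state
        // cache, and per-stage metrics/execution-state registries into this unit of work.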
        StreamingModeExecutionContext context =
            new StreamingModeExecutionContext(
                pendingDeltaCounters,
                computationId,
                readerCache,
                !computationState.getTransformUserNameToStateFamily().isEmpty()
                    ? computationState.getTransformUserNameToStateFamily()
                    : stateNameMap,
                stateCache.forComputation(computationId),
                stageInfo.metricsContainerRegistry,
                executionStateTracker,
                stageInfo.executionStateRegistry,
                maxSinkBytes);
        DataflowMapTaskExecutor mapTaskExecutor =
            mapTaskExecutorFactory.create(
                worker.getControlClientHandler(),
                worker.getGrpcDataFnServer(),
                sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
                worker.getGrpcStateFnServer(),
                mapTaskNetwork,
                options,
                mapTask.getStageName(),
                readerRegistry,
                sinkRegistry,
                context,
                pendingDeltaCounters,
                idGenerator);
        ReadOperation readOperation = mapTaskExecutor.getReadOperation();
        // Disable progress updates, since their results are unused for streaming
        // and computing them involves starting a thread.
        readOperation.setProgressUpdatePeriodMs(ReadOperation.DONT_UPDATE_PERIODICALLY);
        Preconditions.checkState(
            mapTaskExecutor.supportsRestart(),
            "Streaming runner requires all operations support restart.");

        Coder<?> readCoder =
            CloudObjects.coderFromCloudObject(
                CloudObject.fromSpec(readOutputNode.getInstructionOutput().getCodec()));
        Coder<?> keyCoder = extractKeyCoder(readCoder);

        // If using a custom source, count bytes read for autoscaling.
        if (CustomSources.class
            .getName()
            .equals(
                readNode.getParallelInstruction().getRead().getSource().getSpec().get("@type"))) {
          NameContext nameContext =
              NameContext.create(
                  mapTask.getStageName(),
                  readNode.getParallelInstruction().getOriginalName(),
                  readNode.getParallelInstruction().getSystemName(),
                  readNode.getParallelInstruction().getName());
          readOperation.receivers[0].addOutputCounter(
              new OutputObjectAndByteCounter(
                      new IntrinsicMapTaskExecutorFactory.ElementByteSizeObservableCoder<>(
                          readCoder),
                      mapTaskExecutor.getOutputCounters(),
                      nameContext)
                  .setSamplingPeriod(100)
                  .countBytes("dataflow_input_size-" + mapTask.getSystemName()));
        }
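        // Bundle the executor, context, key coder, and state tracker so the whole set can be
        // returned to the computation's ExecutionState pool once processing completes.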
        executionState =
            new ExecutionState(mapTaskExecutor, context, keyCoder, executionStateTracker);
      }

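      // Per-work-item views of Windmill state: a dedicated state reader plus a byte-tracking
      // view of the state fetcher, so bytes read can be attributed to this work item below.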
      WindmillStateReader stateReader =
          new WindmillStateReader(
              metricTrackingWindmillServer,
              computationId,
              key,
              workItem.getShardingKey(),
              workItem.getWorkToken());
      StateFetcher localStateFetcher = stateFetcher.byteTrackingView();

      // If the read outputs KVs, then we can decode Windmill's byte key into a userland
      // key object and provide it to the execution context for use with per-key state.
      // Otherwise, we pass null.
      //
      // The coder type that will be present is:
      //     WindowedValueCoder(TimerOrElementCoder(KvCoder))
      @Nullable Coder<?> keyCoder = executionState.getKeyCoder();
      @Nullable
      Object executionKey =
          keyCoder == null ? null : keyCoder.decode(key.newInput(), Coder.Context.OUTER);

      if (workItem.hasHotKeyInfo()) {
        Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
        Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 1000);

        // The MapTask instructions are ordered by dependencies, such that the first element is
        // always the shuffle task.
        String stepName = computationState.getMapTask().getInstructions().get(0).getName();
        if (options.isHotKeyLoggingEnabled() && keyCoder != null) {
          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
        } else {
          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
        }
      }

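      // Attach this work item's key, watermarks, state access, and output builder to the
      // execution context before running the executor.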
      executionState
          .getContext()
          .start(
              executionKey,
              workItem,
              inputDataWatermark,
              outputDataWatermark,
              synchronizedProcessingTime,
              stateReader,
              localStateFetcher,
              outputBuilder);

      // Blocks while executing work.
      executionState.getWorkExecutor().execute();

      Iterables.addAll(
          this.pendingMonitoringInfos, executionState.getWorkExecutor().extractMetricUpdates());

      commitCallbacks.putAll(executionState.getContext().flushState());

      // Release the execution state for another thread to use.
      computationState.getExecutionStateQueue(worker).offer(executionState);
      executionState = null;

      // Add the output to the commit queue.
      work.setState(State.COMMIT_QUEUED);

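      // Build the commit proto and check its serialized size against the per-work-item limit.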
      WorkItemCommitRequest commitRequest = outputBuilder.build();
      int byteLimit = maxWorkItemCommitBytes;
      int commitSize = commitRequest.getSerializedSize();
      int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;

      // Detect overflow of the serialized size integer, or whether the byte limit was exceeded.
      windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
      if (commitSize < 0 || commitSize > byteLimit) {
        KeyCommitTooLargeException e =
            KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
        reportFailure(computationId, workItem, e);
        LOG.error(e.toString());

        // Drop the current request in favor of a new, minimal one requesting truncation.
        // Messages, timers, counters, and other commit content will not be used by the service,
        // so we purposefully drop them here.
        commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
      }

      commitQueue.put(new Commit(commitRequest, computationState, work));

      // Compute shuffle and state byte statistics; these will be flushed asynchronously.
      long stateBytesWritten = outputBuilder.clearOutputMessages().build().getSerializedSize();
      long shuffleBytesRead = 0;
      for (Windmill.InputMessageBundle bundle : workItem.getMessageBundlesList()) {
        for (Windmill.Message message : bundle.getMessagesList()) {
          shuffleBytesRead += message.getSerializedSize();
        }
      }
      long stateBytesRead = stateReader.getBytesRead() + localStateFetcher.getBytesRead();
      windmillShuffleBytesRead.addValue(shuffleBytesRead);
      windmillStateBytesRead.addValue(stateBytesRead);
      windmillStateBytesWritten.addValue(stateBytesWritten);

      LOG.debug("Processing done for work token: {}", workItem.getWorkToken());
    } catch (Throwable t) {
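      // The failed ExecutionState is not returned to the pool: invalidate its cached state and
      // close the executor, then drop the reference.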
      if (executionState != null) {
        try {
          executionState.getContext().invalidateCache();
          executionState.getWorkExecutor().close();
        } catch (Exception e) {
          LOG.warn("Failed to close map task executor: ", e);
        } finally {
          // Release references to potentially large objects early.
          executionState = null;
        }
      }

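      // Unwrap user-code exceptions so the failure handling below sees the underlying cause.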
      t = t instanceof UserCodeException ? t.getCause() : t;

      boolean retryLocally = false;
      if (KeyTokenInvalidException.isKeyTokenInvalidException(t)) {
        LOG.debug(
            "Execution of work for computation '{}' on key '{}' failed due to token expiration. "
                + "Work will not be retried locally.",
            computationId,
            key.toStringUtf8());
      } else {
        LastExceptionDataProvider.reportException(t);
        LOG.debug("Failed work: {}", work);
        if (!reportFailure(computationId, workItem, t)) {
          LOG.error(
              "Execution of work for computation '{}' on key '{}' failed with uncaught exception, "
                  + "and Windmill indicated not to retry locally.",
              computationId,
              key.toStringUtf8(),
              t);
        } else if (isOutOfMemoryError(t)) {
          File heapDump = memoryMonitor.tryToDumpHeap();
          LOG.error(
              "Execution of work for computation '{}' for key '{}' failed with out-of-memory. "
                  + "Work will not be retried locally. Heap dump {}.",
              computationId,
              key.toStringUtf8(),
              heapDump == null ? "not written" : ("written to '" + heapDump + "'"),
              t);
        } else {
          LOG.error(
              "Execution of work for computation '{}' on key '{}' failed with uncaught exception. "
                  + "Work will be retried locally.",
              computationId,
              key.toStringUtf8(),
              t);
          retryLocally = true;
        }
      }
      if (retryLocally) {
        // Try again after some delay and at the end of the queue to avoid a tight loop.
        sleep(retryLocallyDelayMs);
        workUnitExecutor.forceExecute(work);
      } else {
        // Consider the item invalid. It will eventually be retried by Windmill if it still needs to
        // be processed.
        computationState.completeWork(
            ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
      }
    } finally {
      // Update total processing time counters. Updating in the finally clause ensures that
      // work items that throw exceptions are also accounted for in the time spent.
      long processingTimeMsecs =
          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - processingStartTimeNanos);
      stageInfo.totalProcessingMsecs.addValue(processingTimeMsecs);

      // Attribute all the processing to timers if the work item contains any timers.
      // Tests show that work items rarely contain both timers and message bundles. It should
      // be a fairly close approximation.
      // Another option: derive the time split between messages and timers based on recent
      // totals, either here or in the DFE.
      if (work.getWorkItem().hasTimers()) {
        stageInfo.timerProcessingMsecs.addValue(processingTimeMsecs);
      }

      DataflowWorkerLoggingMDC.setWorkId(null);
      DataflowWorkerLoggingMDC.setStageName(null);
    }
  }