in runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java [1248:1561]
private void process(
final SdkWorkerHarness worker,
final ComputationState computationState,
final Instant inputDataWatermark,
final @Nullable Instant outputDataWatermark,
final @Nullable Instant synchronizedProcessingTime,
final Work work) {
final Windmill.WorkItem workItem = work.getWorkItem();
final String computationId = computationState.getComputationId();
final ByteString key = workItem.getKey();
work.setState(State.PROCESSING);
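// Tag the per-thread logging context with a work id of the form
// "<shardingKey hex>-<workToken hex>" so log lines can be correlated with this work item.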
{
StringBuilder workIdBuilder = new StringBuilder(33);
workIdBuilder.append(Long.toHexString(workItem.getShardingKey()));
workIdBuilder.append('-');
workIdBuilder.append(Long.toHexString(workItem.getWorkToken()));
DataflowWorkerLoggingMDC.setWorkId(workIdBuilder.toString());
}
DataflowWorkerLoggingMDC.setStageName(computationId);
LOG.debug("Starting processing for {}:\n{}", computationId, work);
Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem);
// Before any processing starts, call any pending OnCommit callbacks. Nothing that requires
// cleanup should be done before this, since we might exit early here.
callFinalizeCallbacks(workItem);
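// If this work item exists only to finalize source state, acknowledge the finalize and
// queue a commit immediately without running the map task.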
if (workItem.getSourceState().getOnlyFinalize()) {
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
work.setState(State.COMMIT_QUEUED);
commitQueue.put(new Commit(outputBuilder.build(), computationState, work));
return;
}
long processingStartTimeNanos = System.nanoTime();
final MapTask mapTask = computationState.getMapTask();
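// Look up (or lazily create) the per-stage container for counters, metrics, and execution
// state used to report progress for this map task's stage.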
StageInfo stageInfo =
stageInfoMap.computeIfAbsent(
mapTask.getStageName(), s -> new StageInfo(s, mapTask.getSystemName(), this));
ExecutionState executionState = null;
try {
executionState = computationState.getExecutionStateQueue(worker).poll();
if (executionState == null) {
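// No cached ExecutionState is available for this stage, so build a new one from the
// MapTask graph: executor, execution context, key coder, and state tracker.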
MutableNetwork<Node, Edge> mapTaskNetwork = mapTaskToNetwork.apply(mapTask);
if (LOG.isDebugEnabled()) {
LOG.debug("Network as Graphviz .dot: {}", Networks.toDot(mapTaskNetwork));
}
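// Find the single Read instruction in the stage graph and its output node; the output's
// coder determines how elements (and, for KV reads, keys) are decoded.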
ParallelInstructionNode readNode =
(ParallelInstructionNode)
Iterables.find(
mapTaskNetwork.nodes(),
node ->
node instanceof ParallelInstructionNode
&& ((ParallelInstructionNode) node).getParallelInstruction().getRead()
!= null);
InstructionOutputNode readOutputNode =
(InstructionOutputNode) Iterables.getOnlyElement(mapTaskNetwork.successors(readNode));
DataflowExecutionContext.DataflowExecutionStateTracker executionStateTracker =
new DataflowExecutionContext.DataflowExecutionStateTracker(
ExecutionStateSampler.instance(),
stageInfo.executionStateRegistry.getState(
NameContext.forStage(mapTask.getStageName()),
"other",
null,
ScopedProfiler.INSTANCE.emptyScope()),
stageInfo.deltaCounters,
options,
computationId);
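// The streaming execution context gives user code access to the per-computation reader
// and state caches and the user-state name mapping.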
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
pendingDeltaCounters,
computationId,
readerCache,
!computationState.getTransformUserNameToStateFamily().isEmpty()
? computationState.getTransformUserNameToStateFamily()
: stateNameMap,
stateCache.forComputation(computationId),
stageInfo.metricsContainerRegistry,
executionStateTracker,
stageInfo.executionStateRegistry,
maxSinkBytes);
DataflowMapTaskExecutor mapTaskExecutor =
mapTaskExecutorFactory.create(
worker.getControlClientHandler(),
worker.getGrpcDataFnServer(),
sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
worker.getGrpcStateFnServer(),
mapTaskNetwork,
options,
mapTask.getStageName(),
readerRegistry,
sinkRegistry,
context,
pendingDeltaCounters,
idGenerator);
ReadOperation readOperation = mapTaskExecutor.getReadOperation();
// Disable progress updates since the results are unused for streaming
// and enabling them would involve starting an extra thread.
readOperation.setProgressUpdatePeriodMs(ReadOperation.DONT_UPDATE_PERIODICALLY);
Preconditions.checkState(
mapTaskExecutor.supportsRestart(),
"Streaming runner requires all operations support restart.");
Coder<?> readCoder =
CloudObjects.coderFromCloudObject(
CloudObject.fromSpec(readOutputNode.getInstructionOutput().getCodec()));
Coder<?> keyCoder = extractKeyCoder(readCoder);
// If using a custom source, count bytes read for autoscaling.
if (CustomSources.class
.getName()
.equals(
readNode.getParallelInstruction().getRead().getSource().getSpec().get("@type"))) {
NameContext nameContext =
NameContext.create(
mapTask.getStageName(),
readNode.getParallelInstruction().getOriginalName(),
readNode.getParallelInstruction().getSystemName(),
readNode.getParallelInstruction().getName());
readOperation.receivers[0].addOutputCounter(
new OutputObjectAndByteCounter(
new IntrinsicMapTaskExecutorFactory.ElementByteSizeObservableCoder<>(
readCoder),
mapTaskExecutor.getOutputCounters(),
nameContext)
.setSamplingPeriod(100)
.countBytes("dataflow_input_size-" + mapTask.getSystemName()));
}
executionState =
new ExecutionState(mapTaskExecutor, context, keyCoder, executionStateTracker);
}
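// State reads for this work item are scoped to its key, sharding key, and work token.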
WindmillStateReader stateReader =
new WindmillStateReader(
metricTrackingWindmillServer,
computationId,
key,
workItem.getShardingKey(),
workItem.getWorkToken());
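// Use a byte-tracking view of the state fetcher so the bytes it reads can be added to
// the state-read metrics reported below.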
StateFetcher localStateFetcher = stateFetcher.byteTrackingView();
// If the read outputs KVs, we can decode Windmill's byte key into a userland
// key object and provide it to the execution context for use with per-key state.
// Otherwise, we pass null.
//
// The coder that will be present in that case is:
// WindowedValueCoder(TimerOrElementCoder(KvCoder))
@Nullable Coder<?> keyCoder = executionState.getKeyCoder();
@Nullable
Object executionKey =
keyCoder == null ? null : keyCoder.decode(key.newInput(), Coder.Context.OUTER);
if (workItem.hasHotKeyInfo()) {
Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 1000);
// The MapTask instructions are ordered by dependency, so the first instruction is
// always the shuffle task.
String stepName = computationState.getMapTask().getInstructions().get(0).getName();
if (options.isHotKeyLoggingEnabled() && keyCoder != null) {
hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
} else {
hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
}
}
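// Bind this work item (key, watermarks, state access, and output builder) to the
// execution context before running the map task.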
executionState
.getContext()
.start(
executionKey,
workItem,
inputDataWatermark,
outputDataWatermark,
synchronizedProcessingTime,
stateReader,
localStateFetcher,
outputBuilder);
// Blocks while executing work.
executionState.getWorkExecutor().execute();
Iterables.addAll(
this.pendingMonitoringInfos, executionState.getWorkExecutor().extractMetricUpdates());
commitCallbacks.putAll(executionState.getContext().flushState());
// Release the execution state for another thread to use.
computationState.getExecutionStateQueue(worker).offer(executionState);
executionState = null;
// Add the output to the commit queue.
work.setState(State.COMMIT_QUEUED);
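// Serialize the commit and enforce the per-work-item commit size limit before queueing it.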
WorkItemCommitRequest commitRequest = outputBuilder.build();
int byteLimit = maxWorkItemCommitBytes;
int commitSize = commitRequest.getSerializedSize();
int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
// Detect overflow of the integer serialized size (reported as a negative value) or a
// commit that exceeds the byte limit.
windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
if (commitSize < 0 || commitSize > byteLimit) {
KeyCommitTooLargeException e =
KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
reportFailure(computationId, workItem, e);
LOG.error(e.toString());
// Drop the current request in favor of a new, minimal one requesting truncation.
// Messages, timers, counters, and other commit content will not be used by the service,
// so we purposefully drop them here.
commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
}
commitQueue.put(new Commit(commitRequest, computationState, work));
// Compute shuffle and state byte statistics; these will be flushed asynchronously.
long stateBytesWritten = outputBuilder.clearOutputMessages().build().getSerializedSize();
long shuffleBytesRead = 0;
for (Windmill.InputMessageBundle bundle : workItem.getMessageBundlesList()) {
for (Windmill.Message message : bundle.getMessagesList()) {
shuffleBytesRead += message.getSerializedSize();
}
}
long stateBytesRead = stateReader.getBytesRead() + localStateFetcher.getBytesRead();
windmillShuffleBytesRead.addValue(shuffleBytesRead);
windmillStateBytesRead.addValue(stateBytesRead);
windmillStateBytesWritten.addValue(stateBytesWritten);
LOG.debug("Processing done for work token: {}", workItem.getWorkToken());
} catch (Throwable t) {
if (executionState != null) {
try {
executionState.getContext().invalidateCache();
executionState.getWorkExecutor().close();
} catch (Exception e) {
LOG.warn("Failed to close map task executor: ", e);
} finally {
// Release references to potentially large objects early.
executionState = null;
}
}
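// Unwrap UserCodeException so the failure handling below sees the underlying cause.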
t = t instanceof UserCodeException ? t.getCause() : t;
boolean retryLocally = false;
if (KeyTokenInvalidException.isKeyTokenInvalidException(t)) {
LOG.debug(
"Execution of work for computation '{}' on key '{}' failed due to token expiration. "
+ "Work will not be retried locally.",
computationId,
key.toStringUtf8());
} else {
LastExceptionDataProvider.reportException(t);
LOG.debug("Failed work: {}", work);
if (!reportFailure(computationId, workItem, t)) {
LOG.error(
"Execution of work for computation '{}' on key '{}' failed with uncaught exception, "
+ "and Windmill indicated not to retry locally.",
computationId,
key.toStringUtf8(),
t);
} else if (isOutOfMemoryError(t)) {
File heapDump = memoryMonitor.tryToDumpHeap();
LOG.error(
"Execution of work for computation '{}' for key '{}' failed with out-of-memory. "
+ "Work will not be retried locally. Heap dump {}.",
computationId,
key.toStringUtf8(),
heapDump == null ? "not written" : ("written to '" + heapDump + "'"),
t);
} else {
LOG.error(
"Execution of work for computation '{}' on key '{}' failed with uncaught exception. "
+ "Work will be retried locally.",
computationId,
key.toStringUtf8(),
t);
retryLocally = true;
}
}
if (retryLocally) {
// Try again after some delay and at the end of the queue to avoid a tight loop.
sleep(retryLocallyDelayMs);
workUnitExecutor.forceExecute(work);
} else {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
computationState.completeWork(
ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
}
} finally {
// Update total processing time counters. Updating in the finally clause ensures that
// work items which throw exceptions are also accounted for in the time spent.
long processingTimeMsecs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - processingStartTimeNanos);
stageInfo.totalProcessingMsecs.addValue(processingTimeMsecs);
// Attribute all of the processing time to timers if the work item contains any timers.
// Tests show that work items rarely contain both timers and message bundles, so this
// should be a fairly close approximation.
// Another option: derive the time split between messages and timers based on recent
// totals, either here or in the DFE.
if (work.getWorkItem().hasTimers()) {
stageInfo.timerProcessingMsecs.addValue(processingTimeMsecs);
}
DataflowWorkerLoggingMDC.setWorkId(null);
DataflowWorkerLoggingMDC.setStageName(null);
}
}