in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java [450:628]
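/**
 * Processes a single Windmill work item for the given computation: executes the map task,
 * flushes produced outputs and state into a WorkItemCommitRequest, and queues the commit.
 * On failure the work is either retried locally or abandoned, depending on the exception.
 */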
private void process(
final String computation,
final MapTask mapTask,
final Instant inputDataWatermark,
final Windmill.WorkItem work) {
LOG.debug("Starting processing for {}:\n{}", computation, work);
Windmill.WorkItemCommitRequest.Builder outputBuilder =
Windmill.WorkItemCommitRequest.newBuilder()
.setKey(work.getKey())
.setWorkToken(work.getWorkToken());
StreamingModeExecutionContext context = null;
MapTaskExecutor worker = null;
try {
DataflowWorkerLoggingMDC.setWorkId(
work.getKey().toStringUtf8() + "-" + Long.toString(work.getWorkToken()));
DataflowWorkerLoggingMDC.setStageName(computation);
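// Reuse a pooled executor and execution context for this computation if one is available;
// otherwise build a new MapTaskExecutor below.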
WorkerAndContext workerAndContext = mapTaskExecutors.get(computation).poll();
if (workerAndContext == null) {
CounterSet counters = new CounterSet();
context = new StreamingModeExecutionContext(
mapTask.getSystemName(), readerCache.get(computation), stateNameMap);
StateSampler sampler =
new StateSampler(mapTask.getStageName() + "-", counters.getAddCounterMutator());
// In streaming mode, state samplers are long-lived, so a unique id is generated here to
// serve as the item id for the userCodeTimeTracker.
int stateSamplerId = nextStateSamplerId.incrementAndGet();
sampler.addSamplingCallback(
new UserCodeTimeTracker.StateSamplerCallback(
userCodeTimeTracker, stateSamplerId));
// "work" will never finish here.
userCodeTimeTracker.workStarted(
sampler.getPrefix(), stateSamplerId, counters.getAddCounterMutator());
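// Build the executor for this map task. After a successful run it is returned to the pool
// (see the offer() call below) and reused for subsequent work items on this computation.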
worker = MapTaskExecutorFactory.create(options, mapTask, context, counters, sampler);
ReadOperation readOperation = worker.getReadOperation();
// Disable progress updates: their results are unused in streaming mode, and periodic
// updates would require starting an extra thread.
readOperation.setProgressUpdatePeriodMs(ReadOperation.DONT_UPDATE_PERIODICALLY);
Preconditions.checkState(
worker.supportsRestart(), "Streaming runner requires all operations support restart.");
// If using a custom source, count bytes read for autoscaling.
ParallelInstruction read = mapTask.getInstructions().get(0);
if (CustomSources.class.getName().equals(
read.getRead().getSource().getSpec().get("@type"))) {
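// Sample element sizes rather than measuring every element, to keep byte counting cheap.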
readOperation.receivers[0].addOutputCounter(
new OutputObjectAndByteCounter(
new MapTaskExecutorFactory.ElementByteSizeObservableCoder<>(
Serializer.deserialize(read.getOutputs().get(0).getCodec(), Coder.class)),
worker.getOutputCounters().getAddCounterMutator())
.setSamplingPeriod(100)
.countBytes("dataflow_input_size-" + mapTask.getSystemName()));
}
} else {
worker = workerAndContext.getWorker();
context = workerAndContext.getContext();
}
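// Per-work-item reader for persistent state, scoped to this key and work token. Its byte
// count feeds the state statistics computed after execution.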
WindmillStateReader stateReader = new WindmillStateReader(
metricTrackingWindmillServer, computation, work.getKey(), work.getWorkToken());
StateFetcher localStateFetcher = stateFetcher.byteTrackingView();
context.start(work, inputDataWatermark, stateReader, localStateFetcher, outputBuilder);
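// Run any source checkpoint finalization callbacks that Windmill reports as ready; they
// are executed asynchronously on the work unit executor.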
for (Long callbackId : context.getReadyCommitCallbackIds()) {
final Runnable callback = commitCallbacks.remove(callbackId);
if (callback != null) {
workUnitExecutor.forceExecute(new Runnable() {
@Override
public void run() {
try {
callback.run();
} catch (Throwable t) {
LOG.error("Source checkpoint finalization failed:", t);
}
}
});
}
}
// Blocks while executing work.
worker.execute();
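// Flush buffered state writes into the commit request and remember the callbacks to invoke
// once Windmill later reports the corresponding checkpoints as finalized.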
commitCallbacks.putAll(context.flushState());
// Compute shuffle and state byte statistics after the work is completely done, but before
// counters are added to the outputBuilder.
long shuffleBytesRead = 0;
for (Windmill.InputMessageBundle bundle : work.getMessageBundlesList()) {
for (Windmill.Message message : bundle.getMessagesList()) {
shuffleBytesRead += message.getSerializedSize();
}
}
long stateBytesRead = stateReader.getBytesRead() + localStateFetcher.getBytesRead();
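// The serialized size of the commit request without its output messages approximates the
// bytes of state written back to Windmill.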
long stateBytesWritten = Windmill.WorkItemCommitRequest.newBuilder(outputBuilder.build())
.clearOutputMessages()
.build()
.getSerializedSize();
CounterSet counters = worker.getOutputCounters();
counters
.getAddCounterMutator()
.addCounter(Counter.longs("WindmillShuffleBytesRead", Counter.AggregationKind.SUM))
.addValue(shuffleBytesRead);
counters
.getAddCounterMutator()
.addCounter(Counter.longs("WindmillStateBytesRead", Counter.AggregationKind.SUM))
.addValue(stateBytesRead);
counters
.getAddCounterMutator()
.addCounter(Counter.longs("WindmillStateBytesWritten", Counter.AggregationKind.SUM))
.addValue(stateBytesWritten);
buildCounters(counters, outputBuilder);
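// Return the executor and context to the pool for reuse, and clear the local references so
// the catch block below does not close an executor that is back in the pool.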
mapTaskExecutors.get(computation).offer(new WorkerAndContext(worker, context));
worker = null;
context = null;
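// Queue the finished commit request for this computation; scheduleCommit() triggers the
// pending requests to be sent to Windmill asynchronously.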
Windmill.WorkItemCommitRequest output = outputBuilder.build();
outputMap.get(computation).add(output);
scheduleCommit();
LOG.debug("Processing done for work token: {}", work.getWorkToken());
} catch (Throwable t) {
if (worker != null) {
try {
worker.close();
} catch (Exception e) {
LOG.warn("Failed to close worker: ", e);
} finally {
// Release references to potentially large objects early.
worker = null;
context = null;
}
}
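// Unwrap UserCodeException so the checks below see the underlying cause.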
t = t instanceof UserCodeException ? t.getCause() : t;
if (isOutOfMemoryError(t)) {
reportFailure(computation, work, t);
LOG.error("Received OutOfMemoryError, crashing. Error was ", t);
System.exit(1);
} else if (isKeyTokenInvalidException(t)) {
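// The work token is no longer valid (e.g. the key has been reassigned), so Windmill will
// reissue the work if needed; do not retry locally.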
LOG.debug(
"Execution of work for {} for key {} failed due to token expiration, "
+ "will not retry locally.",
computation, work.getKey().toStringUtf8());
activeWorkMap.get(computation).completeWork(work.getKey());
} else {
LOG.error(
"Execution of work for {} for key {} failed, retrying.",
computation,
work.getKey().toStringUtf8());
LOG.error("\nError: ", t);
lastException.set(t);
LOG.debug("Failed work: {}", work);
if (reportFailure(computation, work, t)) {
// Try again, after some delay and at the end of the queue to avoid a tight loop.
sleep(10000);
workUnitExecutor.forceExecute(
new Runnable() {
@Override
public void run() {
process(computation, mapTask, inputDataWatermark, work);
}
});
} else {
// If we failed to report the error, the item is invalid and should
// not be retried internally. It will be retried at the higher level.
LOG.debug("Aborting processing due to exception reporting failure");
activeWorkMap.get(computation).completeWork(work.getKey());
}
}
} finally {
DataflowWorkerLoggingMDC.setWorkId(null);
DataflowWorkerLoggingMDC.setStageName(null);
}
}