in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.java [152:246]
/**
 * Executes a single work item and reports its outcome.
 *
 * <p>The work item must carry either a map task or a source operation task; any other kind is
 * treated as a failure. On success, the counters, the worker's output metrics and (for source
 * operations) the operation response are reported with status "Success". Any {@link Throwable}
 * raised while building the worker, executing it, or reporting success is passed to
 * {@code handleWorkError} together with the latest known report index.
 *
 * <p>The worker is closed at most once: right after execution (before the user-code scope is
 * closed), or, if a failure happened before execution started, in the final cleanup.
 *
 * @param workItem the work item to execute
 * @return {@code true} if the work completed and success was reported, {@code false} if a
 *     failure was handed to {@code handleWorkError}
 * @throws IOException not thrown by the main body, which catches every {@link Throwable};
 *     presumably propagated from {@code handleWorkError} (TODO confirm)
 */
private boolean doWork(WorkItem workItem) throws IOException {
  LOG.debug("Executing: {}", workItem);
  WorkExecutor worker = null;
  // Set just before the first close attempt so the final cleanup never closes the worker a
  // second time; close() is not assumed to be idempotent.
  boolean workerCloseAttempted = false;
  SourceOperationResponse operationResponse = null;
  long nextReportIndex = workItem.getInitialReportIndex();
  try {
    // Populate PipelineOptions with data from work unit.
    options.setProject(workItem.getProjectId());
    DataflowExecutionContext executionContext =
        new DataflowWorkerExecutionContext(sideInputCache, options);
    CounterSet counters = new CounterSet();
    StateSampler sampler = null;
    if (workItem.getMapTask() != null) {
      sampler = new StateSampler(
          workItem.getMapTask().getStageName() + "-", counters.getAddCounterMutator());
      worker = MapTaskExecutorFactory.create(
          options, workItem.getMapTask(), executionContext, counters, sampler);
    } else if (workItem.getSourceOperationTask() != null) {
      sampler = new StateSampler(
          "source-operation-", counters.getAddCounterMutator());
      worker = SourceOperationExecutorFactory.create(options, workItem.getSourceOperationTask());
    } else {
      throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
    }
    sampler.addSamplingCallback(
        new UserCodeTimeTracker.StateSamplerCallback(
            userCodeTimeTracker, workItem.getId()));
    DataflowWorkProgressUpdater progressUpdater =
        new DataflowWorkProgressUpdater(workItem, worker, workUnitClient, options);
    try (AutoCloseable scope = userCodeTimeTracker.scopedWork(
        sampler.getPrefix(), workItem.getId(), counters.getAddCounterMutator())) {
      // Nested try/finally is used to make sure worker.close() happens before scope.close().
      Throwable executeFailure = null;
      try {
        executeWork(worker, progressUpdater);
      } catch (Throwable t) {
        executeFailure = t;
        throw t;
      } finally {
        workerCloseAttempted = true;
        try {
          worker.close();
        } catch (Exception closeFailure) {
          if (executeFailure == null) {
            throw closeFailure;
          }
          // Keep the execution failure as the primary error so handleWorkError reports the
          // root cause; the close failure travels along as a suppressed exception.
          if (closeFailure != executeFailure) {
            executeFailure.addSuppressed(closeFailure);
          }
        } finally {
          // Grab nextReportIndex so handleWorkError can use it, even if close() failed.
          nextReportIndex = progressUpdater.getNextReportIndex();
        }
      }
    }
    // Log all counter values for debugging purposes.
    for (Counter<?> counter : counters) {
      LOG.trace("COUNTER {}.", counter);
    }
    // Log all metrics for debugging purposes.
    Collection<Metric<?>> metrics = worker.getOutputMetrics();
    for (Metric<?> metric : metrics) {
      LOG.trace("METRIC {}: {}", metric.getName(), metric.getValue());
    }
    // Report job success.
    // TODO: Find out a generic way for the WorkExecutor to report work-specific results
    // into the work update.
    operationResponse =
        (worker instanceof SourceOperationExecutor)
            ? ((SourceOperationExecutor) worker).getResponse()
            : null;
    try {
      reportStatus(
          options, "Success", workItem, counters, metrics, operationResponse, null/*errors*/,
          nextReportIndex);
    } catch (GoogleJsonResponseException e) {
      // A non-null response implies the worker is a SourceOperationExecutor (see above). An
      // oversized split response gets a more descriptive error; the original is kept as cause.
      if (operationResponse != null && isSplitResponseTooLarge(operationResponse)) {
        throw new RuntimeException(SPLIT_RESPONSE_TOO_LARGE_ERROR, e);
      }
      throw e;
    }
    return true;
  } catch (Throwable e) {
    handleWorkError(workItem, worker, nextReportIndex, e);
    return false;
  } finally {
    // The worker is still unclosed here only if a failure happened after it was created but
    // before execution began (e.g. while creating the progress updater or entering the
    // user-code scope).
    if (worker != null && !workerCloseAttempted) {
      try {
        worker.close();
      } catch (Exception exn) {
        LOG.warn("Uncaught exception occurred during work unit shutdown:", exn);
      }
    }
  }
}