private boolean doWork()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.java [152:246]


  /**
   * Executes a single work item and reports its outcome.
   *
   * <p>The method copies the work item's project id into {@code options}, builds an executor for
   * the work item (map task or source operation), runs it through {@code executeWork} inside a
   * {@code userCodeTimeTracker} scope, logs counters and metrics at trace level, and finally
   * reports {@code "Success"} through {@code reportStatus}. Any {@link Throwable} raised inside
   * that sequence is handed to {@code handleWorkError} and the method returns {@code false}.
   *
   * @param workItem the work item to execute; it must carry either a map task or a source
   *     operation task, otherwise the item is treated as failed
   * @return {@code true} if the work ran and the success status was reported; {@code false} if a
   *     failure was passed to {@code handleWorkError}
   * @throws IOException declared by the signature; failures of the work itself are caught here,
   *     so this presumably only propagates from {@code handleWorkError} (confirm against it)
   */
  private boolean doWork(WorkItem workItem) throws IOException {
    LOG.debug("Executing: {}", workItem);

    WorkExecutor worker = null;
    SourceOperationResponse operationResponse = null;
    // Starts at the work item's initial index; refreshed from the progress updater after execution
    // so that error reporting uses the latest report index.
    long nextReportIndex = workItem.getInitialReportIndex();
    try {
      // Populate PipelineOptions with data from work unit.
      options.setProject(workItem.getProjectId());

      DataflowExecutionContext executionContext =
          new DataflowWorkerExecutionContext(sideInputCache, options);

      CounterSet counters = new CounterSet();
      StateSampler sampler = null;

      if (workItem.getMapTask() != null) {
        sampler = new StateSampler(
            workItem.getMapTask().getStageName() + "-", counters.getAddCounterMutator());
        worker = MapTaskExecutorFactory.create(
            options, workItem.getMapTask(), executionContext, counters, sampler);
      } else if (workItem.getSourceOperationTask() != null) {
        sampler = new StateSampler(
            "source-operation-", counters.getAddCounterMutator());
        worker = SourceOperationExecutorFactory.create(options, workItem.getSourceOperationTask());
      } else {
        throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
      }

      sampler.addSamplingCallback(
          new UserCodeTimeTracker.StateSamplerCallback(
              userCodeTimeTracker, workItem.getId()));

      DataflowWorkProgressUpdater progressUpdater =
          new DataflowWorkProgressUpdater(workItem, worker, workUnitClient, options);
      try (AutoCloseable scope = userCodeTimeTracker.scopedWork(
              sampler.getPrefix(), workItem.getId(), counters.getAddCounterMutator())) {
        // Nested try/finally is used to make sure worker.close() happens before scope.close().
        try {
          executeWork(worker, progressUpdater);
        } finally {
          try {
            worker.close();
          } finally {
            // Grab nextReportIndex so we can use it in handleWorkError if there is an exception.
            // This sits in its own finally so that it is still captured when worker.close()
            // itself throws; otherwise the error would be reported with the stale initial index.
            nextReportIndex = progressUpdater.getNextReportIndex();
          }
        }
      }

      // Log all counter values for debugging purposes.
      for (Counter<?> counter : counters) {
        LOG.trace("COUNTER {}.", counter);
      }

      // Log all metrics for debugging purposes.
      Collection<Metric<?>> metrics = worker.getOutputMetrics();
      for (Metric<?> metric : metrics) {
        LOG.trace("METRIC {}: {}", metric.getName(), metric.getValue());
      }

      // Report job success.
      // TODO: Find out a generic way for the WorkExecutor to report work-specific results
      // into the work update.
      operationResponse =
          (worker instanceof SourceOperationExecutor)
              ? ((SourceOperationExecutor) worker).getResponse()
              : null;

      try {
        reportStatus(
          options, "Success", workItem, counters, metrics, operationResponse, null/*errors*/,
          nextReportIndex);
      } catch (GoogleJsonResponseException e) {
        // operationResponse is only non-null for a SourceOperationExecutor (see the assignment
        // above), so the null check alone identifies the source-operation case. An oversized
        // split response gets a dedicated error message; anything else is rethrown unchanged.
        if (operationResponse != null && isSplitResponseTooLarge(operationResponse)) {
          throw new RuntimeException(SPLIT_RESPONSE_TOO_LARGE_ERROR, e);
        }
        throw e;
      }

      return true;

    } catch (Throwable e) {
      // Every failure, including the RuntimeExceptions thrown above, is reported rather than
      // propagated; the caller only sees the boolean result.
      handleWorkError(workItem, worker, nextReportIndex, e);
      return false;

    } finally {
      // Covers failures that happen after the worker is created but before the execution block
      // closes it. On the normal path this is a second close() call.
      // NOTE(review): assumes WorkExecutor.close() tolerates being called twice - confirm.
      if (worker != null) {
        try {
          worker.close();
        } catch (Exception exn) {
          LOG.warn("Uncaught exception occurred during work unit shutdown:", exn);
        }
      }
    }
  }