static WorkItemStatus buildStatus()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.java [308:392]


  /**
   * Assembles the {@link WorkItemStatus} report for a work item.
   *
   * <p>The returned status always carries the work item id, the completion flag, the report
   * index and a (possibly empty) list of metric updates. The list is filled in this order:
   * updates extracted from {@code counters}, updates extracted from {@code metrics}, and one
   * {@code "internal"} update named {@code "state-sampler"} built from {@code stateSamplerInfo}.
   *
   * @param workItem the work item being reported on; its id is written into the status
   * @param completed value stored as the status' completed flag
   * @param counters counters to extract (non-delta) updates from, or {@code null} for none
   * @param metrics metrics to extract updates from, or {@code null} for none
   * @param options supplies the worker id used when extracting metrics, and is passed along
   *     when converting a bounded source split
   * @param progress reader progress to convert and attach, or {@code null} to leave it unset
   * @param dynamicSplitResult split result to attach, either as a stop position or as a dynamic
   *     source split depending on its runtime type; {@code null} attaches nothing
   * @param operationResponse response attached only when {@code workItem} has a source
   *     operation task; it is attached as-is, even when {@code null}
   * @param errors errors to attach, or {@code null} to leave them unset
   * @param reportIndex value stored as the status' report index
   * @param stateSamplerInfo sampler snapshot whose non-null fields are reported, or
   *     {@code null} to omit the state-sampler update
   * @return the populated status
   * @throws IllegalArgumentException if {@code dynamicSplitResult} is non-null and is neither a
   *     {@code Reader.DynamicSplitResultWithPosition} nor a
   *     {@code CustomSources.BoundedSourceSplit}
   */
  static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
      @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics,
      DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress,
      @Nullable Reader.DynamicSplitResult dynamicSplitResult,
      @Nullable SourceOperationResponse operationResponse, @Nullable List<Status> errors,
      long reportIndex,
      @Nullable StateSampler.StateSamplerInfo stateSamplerInfo) {
    WorkItemStatus report = new WorkItemStatus();
    report.setWorkItemId(Long.toString(workItem.getId()));
    report.setCompleted(completed);
    report.setReportIndex(reportIndex);

    // Collect every metric update into one list: counters, then metrics, then the sampler.
    List<MetricUpdate> allUpdates = new ArrayList<>();

    if (counters != null) {
      // Work updates have no reliable exactly-once delivery yet (they may be retried or
      // reordered), so delta updates could double-count or drop contributions. Cumulative
      // values are therefore always sent, even though deltas might perform better.
      // TODO: Implement exactly-once delivery and use deltas,
      // if it ever becomes clear that deltas are necessary for performance.
      List<MetricUpdate> fromCounters =
          CloudCounterUtils.extractCounters(counters, false /* delta */);
      if (fromCounters != null) {
        allUpdates.addAll(fromCounters);
      }
    }

    if (metrics != null) {
      List<MetricUpdate> fromMetrics =
          CloudMetricUtils.extractCloudMetrics(metrics, options.getWorkerId());
      if (fromMetrics != null) {
        allUpdates.addAll(fromMetrics);
      }
    }

    if (stateSamplerInfo != null) {
      // Only the sampler fields that are actually populated are reported.
      Map<String, Object> samplerFields = new HashMap<>();
      if (stateSamplerInfo.state != null) {
        samplerFields.put("last-state-name", stateSamplerInfo.state);
      }
      if (stateSamplerInfo.transitionCount != null) {
        samplerFields.put("num-transitions", stateSamplerInfo.transitionCount);
      }
      if (stateSamplerInfo.stateDurationMillis != null) {
        samplerFields.put("last-state-duration-ms", stateSamplerInfo.stateDurationMillis);
      }

      MetricStructuredName samplerName = new MetricStructuredName();
      samplerName.setName("state-sampler");

      MetricUpdate samplerUpdate = new MetricUpdate();
      samplerUpdate.setKind("internal");
      samplerUpdate.setName(samplerName);
      samplerUpdate.setInternal(samplerFields);
      allUpdates.add(samplerUpdate);
    }

    report.setMetricUpdates(allUpdates);

    // TODO: Provide more structure representation of error,
    // e.g., the serialized exception object.
    if (errors != null) {
      report.setErrors(errors);
    }

    if (progress != null) {
      report.setProgress(readerProgressToCloudProgress(progress));
    }

    // Attach the split result in the form matching its runtime type; unknown types are rejected.
    if (dynamicSplitResult != null) {
      if (dynamicSplitResult instanceof Reader.DynamicSplitResultWithPosition) {
        Reader.DynamicSplitResultWithPosition positionSplit =
            (Reader.DynamicSplitResultWithPosition) dynamicSplitResult;
        report.setStopPosition(toCloudPosition(positionSplit.getAcceptedPosition()));
      } else if (dynamicSplitResult instanceof CustomSources.BoundedSourceSplit) {
        CustomSources.BoundedSourceSplit<?> sourceSplit =
            (CustomSources.BoundedSourceSplit<?>) dynamicSplitResult;
        report.setDynamicSourceSplit(CustomSources.toSourceSplit(sourceSplit, options));
      } else {
        throw new IllegalArgumentException(
            "Unexpected type of dynamic split result: " + dynamicSplitResult);
      }
    }

    // The operation response is only meaningful for source operation tasks.
    if (workItem.getSourceOperationTask() != null) {
      report.setSourceOperationResponse(operationResponse);
    }

    return report;
  }