in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.java [308:392]
/**
 * Assembles the {@link WorkItemStatus} report for a work item.
 *
 * <p>The returned status always carries the work item id, the completion flag, the report
 * index, and a (possibly empty) list of metric updates. Errors, progress, split information,
 * and the source operation response are attached only under the conditions described below.
 *
 * @param workItem the work item being reported on; its id is copied into the status
 * @param completed value stored as the status' completed flag
 * @param counters counters to convert into metric updates, or {@code null} for none
 * @param metrics metrics to convert into metric updates, or {@code null} for none
 * @param options supplies the worker id used for metric extraction and is forwarded to the
 *     custom-source split conversion
 * @param progress reader progress to attach, or {@code null} to leave progress unset
 * @param dynamicSplitResult result of a dynamic split, or {@code null} if there is none
 * @param operationResponse response stored on the status only when the work item has a source
 *     operation task
 * @param errors errors to attach, or {@code null} to leave errors unset
 * @param reportIndex value stored as the status' report index
 * @param stateSamplerInfo state sampler snapshot reported as an extra "internal" metric update,
 *     or {@code null} to omit that update
 * @return the populated status
 * @throws IllegalArgumentException if {@code dynamicSplitResult} is non-null and is neither a
 *     {@code Reader.DynamicSplitResultWithPosition} nor a {@code CustomSources.BoundedSourceSplit}
 */
static WorkItemStatus buildStatus(WorkItem workItem, boolean completed,
    @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics,
    DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress,
    @Nullable Reader.DynamicSplitResult dynamicSplitResult,
    @Nullable SourceOperationResponse operationResponse, @Nullable List<Status> errors,
    long reportIndex,
    @Nullable StateSampler.StateSamplerInfo stateSamplerInfo) {
  WorkItemStatus result = new WorkItemStatus();
  result.setWorkItemId(Long.toString(workItem.getId()));
  result.setCompleted(completed);
  result.setReportIndex(reportIndex);

  // Counter values are extracted with delta == false. Work updates have no reliable
  // exactly-once delivery: they may be retried or reordered, so delta updates could be
  // double-counted or lost, even though deltas might be cheaper.
  // TODO: Implement exactly-once delivery and switch to deltas, if it ever becomes clear
  // that deltas are necessary for performance.
  final boolean useDeltas = false;
  List<MetricUpdate> fromCounters =
      (counters == null) ? null : CloudCounterUtils.extractCounters(counters, useDeltas);
  List<MetricUpdate> fromMetrics =
      (metrics == null)
          ? null
          : CloudMetricUtils.extractCloudMetrics(metrics, options.getWorkerId());

  // Merge both sources into one list; counter updates come first, then metric updates.
  List<MetricUpdate> allUpdates = new ArrayList<>();
  if (fromCounters != null) {
    allUpdates.addAll(fromCounters);
  }
  if (fromMetrics != null) {
    allUpdates.addAll(fromMetrics);
  }

  if (stateSamplerInfo != null) {
    // Report the sampler snapshot as one "internal" update; fields that are null are omitted.
    Map<String, Object> samplerFields = new HashMap<>();
    if (stateSamplerInfo.state != null) {
      samplerFields.put("last-state-name", stateSamplerInfo.state);
    }
    if (stateSamplerInfo.transitionCount != null) {
      samplerFields.put("num-transitions", stateSamplerInfo.transitionCount);
    }
    if (stateSamplerInfo.stateDurationMillis != null) {
      samplerFields.put("last-state-duration-ms", stateSamplerInfo.stateDurationMillis);
    }

    MetricStructuredName samplerName = new MetricStructuredName();
    samplerName.setName("state-sampler");

    MetricUpdate samplerUpdate = new MetricUpdate();
    samplerUpdate.setKind("internal");
    samplerUpdate.setName(samplerName);
    samplerUpdate.setInternal(samplerFields);
    allUpdates.add(samplerUpdate);
  }
  result.setMetricUpdates(allUpdates);

  // TODO: Provide a more structured representation of errors,
  // e.g., the serialized exception object.
  if (errors != null) {
    result.setErrors(errors);
  }

  if (progress != null) {
    result.setProgress(readerProgressToCloudProgress(progress));
  }

  if (dynamicSplitResult != null) {
    if (dynamicSplitResult instanceof Reader.DynamicSplitResultWithPosition) {
      // Position-based split: report the accepted position as the stop position.
      Reader.DynamicSplitResultWithPosition withPosition =
          (Reader.DynamicSplitResultWithPosition) dynamicSplitResult;
      result.setStopPosition(toCloudPosition(withPosition.getAcceptedPosition()));
    } else if (dynamicSplitResult instanceof CustomSources.BoundedSourceSplit) {
      // Custom-source split: report it as a dynamic source split.
      CustomSources.BoundedSourceSplit<?> sourceSplit =
          (CustomSources.BoundedSourceSplit<?>) dynamicSplitResult;
      result.setDynamicSourceSplit(CustomSources.toSourceSplit(sourceSplit, options));
    } else {
      throw new IllegalArgumentException(
          "Unexpected type of dynamic split result: " + dynamicSplitResult);
    }
  }

  // The operation response is only recorded for work items that have a source operation task.
  if (workItem.getSourceOperationTask() != null) {
    result.setSourceOperationResponse(operationResponse);
  }
  return result;
}