in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/WindmillStateReader.java [262:341]
/**
 * Validates a Windmill {@code GetData} response against this reader's computation and key, then
 * hands every returned state entry to the matching {@code consume*} method.
 *
 * <p>The response must hold exactly one computation (equal to {@code computation}) containing
 * exactly one keyed entry. If that entry is flagged as failed, every future still listed in
 * {@code toFetch} is completed with a {@code KeyTokenInvalidException} and the method returns
 * without consuming any state. Otherwise each tag list, watermark hold and tag value in the
 * response is removed from {@code toFetch} and consumed.
 *
 * @param request the request that produced {@code getDataResponse}; used only in error messages
 * @param getDataResponse the response to validate and consume
 * @param toFetch the tags awaiting a response; entries are removed as they are answered, so the
 *     set is empty when the non-failed path completes normally
 * @throws RuntimeException if the response does not hold exactly one computation and exactly one
 *     key, or if the computation or key differs from this reader's
 * @throws IllegalStateException if the response carries a tag that is not in {@code toFetch}, or
 *     leaves tags in {@code toFetch} unanswered
 */
private void consumeResponse(Windmill.GetDataRequest request,
    Windmill.GetDataResponse getDataResponse, Set<StateTag> toFetch) {
  // Validate the response is for our computation/key.
  if (getDataResponse.getDataCount() == 0) {
    throw new RuntimeException(
        "No computation in response to request: " + request);
  } else if (getDataResponse.getDataCount() > 1) {
    throw new RuntimeException("Expected exactly one computation in response, but got: "
        + getDataResponse.getDataList());
  }

  Windmill.ComputationGetDataResponse computationResponse = getDataResponse.getData(0);
  if (!computation.equals(computationResponse.getComputationId())) {
    throw new RuntimeException("Expected data for computation " + computation
        + " but was " + computationResponse.getComputationId());
  }
  if (computationResponse.getDataCount() == 0) {
    throw new RuntimeException(
        "No key in response to request: " + request);
  } else if (computationResponse.getDataCount() > 1) {
    throw new RuntimeException(
        "Expected exactly one key in response, but was: " + computationResponse.getDataList());
  }

  Windmill.KeyedGetDataResponse response = computationResponse.getData(0);
  // Bytes are counted before the failure check, so failed responses are included as well.
  bytesRead += response.getSerializedSize();

  if (response.getFailed()) {
    // Set up all the futures for this key to throw an exception:
    StreamingDataflowWorker.KeyTokenInvalidException keyTokenInvalidException =
        new StreamingDataflowWorker.KeyTokenInvalidException(key.toStringUtf8());
    for (StateTag stateTag : toFetch) {
      futures.get(stateTag).setException(keyTokenInvalidException);
    }
    return;
  }

  if (!key.equals(response.getKey())) {
    // Render both keys with toStringUtf8(), as the failed-key branch above does, so the message
    // shows the key contents. NOTE(review): ByteString's default toString() is believed to print
    // only an identity hash and size in the protobuf versions this SDK targets - confirm.
    throw new RuntimeException("Expected data for key " + key.toStringUtf8()
        + " but was " + response.getKey().toStringUtf8());
  }

  // Each entry in the response must answer a tag that is still pending in toFetch.
  for (Windmill.TagList list : response.getListsList()) {
    StateTag stateTag = new StateTag(
        StateTag.Kind.LIST, list.getTag(), list.getStateFamily());
    markTagReceived(toFetch, stateTag);
    consumeTagList(list, stateTag);
  }
  for (Windmill.WatermarkHold hold : response.getWatermarkHoldsList()) {
    StateTag stateTag = new StateTag(
        StateTag.Kind.WATERMARK, hold.getTag(), hold.getStateFamily());
    markTagReceived(toFetch, stateTag);
    consumeWatermark(hold, stateTag);
  }
  for (Windmill.TagValue value : response.getValuesList()) {
    StateTag stateTag = new StateTag(
        StateTag.Kind.VALUE, value.getTag(), value.getStateFamily());
    markTagReceived(toFetch, stateTag);
    consumeTagValue(value, stateTag);
  }

  // Anything left over was requested but never answered.
  if (!toFetch.isEmpty()) {
    throw new IllegalStateException(
        "Didn't receive responses for all pending fetches. Missing: " + toFetch);
  }
}

/**
 * Removes {@code stateTag} from the pending set, failing if it is not present (i.e. the response
 * carries a tag that is not currently awaited).
 *
 * @param toFetch the tags still awaiting a response; mutated by this call
 * @param stateTag the tag that was just found in the response
 * @throws IllegalStateException if {@code stateTag} is not in {@code toFetch}
 */
private void markTagReceived(Set<StateTag> toFetch, StateTag stateTag) {
  if (!toFetch.remove(stateTag)) {
    throw new IllegalStateException(
        "Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
  }
}