private void consumeResponse()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/WindmillStateReader.java [262:341]


  private void consumeResponse(Windmill.GetDataRequest request,
      Windmill.GetDataResponse getDataResponse, Set<StateTag> toFetch) {
    // Validate the response is for our computation/key.
    if (getDataResponse.getDataCount() == 0) {
      throw new RuntimeException(
          "No computation in response to request: " + request);
    } else if (getDataResponse.getDataCount() > 1) {
      throw new RuntimeException("Expected exactly one computation in response, but got: "
          + getDataResponse.getDataList());
    }

    Windmill.ComputationGetDataResponse computationResponse = getDataResponse.getData(0);

    if (!computation.equals(computationResponse.getComputationId())) {
      throw new RuntimeException("Expected data for computation " + computation
          + " but was " + computationResponse.getComputationId());
    }

    if (computationResponse.getDataCount() == 0) {
      throw new RuntimeException(
          "No key in response to request: " + request);
    } else if (computationResponse.getDataCount() > 1) {
      throw new RuntimeException(
          "Expected exactly one key in response, but was: " + computationResponse.getDataList());
    }

    Windmill.KeyedGetDataResponse response = computationResponse.getData(0);
    bytesRead += response.getSerializedSize();

    if (response.getFailed()) {
      // Set up all the futures for this key to throw an exception:
      StreamingDataflowWorker.KeyTokenInvalidException keyTokenInvalidException =
          new StreamingDataflowWorker.KeyTokenInvalidException(key.toStringUtf8());
      for (StateTag stateTag : toFetch) {
        futures.get(stateTag).setException(keyTokenInvalidException);
      }
      return;
    }

    if (!key.equals(response.getKey())) {
      throw new RuntimeException("Expected data for key " + key
          + " but was " + response.getKey());
    }


    for (Windmill.TagList list : response.getListsList()) {
      StateTag stateTag = new StateTag(
          StateTag.Kind.LIST, list.getTag(), list.getStateFamily());
      if (!toFetch.remove(stateTag)) {
        throw new IllegalStateException(
            "Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
      }
      consumeTagList(list, stateTag);
    }

    for (Windmill.WatermarkHold hold : response.getWatermarkHoldsList()) {
      StateTag stateTag = new StateTag(
          StateTag.Kind.WATERMARK, hold.getTag(), hold.getStateFamily());
      if (!toFetch.remove(stateTag)) {
        throw new IllegalStateException(
            "Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
      }
      consumeWatermark(hold, stateTag);
    }

    for (Windmill.TagValue value : response.getValuesList()) {
      StateTag stateTag = new StateTag(
          StateTag.Kind.VALUE, value.getTag(), value.getStateFamily());
      if (!toFetch.remove(stateTag)) {
        throw new IllegalStateException(
            "Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
      }
      consumeTagValue(value, stateTag);
    }

    if (!toFetch.isEmpty()) {
      throw new IllegalStateException(
          "Didn't receive responses for all pending fetches. Missing: " + toFetch);
    }
  }