private PubsubFuture request()

in src/main/java/com/spotify/google/cloud/pubsub/client/Pubsub.java [752:843]


  private <T> PubsubFuture<T> request(final String operation, final HttpMethod method, final String path,
      final Object payload, final ResponseReader<T> responseReader) {

    final String uri = baseUri + path;
    final RequestBuilder builder = new RequestBuilder()
        .setUrl(uri)
        .setMethod(method.toString())
        .setHeader("Authorization", "Bearer " + accessToken)
        .setHeader("User-Agent", USER_AGENT);

    final long payloadSize;
    if (payload != NO_PAYLOAD) {
      final byte[] json = gzipJson(payload);
      payloadSize = json.length;
      builder.setHeader(CONTENT_ENCODING, GZIP);
      builder.setHeader(CONTENT_LENGTH, String.valueOf(json.length));
      builder.setHeader(CONTENT_TYPE, APPLICATION_JSON_UTF8);
      builder.setBody(json);
    } else {
      builder.setHeader(CONTENT_LENGTH, String.valueOf(0));
      payloadSize = 0;
    }

    final Request request = builder.build();

    final RequestInfo requestInfo = RequestInfo.builder()
        .operation(operation)
        .method(method.toString())
        .uri(uri)
        .payloadSize(payloadSize)
        .build();

    final PubsubFuture<T> future = new PubsubFuture<>(requestInfo);
    client.executeRequest(request, new AsyncHandler<Void>() {
      private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

      @Override
      public void onThrowable(final Throwable t) {
        future.fail(t);
      }

      @Override
      public STATE onBodyPartReceived(final HttpResponseBodyPart bodyPart) throws Exception {
        bytes.write(bodyPart.getBodyPartBytes());
        return STATE.CONTINUE;
      }

      @Override
      public STATE onStatusReceived(final HttpResponseStatus status) throws Exception {

        // Return null for 404'd GET & DELETE requests
        if (status.getStatusCode() == 404 && method == HttpMethod.GET || method == HttpMethod.DELETE) {
          future.succeed(null);
          return STATE.ABORT;
        }

        // Fail on non-2xx responses
        final int statusCode = status.getStatusCode();
        if (!(statusCode >= 200 && statusCode < 300)) {
          future.fail(new RequestFailedException(status.getStatusCode(), status.getStatusText()));
          return STATE.ABORT;
        }

        if (responseReader == VOID) {
          future.succeed(null);
          return STATE.ABORT;
        }

        return STATE.CONTINUE;
      }

      @Override
      public STATE onHeadersReceived(final HttpResponseHeaders headers) throws Exception {
        return STATE.CONTINUE;
      }

      @Override
      public Void onCompleted() throws Exception {
        if (future.isDone()) {
          return null;
        }
        try {
          future.succeed(responseReader.read(bytes.toByteArray()));
        } catch (Exception e) {
          future.fail(e);
        }
        return null;
      }
    });

    return future;
  }