in src/main/java/com/spotify/google/cloud/pubsub/client/Pubsub.java [752:843]
private <T> PubsubFuture<T> request(final String operation, final HttpMethod method, final String path,
final Object payload, final ResponseReader<T> responseReader) {
final String uri = baseUri + path;
final RequestBuilder builder = new RequestBuilder()
.setUrl(uri)
.setMethod(method.toString())
.setHeader("Authorization", "Bearer " + accessToken)
.setHeader("User-Agent", USER_AGENT);
final long payloadSize;
if (payload != NO_PAYLOAD) {
final byte[] json = gzipJson(payload);
payloadSize = json.length;
builder.setHeader(CONTENT_ENCODING, GZIP);
builder.setHeader(CONTENT_LENGTH, String.valueOf(json.length));
builder.setHeader(CONTENT_TYPE, APPLICATION_JSON_UTF8);
builder.setBody(json);
} else {
builder.setHeader(CONTENT_LENGTH, String.valueOf(0));
payloadSize = 0;
}
final Request request = builder.build();
final RequestInfo requestInfo = RequestInfo.builder()
.operation(operation)
.method(method.toString())
.uri(uri)
.payloadSize(payloadSize)
.build();
final PubsubFuture<T> future = new PubsubFuture<>(requestInfo);
client.executeRequest(request, new AsyncHandler<Void>() {
private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
@Override
public void onThrowable(final Throwable t) {
future.fail(t);
}
@Override
public STATE onBodyPartReceived(final HttpResponseBodyPart bodyPart) throws Exception {
bytes.write(bodyPart.getBodyPartBytes());
return STATE.CONTINUE;
}
@Override
public STATE onStatusReceived(final HttpResponseStatus status) throws Exception {
// Return null for 404'd GET & DELETE requests
if (status.getStatusCode() == 404 && method == HttpMethod.GET || method == HttpMethod.DELETE) {
future.succeed(null);
return STATE.ABORT;
}
// Fail on non-2xx responses
final int statusCode = status.getStatusCode();
if (!(statusCode >= 200 && statusCode < 300)) {
future.fail(new RequestFailedException(status.getStatusCode(), status.getStatusText()));
return STATE.ABORT;
}
if (responseReader == VOID) {
future.succeed(null);
return STATE.ABORT;
}
return STATE.CONTINUE;
}
@Override
public STATE onHeadersReceived(final HttpResponseHeaders headers) throws Exception {
return STATE.CONTINUE;
}
@Override
public Void onCompleted() throws Exception {
if (future.isDone()) {
return null;
}
try {
future.succeed(responseReader.read(bytes.toByteArray()));
} catch (Exception e) {
future.fail(e);
}
return null;
}
});
return future;
}