in src/main/java/com/spotify/google/cloud/pubsub/client/Puller.java [194:247]
private void pullBatch() {
outstandingRequests.incrementAndGet();
pubsub.pull(project, subscription, true, batchSize)
.whenComplete((messages, ex) -> {
outstandingRequests.decrementAndGet();
// Bail if pull failed
if (ex != null) {
if ( ex instanceof RequestFailedException && ((RequestFailedException)ex).statusCode() == 429 ) {
LOG.debug("Going too fast, backing off");
} else {
LOG.error("Pull failed", ex);
}
backoff.sleep();
return;
}
// we are good. Lets go at full speed again.
backoff.reset();
// Add entire batch to outstanding message count
outstandingMessages.addAndGet(messages.size());
// Call handler for each received message
for (final ReceivedMessage message : messages) {
final CompletionStage<String> handlerFuture;
try {
handlerFuture = handler.handleMessage(this, subscription, message.message(), message.ackId());
} catch (Throwable t) {
outstandingMessages.decrementAndGet();
LOG.error("Message handler threw exception", t);
continue;
}
if (handlerFuture == null) {
outstandingMessages.decrementAndGet();
LOG.error("Message handler returned null");
continue;
}
// Decrement the number of outstanding messages when handling is complete
handlerFuture.whenComplete((ignore, throwable) -> outstandingMessages.decrementAndGet());
// Ack when the message handling successfully completes
handlerFuture.thenAccept(acker::acknowledge).exceptionally(throwable -> {
if (!(throwable instanceof CancellationException)) {
LOG.error("Acking pubsub threw exception", throwable);
}
return null;
});
}
});
}