private void pullBatch()

in src/main/java/com/spotify/google/cloud/pubsub/client/Puller.java [194:247]


  /**
   * Issues one asynchronous pull request and dispatches every received message to
   * {@code handler}.
   *
   * <p>{@code outstandingRequests} is incremented before the request is issued and decremented
   * as soon as it completes, successfully or not.
   *
   * <p>If the pull fails, the failure is logged (at debug level for a
   * {@code RequestFailedException} with status code 429, at error level otherwise),
   * {@code backoff.sleep()} is invoked and no messages are processed.
   *
   * <p>If the pull succeeds, the backoff is reset and the whole batch is added to
   * {@code outstandingMessages}. Each message is then passed to the handler; the counter is
   * decremented again when the handler throws, returns {@code null}, or when the future it
   * returned completes. A message is handed to {@code acker} only when its handler future
   * completes successfully.
   */
  private void pullBatch() {
    outstandingRequests.incrementAndGet();

    pubsub.pull(project, subscription, true, batchSize)
        .whenComplete((messages, ex) -> {

          outstandingRequests.decrementAndGet();
          // Bail if pull failed
          if (ex != null) {
            // The failure may arrive wrapped in a CompletionException (e.g. if the pull future
            // is a dependent stage), so classify it by its underlying cause.
            final Throwable pullCause = unwrapCompletionException(ex);
            if (pullCause instanceof RequestFailedException
                && ((RequestFailedException) pullCause).statusCode() == 429) {
              LOG.debug("Going too fast, backing off");
            } else {
              LOG.error("Pull failed", ex);
            }
            backoff.sleep();
            return;
          }

          // we are good. Lets go at full speed again.
          backoff.reset();

          // Add entire batch to outstanding message count
          outstandingMessages.addAndGet(messages.size());

          // Call handler for each received message
          for (final ReceivedMessage message : messages) {
            final CompletionStage<String> handlerFuture;
            try {
              handlerFuture = handler.handleMessage(this, subscription, message.message(), message.ackId());
            } catch (Throwable t) {
              // The message was already counted above; undo that since no future will do it.
              outstandingMessages.decrementAndGet();
              LOG.error("Message handler threw exception", t);
              continue;
            }

            if (handlerFuture == null) {
              // Same as above: nothing will complete later, so release the slot right away.
              outstandingMessages.decrementAndGet();
              LOG.error("Message handler returned null");
              continue;
            }

            // Decrement the number of outstanding messages when handling is complete
            handlerFuture.whenComplete((ignore, throwable) -> outstandingMessages.decrementAndGet());

            // Ack when the message handling successfully completes
            handlerFuture.thenAccept(acker::acknowledge).exceptionally(throwable -> {
              // This callback hangs off the thenAccept stage, so a failure or cancellation of
              // handlerFuture reaches it wrapped in a CompletionException. Unwrap before
              // deciding whether it is a cancellation that should stay silent.
              final Throwable ackCause = unwrapCompletionException(throwable);
              if (!(ackCause instanceof CancellationException)) {
                LOG.error("Acking pubsub threw exception", throwable);
              }
              return null;
            });
          }
        });
  }

  /**
   * Strips any {@code CompletionException} wrappers from {@code throwable}.
   *
   * @param throwable the failure reported by a completion stage, may be {@code null}
   * @return the first cause that is not a {@code CompletionException}, or the innermost
   *     {@code CompletionException} if it has no cause; {@code null} if the input was {@code null}
   */
  private static Throwable unwrapCompletionException(final Throwable throwable) {
    Throwable cause = throwable;
    while (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) {
      cause = cause.getCause();
    }
    return cause;
  }