private int sendBatch()

in src/main/java/com/spotify/google/cloud/pubsub/client/Publisher.java [391:449]


    private int sendBatch() {
      final List<Message> batch = new ArrayList<>();
      final List<CompletableFuture<String>> futures = new ArrayList<>();

      // Drain queue up to batch size
      while (batch.size() < batchSize) {
        final QueuedMessage message = queue.poll();
        if (message == null) {
          break;
        }
        batch.add(message.message);
        futures.add(message.future);
      }

      // Was there anything to send?
      if (batch.size() == 0) {
        return 0;
      }

      // Decrement the queue size counter
      size.updateAndGet(i -> i - batch.size());

      // Send the batch request and increment the outstanding request counter
      outstanding.incrementAndGet();
      final PubsubFuture<List<String>> batchFuture = pubsub.publish(project, topic, batch);
      listener.sendingBatch(Publisher.this, topic, unmodifiableList(batch), batchFuture);
      batchFuture.whenComplete(
          (List<String> messageIds, Throwable ex) -> {

            // Decrement the outstanding request counter
            outstanding.decrementAndGet();

            // Fail all futures if the batch request failed
            if (ex != null) {
              futures.forEach(f -> f.completeExceptionally(ex));
              return;
            }

            // Verify that the number of message id's and messages match up
            if (futures.size() != messageIds.size()) {
              futures.forEach(f -> f.completeExceptionally(
                  new PubsubException(
                      "message id count mismatch: " +
                      futures.size() + " != " + messageIds.size())));
            }

            // Complete each future with the appropriate message id
            for (int i = 0; i < futures.size(); i++) {
              final String messageId = messageIds.get(i);
              final CompletableFuture<String> future = futures.get(i);
              future.complete(messageId);
            }
          })

          // When batch is complete, process pending topics.
          .whenComplete((v, t) -> sendPending());

      return batch.size();
    }