private int sendBatch()

in src/main/java/com/spotify/google/cloud/pubsub/client/Acker.java [150:201]


  /**
   * Drains up to {@code batchSize} queued acks and sends them as a single acknowledge request.
   *
   * <p>When the request completes, the future of every ack in the batch is completed: normally
   * if the request succeeded, exceptionally with the request's failure otherwise. After that
   * completion handling, {@code send()} is invoked to process acks that were queued meanwhile.
   *
   * <p>If issuing the request fails synchronously, the outstanding request counter is rolled
   * back, the futures of the drained acks are failed with the thrown exception, and the
   * exception is rethrown to the caller.
   *
   * @return the number of ack ids in the batch that was sent, or 0 if the queue was empty.
   */
  private int sendBatch() {
    final List<String> batch = new ArrayList<>();
    final List<CompletableFuture<Void>> futures = new ArrayList<>();

    // Drain queue up to batch size
    while (batch.size() < batchSize) {
      final QueuedAck ack = queue.poll();
      if (ack == null) {
        break;
      }
      batch.add(ack.ackId);
      futures.add(ack.future);
    }

    // Was there anything to send?
    if (batch.isEmpty()) {
      return 0;
    }

    final int count = batch.size();

    // Decrement the queue size counter
    size.updateAndGet(i -> i - count);

    // Send the batch request and increment the outstanding request counter
    outstanding.incrementAndGet();
    final PubsubFuture<Void> batchFuture;
    try {
      batchFuture = pubsub.acknowledge(project, subscription, batch);
    } catch (RuntimeException e) {
      // No request future exists, so the completion callback below will never run.
      // Undo the counter increment and fail the drained acks here; otherwise the counter
      // would stay inflated and the ack futures would never complete.
      outstanding.decrementAndGet();
      futures.forEach(f -> f.completeExceptionally(e));
      throw e;
    }

    batchFuture.whenComplete(
        (Void ignore, Throwable ex) -> {

          // Decrement the outstanding request counter
          outstanding.decrementAndGet();

          // Fail all futures if the batch request failed
          if (ex != null) {
            futures.forEach(f -> f.completeExceptionally(ex));
            // NOTE(review): presumably delays before the follow-up send() below; the backoff
            // implementation is not visible here -- confirm.
            backoff.sleep();
            return;
          }

          backoff.reset();

          // Complete each future
          for (final CompletableFuture<Void> future : futures) {
            future.complete(null);
          }
        })

        // When batch is complete, process pending acks.
        .whenComplete((v, t) -> send());

    return count;
  }