in src/main/java/com/spotify/google/cloud/pubsub/client/Acker.java [150:201]
private int sendBatch() {
final List<String> batch = new ArrayList<>();
final List<CompletableFuture<Void>> futures = new ArrayList<>();
// Drain queue up to batch size
while (batch.size() < batchSize) {
final QueuedAck ack = queue.poll();
if (ack == null) {
break;
}
batch.add(ack.ackId);
futures.add(ack.future);
}
// Was there anything to send?
if (batch.size() == 0) {
return 0;
}
// Decrement the queue size counter
size.updateAndGet(i -> i - batch.size());
// Send the batch request and increment the outstanding request counter
outstanding.incrementAndGet();
final PubsubFuture<Void> batchFuture = pubsub.acknowledge(project, subscription, batch);
batchFuture.whenComplete(
(Void ignore, Throwable ex) -> {
// Decrement the outstanding request counter
outstanding.decrementAndGet();
// Fail all futures if the batch request failed
if (ex != null) {
futures.forEach(f -> f.completeExceptionally(ex));
backoff.sleep();
return;
}
backoff.reset();
// Complete each future
for (int i = 0; i < futures.size(); i++) {
final CompletableFuture<Void> future = futures.get(i);
future.complete(null);
}
})
// When batch is complete, process pending acks.
.whenComplete((v, t) -> send());
return batch.size();
}