in src/main/java/com/spotify/google/cloud/pubsub/client/Publisher.java [391:449]
private int sendBatch() {
final List<Message> batch = new ArrayList<>();
final List<CompletableFuture<String>> futures = new ArrayList<>();
// Drain queue up to batch size
while (batch.size() < batchSize) {
final QueuedMessage message = queue.poll();
if (message == null) {
break;
}
batch.add(message.message);
futures.add(message.future);
}
// Was there anything to send?
if (batch.size() == 0) {
return 0;
}
// Decrement the queue size counter
size.updateAndGet(i -> i - batch.size());
// Send the batch request and increment the outstanding request counter
outstanding.incrementAndGet();
final PubsubFuture<List<String>> batchFuture = pubsub.publish(project, topic, batch);
listener.sendingBatch(Publisher.this, topic, unmodifiableList(batch), batchFuture);
batchFuture.whenComplete(
(List<String> messageIds, Throwable ex) -> {
// Decrement the outstanding request counter
outstanding.decrementAndGet();
// Fail all futures if the batch request failed
if (ex != null) {
futures.forEach(f -> f.completeExceptionally(ex));
return;
}
// Verify that the number of message id's and messages match up
if (futures.size() != messageIds.size()) {
futures.forEach(f -> f.completeExceptionally(
new PubsubException(
"message id count mismatch: " +
futures.size() + " != " + messageIds.size())));
}
// Complete each future with the appropriate message id
for (int i = 0; i < futures.size(); i++) {
final String messageId = messageIds.get(i);
final CompletableFuture<String> future = futures.get(i);
future.complete(messageId);
}
})
// When batch is complete, process pending topics.
.whenComplete((v, t) -> sendPending());
return batch.size();
}