in src/main/java/com/spotify/google/cloud/pubsub/client/Acker.java [93:127]
/**
 * Queues an ack id for acknowledgement and returns the future associated with it.
 *
 * <p>A slot is first reserved in the {@code size} counter. If taking the slot would push the
 * counter above {@code queueSize}, nothing is enqueued and the returned future is completed
 * exceptionally with a {@link QueueFullException}. Otherwise the ack is added to the queue and
 * one of two things happens:
 *
 * <ul>
 *   <li>if the reserved count has reached {@code batchSize}, {@code send()} is invoked right away;
 *   <li>otherwise, if the {@code scheduled} flag could be flipped from {@code false} to
 *       {@code true}, {@code scheduledSend} is scheduled to run after {@code maxLatencyMs}
 *       milliseconds.
 * </ul>
 *
 * @param ackId the ack id to enqueue
 * @return the future handed to the queued ack; already failed with {@link QueueFullException}
 *     when the queue size limit was hit. Completion in the accepted case is handled outside this
 *     method.
 */
public CompletableFuture<Void> acknowledge(final String ackId) {
  final CompletableFuture<Void> future = new CompletableFuture<>();

  // Reserve a slot in the queue, rejecting the ack if the size limit would be exceeded.
  int reserved;
  while (true) {
    final int observed = size.get();
    reserved = observed + 1;
    if (reserved > queueSize) {
      future.completeExceptionally(new QueueFullException());
      return future;
    }
    if (size.compareAndSet(observed, reserved)) {
      break;
    }
    // The counter changed since it was read; retry with a fresh value.
  }

  // Slot reserved: enqueue the outgoing ack.
  queue.add(new QueuedAck(ackId, future));

  if (reserved >= batchSize) {
    // A full batch is available, so send without waiting.
    send();
  } else if (scheduled.compareAndSet(false, true)) {
    // Defer sending so that more acks can gather into a larger batch.
    try {
      scheduler.schedule(this::scheduledSend, maxLatencyMs, MILLISECONDS);
    } catch (RejectedExecutionException ignored) {
      // Race with a call to close(). Ignore.
    }
  }

  return future;
}