in src/main/java/com/twitter/whiskey/futures/ReactiveFuture.java [138:164]
/**
 * Registers an observer and immediately brings it up to date with this future's state.
 *
 * <p>While holding this object's monitor, the observer is added to {@code observers}. If the
 * future is streaming and has not been drained yet, it is marked as drained and every element
 * returned by {@code drain()} is handed to the observer through {@code onNext}. If the future
 * is already done, {@code onComplete} is handed to the observer as well. Every callback is
 * submitted to the executor returned by {@code observer.getExecutor()} rather than invoked
 * directly by this method.
 *
 * @param observer the observer to register; its executor receives all callbacks scheduled here
 */
public void addObserver(final Observer<E> observer) {
    synchronized (this) {
        observers.add(observer);

        // The replay is performed at most once per drained flag: setting it here means later
        // registrations skip this branch unless the flag is reset elsewhere.
        final boolean replayBuffered = streaming && !drained;
        if (replayBuffered) {
            drained = true;
            for (final E buffered : drain()) {
                final Runnable deliverNext = new Runnable() {
                    @Override
                    public void run() {
                        observer.onNext(buffered);
                    }
                };
                // NOTE(review): tasks are submitted while the monitor is held; if the executor
                // runs tasks inline, the callback would execute under this lock -- confirm.
                observer.getExecutor().execute(deliverNext);
            }
        }

        if (!isDone()) {
            return;
        }

        // NOTE(review): onComplete is scheduled for any done state reported by isDone();
        // confirm that failure/cancellation are signalled to late observers elsewhere.
        final Runnable deliverCompletion = new Runnable() {
            @Override
            public void run() {
                observer.onComplete();
            }
        };
        observer.getExecutor().execute(deliverCompletion);
    }
}