in src/main/java/com/twitter/http2/HttpStreamEncoder.java [148:194]
public void operationComplete(final Future<HttpContent> future) throws Exception {
final FutureListener<HttpContent> chunkListener = this;
ctx.executor().execute(new Runnable() {
@Override
public void run() {
if (future.isSuccess()) {
HttpContent content = future.getNow();
ChannelPromise writeFuture;
if (content instanceof LastHttpContent) {
writeFuture = completionFuture;
} else {
writeFuture = ctx.channel().newPromise();
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
pipe.receive().addListener(chunkListener);
} else if (future.isCancelled()) {
pipe.close();
completionFuture.cancel(true);
} else {
pipe.close();
completionFuture.setFailure(future.cause());
}
}
});
}
writeChunk(ctx, writeFuture, streamId, content);
} else {
// Somebody closed the pipe
// Send a reset frame to the channel and complete the completion future
ctx.writeAndFlush(
new DefaultHttpRstStreamFrame(streamId, HttpErrorCode.INTERNAL_ERROR));
if (future.isCancelled()) {
completionFuture.cancel(true);
} else {
completionFuture.setFailure(future.cause());
}
}
}
});
}