in src/main/java/com/twitter/http2/HttpConnectionHandler.java [653:893]
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    throws Exception {
  // Outbound entry point: validates each HTTP/2 frame object against the connection
  // state, updates the bookkeeping for it, encodes it and writes the encoded buffer
  // downstream. Anything that is not a recognised frame type is passed through as-is.
  //
  // Per frame type:
  //   DATA          - rejected when the local side of the stream is closed; otherwise
  //                   sent whole, sent partially (remainder queued), or queued entirely
  //                   depending on the stream and connection send windows.
  //   HEADERS       - remote-initiated ids must already be accepted and not locally
  //                   closed; locally initiated ids must pass acceptStream().
  //   PRIORITY      - priority is recorded via setPriority() and the frame is encoded.
  //   RST_STREAM    - the stream is removed and the frame is encoded.
  //   SETTINGS      - ACKs are rejected; selected settings are recorded locally.
  //   PUSH_PROMISE  - rejected unless pushEnabled.
  //   PING          - PONGs are rejected.
  //   GOAWAY        - always rejected.
  //   WINDOW_UPDATE - rejected for the connection stream or when this handler manages
  //                   stream window updates itself.
  //
  // Rejections are reported by failing the promise with PROTOCOL_EXCEPTION.
  if (msg instanceof HttpDataFrame) {

    HttpDataFrame httpDataFrame = (HttpDataFrame) msg;
    int streamId = httpDataFrame.getStreamId();

    // Frames must not be sent on half-closed streams
    if (httpConnection.isLocalSideClosed(streamId)) {
      // The frame is dropped here: it is neither written downstream nor queued, so
      // nothing else will ever release its content buffer. Release it before failing
      // the promise to avoid leaking the buffer.
      httpDataFrame.release();
      promise.setFailure(PROTOCOL_EXCEPTION);
      return;
    }

    // HTTP/2 DATA frame flow control processing requirements:
    //
    // Sender must not send a data frame with data length greater
    // than the transfer window size.
    //
    // After sending each data frame, the sender decrements its
    // transfer window size by the amount of data transmitted.
    //
    // When the window size becomes less than or equal to 0, the
    // sender must pause transmitting data frames.

    int dataLength = httpDataFrame.content().readableBytes();
    int sendWindowSize = httpConnection.getSendWindowSize(streamId);
    int connectionSendWindowSize = httpConnection.getSendWindowSize(
        HTTP_CONNECTION_STREAM_ID);
    // The effective window is the smaller of the stream and connection windows
    sendWindowSize = Math.min(sendWindowSize, connectionSendWindowSize);

    if (sendWindowSize <= 0) {
      // Stream is stalled -- enqueue Data frame and return
      httpConnection.putPendingWrite(
          streamId, new HttpConnection.PendingWrite(httpDataFrame, promise));
      return;
    } else if (sendWindowSize < dataLength) {
      // Stream is not stalled but we cannot send the entire frame
      httpConnection.updateSendWindowSize(streamId, -sendWindowSize);
      httpConnection.updateSendWindowSize(HTTP_CONNECTION_STREAM_ID, -sendWindowSize);

      // Create a partial data frame whose length is the current window size.
      // readSlice() advances the reader index of the original content, so the frame
      // queued below only carries the remaining bytes; retain() gives the slice its
      // own reference for the downstream write.
      ByteBuf data = httpDataFrame.content().readSlice(sendWindowSize).retain();
      ByteBuf partialDataFrame = httpFrameEncoder.encodeDataFrame(streamId, false, data);

      // Enqueue the remaining data (will be the first frame queued)
      httpConnection.putPendingWrite(
          streamId, new HttpConnection.PendingWrite(httpDataFrame, promise));

      // The caller's promise travels with the queued remainder, so the partial write
      // gets a promise of its own.
      ChannelPromise writeFuture = ctx.channel().newPromise();

      // The transfer window size is pre-decremented when sending a data frame downstream.
      // Close the connection on write failures that leave the transfer window in a corrupt state.
      writeFuture.addListener(connectionErrorListener);

      ctx.write(partialDataFrame, writeFuture);
      return;
    } else {
      // Window size is large enough to send entire data frame
      httpConnection.updateSendWindowSize(streamId, -dataLength);
      httpConnection.updateSendWindowSize(HTTP_CONNECTION_STREAM_ID, -dataLength);

      // The transfer window size is pre-decremented when sending a data frame downstream.
      // Close the connection on write failures that leave the transfer window in a corrupt state.
      promise.addListener(connectionErrorListener);
    }

    // Close the local side of the stream if this is the last frame
    if (httpDataFrame.isLast()) {
      halfCloseStream(streamId, false, promise);
    }

    ByteBuf frame = httpFrameEncoder.encodeDataFrame(
        streamId,
        httpDataFrame.isLast(),
        httpDataFrame.content()
    );
    ctx.write(frame, promise);

  } else if (msg instanceof HttpHeadersFrame) {

    HttpHeadersFrame httpHeadersFrame = (HttpHeadersFrame) msg;
    int streamId = httpHeadersFrame.getStreamId();

    if (isRemoteInitiatedId(streamId)) {
      if (streamId <= lastStreamId) {
        // Attempting to send headers for an older stream
        // (older than the latest accepted remote initiated stream)
        // Ensure that the frames are not sent on a half-closed (local) or closed streams
        if (httpConnection.isLocalSideClosed(streamId)) {
          promise.setFailure(PROTOCOL_EXCEPTION);
          return;
        }
      } else {
        // If we are attempting to write to a remote initiated stream id which is greater than the latest
        // accepted stream Id then we must throw a protocol exception! i.e we cannot write on a remote
        // initiated stream which we have not accepted before
        promise.setFailure(PROTOCOL_EXCEPTION);
        return;
      }
    } else {
      // This is a locally initiated stream (Push)
      boolean exclusive = httpHeadersFrame.isExclusive();
      int dependency = httpHeadersFrame.getDependency();
      int weight = httpHeadersFrame.getWeight();
      if (!acceptStream(streamId, exclusive, dependency, weight)) {
        promise.setFailure(PROTOCOL_EXCEPTION);
        return;
      }
    }

    // Close the local side of the stream if this is the last frame
    if (httpHeadersFrame.isLast()) {
      halfCloseStream(streamId, false, promise);
    }

    // Encoding and writing happen under the same lock so that header blocks reach
    // the channel in the order in which they were compressed.
    synchronized (httpHeaderBlockEncoder) {
      ByteBuf frame = httpFrameEncoder.encodeHeadersFrame(
          httpHeadersFrame.getStreamId(),
          httpHeadersFrame.isLast(),
          httpHeadersFrame.isExclusive(),
          httpHeadersFrame.getDependency(),
          httpHeadersFrame.getWeight(),
          httpHeaderBlockEncoder.encode(ctx, httpHeadersFrame)
      );
      // Writes of compressed data must occur in order
      ctx.write(frame, promise);
    }

  } else if (msg instanceof HttpPriorityFrame) {

    HttpPriorityFrame httpPriorityFrame = (HttpPriorityFrame) msg;
    int streamId = httpPriorityFrame.getStreamId();
    boolean exclusive = httpPriorityFrame.isExclusive();
    int dependency = httpPriorityFrame.getDependency();
    int weight = httpPriorityFrame.getWeight();

    // Record the new priority locally before telling the peer about it
    setPriority(streamId, exclusive, dependency, weight);

    ByteBuf frame = httpFrameEncoder.encodePriorityFrame(
        streamId,
        exclusive,
        dependency,
        weight
    );
    ctx.write(frame, promise);

  } else if (msg instanceof HttpRstStreamFrame) {

    HttpRstStreamFrame httpRstStreamFrame = (HttpRstStreamFrame) msg;

    // Drop the local stream state first, then send the reset to the peer
    removeStream(httpRstStreamFrame.getStreamId(), promise);

    ByteBuf frame = httpFrameEncoder.encodeRstStreamFrame(
        httpRstStreamFrame.getStreamId(),
        httpRstStreamFrame.getErrorCode().getCode());
    ctx.write(frame, promise);

  } else if (msg instanceof HttpSettingsFrame) {

    // TODO(jpinner) currently cannot have more than one settings frame outstanding at a time

    HttpSettingsFrame httpSettingsFrame = (HttpSettingsFrame) msg;
    if (httpSettingsFrame.isAck()) {
      // Cannot send an acknowledgement frame
      promise.setFailure(PROTOCOL_EXCEPTION);
      return;
    }

    // For each setting below, a negative value returned by getValue() is skipped
    // and leaves the corresponding local state untouched.
    int newHeaderTableSize =
        httpSettingsFrame.getValue(HttpSettingsFrame.SETTINGS_HEADER_TABLE_SIZE);
    if (newHeaderTableSize >= 0) {
      headerTableSize = newHeaderTableSize;
      changeDecoderHeaderTableSize = true;
    }

    int newConcurrentStreams =
        httpSettingsFrame.getValue(HttpSettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
    if (newConcurrentStreams >= 0) {
      localConcurrentStreams = newConcurrentStreams;
    }

    int newInitialWindowSize =
        httpSettingsFrame.getValue(HttpSettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
    if (newInitialWindowSize >= 0) {
      updateInitialReceiveWindowSize(newInitialWindowSize);
    }

    ByteBuf frame = httpFrameEncoder.encodeSettingsFrame(httpSettingsFrame);
    ctx.write(frame, promise);

  } else if (msg instanceof HttpPushPromiseFrame) {

    if (!pushEnabled) {
      promise.setFailure(PROTOCOL_EXCEPTION);
      return;
    }

    // Same lock as for HEADERS frames: compress and write as one ordered step
    synchronized (httpHeaderBlockEncoder) {
      HttpPushPromiseFrame httpPushPromiseFrame = (HttpPushPromiseFrame) msg;
      ByteBuf frame = httpFrameEncoder.encodePushPromiseFrame(
          httpPushPromiseFrame.getStreamId(),
          httpPushPromiseFrame.getPromisedStreamId(),
          httpHeaderBlockEncoder.encode(ctx, httpPushPromiseFrame)
      );
      // Writes of compressed data must occur in order
      ctx.write(frame, promise);
    }

  } else if (msg instanceof HttpPingFrame) {

    HttpPingFrame httpPingFrame = (HttpPingFrame) msg;
    if (httpPingFrame.isPong()) {
      // Cannot send a PONG frame
      promise.setFailure(PROTOCOL_EXCEPTION);
    } else {
      ByteBuf frame = httpFrameEncoder.encodePingFrame(httpPingFrame.getData(), false);
      ctx.write(frame, promise);
    }

  } else if (msg instanceof HttpGoAwayFrame) {

    // Why is this being sent? Intercept it and fail the write.
    // Should have sent a CLOSE ChannelStateEvent
    promise.setFailure(PROTOCOL_EXCEPTION);

  } else if (msg instanceof HttpWindowUpdateFrame) {

    HttpWindowUpdateFrame httpWindowUpdateFrame = (HttpWindowUpdateFrame) msg;
    int streamId = httpWindowUpdateFrame.getStreamId();

    if (handleStreamWindowUpdates || streamId == HTTP_CONNECTION_STREAM_ID) {
      // Why is this being sent? Intercept it and fail the write.
      promise.setFailure(PROTOCOL_EXCEPTION);
    } else {
      int windowSizeIncrement = httpWindowUpdateFrame.getWindowSizeIncrement();
      // Grow the local receive window by the amount being advertised to the peer
      httpConnection.updateReceiveWindowSize(streamId, windowSizeIncrement);
      ByteBuf frame = httpFrameEncoder.encodeWindowUpdateFrame(streamId, windowSizeIncrement);
      ctx.write(frame, promise);
    }

  } else {

    // Not one of the frame types handled above: forward untouched
    ctx.write(msg, promise);
  }
}