public void write()

in src/main/java/com/twitter/http2/HttpConnectionHandler.java [653:893]


    /**
     * Intercepts outbound messages, updates the connection state that corresponds to
     * each recognized HTTP/2 frame type, and writes the encoded frame downstream.
     *
     * <ul>
     *   <li>{@code HttpDataFrame}: subject to send-window flow control. Depending on the
     *       smaller of the stream and connection send windows, the frame is written
     *       whole, split into a partial frame plus a queued remainder, or queued
     *       entirely.</li>
     *   <li>{@code HttpHeadersFrame}: validated against the stream state (or accepted as
     *       a new locally initiated stream) before being encoded.</li>
     *   <li>{@code HttpPriorityFrame}, {@code HttpRstStreamFrame},
     *       {@code HttpSettingsFrame}, {@code HttpPushPromiseFrame},
     *       {@code HttpPingFrame}, {@code HttpWindowUpdateFrame}: local state is updated
     *       and the encoded frame is written.</li>
     *   <li>{@code HttpGoAwayFrame}: always rejected.</li>
     *   <li>Anything else is passed downstream untouched.</li>
     * </ul>
     *
     * <p>Rejected messages fail {@code promise} with {@code PROTOCOL_EXCEPTION} and are
     * not written.
     *
     * @param ctx     context used to write encoded frames downstream
     * @param msg     the outbound message
     * @param promise completed with the outcome of the write, or failed immediately when
     *                the message is rejected
     * @throws Exception propagated unchanged from any call made by this method
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
            throws Exception {
        if (msg instanceof HttpDataFrame) {

            HttpDataFrame httpDataFrame = (HttpDataFrame) msg;
            int streamId = httpDataFrame.getStreamId();

            // Frames must not be sent on half-closed streams
            if (httpConnection.isLocalSideClosed(streamId)) {
                // The frame is discarded here and never reaches a downstream handler,
                // so its buffer must be released by this handler to avoid a leak.
                httpDataFrame.content().release();
                promise.setFailure(PROTOCOL_EXCEPTION);
                return;
            }

            // HTTP/2 DATA frame flow control processing requirements:
            //
            // Sender must not send a data frame with data length greater
            // than the transfer window size.
            //
            // After sending each data frame, the sender decrements its
            // transfer window size by the amount of data transmitted.
            //
            // When the window size becomes less than or equal to 0, the
            // sender must pause transmitting data frames.

            int dataLength = httpDataFrame.content().readableBytes();
            int sendWindowSize = httpConnection.getSendWindowSize(streamId);
            int connectionSendWindowSize = httpConnection.getSendWindowSize(
                    HTTP_CONNECTION_STREAM_ID);
            // The effective window is the tighter of the stream and connection windows.
            sendWindowSize = Math.min(sendWindowSize, connectionSendWindowSize);

            if (sendWindowSize <= 0) {
                // Stream is stalled -- enqueue Data frame and return
                httpConnection.putPendingWrite(
                        streamId, new HttpConnection.PendingWrite(httpDataFrame, promise));
                return;
            } else if (sendWindowSize < dataLength) {
                // Stream is not stalled but we cannot send the entire frame
                httpConnection.updateSendWindowSize(streamId, -1 * sendWindowSize);
                httpConnection.updateSendWindowSize(HTTP_CONNECTION_STREAM_ID, -1 * sendWindowSize);

                // Create a partial data frame whose length is the current window size.
                // The slice is retained because the remainder of the same frame is
                // queued below and still references the underlying buffer.
                ByteBuf data = httpDataFrame.content().readSlice(sendWindowSize).retain();
                ByteBuf partialDataFrame = httpFrameEncoder.encodeDataFrame(streamId, false, data);

                // Enqueue the remaining data (will be the first frame queued)
                httpConnection.putPendingWrite(
                        streamId, new HttpConnection.PendingWrite(httpDataFrame, promise));

                // The caller's promise travels with the queued remainder; the partial
                // write gets its own promise.
                ChannelPromise writeFuture = ctx.channel().newPromise();

                // The transfer window size is pre-decremented when sending a data frame downstream.
                // Close the connection on write failures that leaves the transfer window in a corrupt state.
                writeFuture.addListener(connectionErrorListener);

                ctx.write(partialDataFrame, writeFuture);
                return;
            } else {
                // Window size is large enough to send entire data frame
                httpConnection.updateSendWindowSize(streamId, -1 * dataLength);
                httpConnection.updateSendWindowSize(HTTP_CONNECTION_STREAM_ID, -1 * dataLength);

                // The transfer window size is pre-decremented when sending a data frame downstream.
                // Close the connection on write failures that leaves the transfer window in a corrupt state.
                promise.addListener(connectionErrorListener);
            }

            // Only the "entire frame fits" case reaches this point.

            // Close the local side of the stream if this is the last frame
            if (httpDataFrame.isLast()) {
                halfCloseStream(streamId, false, promise);
            }

            ByteBuf frame = httpFrameEncoder.encodeDataFrame(
                    streamId,
                    httpDataFrame.isLast(),
                    httpDataFrame.content()
            );
            ctx.write(frame, promise);

        } else if (msg instanceof HttpHeadersFrame) {

            HttpHeadersFrame httpHeadersFrame = (HttpHeadersFrame) msg;
            int streamId = httpHeadersFrame.getStreamId();

            if (isRemoteInitiatedId(streamId)) {
                if (streamId <= lastStreamId) {
                    // Attempting to send headers for an older stream
                    // (older than the latest accepted remote initiated stream)
                    // Ensure that the frames are not sent on a half-closed (local) or closed streams
                    if (httpConnection.isLocalSideClosed(streamId)) {
                        promise.setFailure(PROTOCOL_EXCEPTION);
                        return;
                    }
                } else {
                    // If we are attempting to write to a remote initiated stream id which is greater than the latest
                    // accepted stream Id then we must throw a protocol exception! i.e we cannot write on a remote
                    // initiated stream which we have not accepted before
                    promise.setFailure(PROTOCOL_EXCEPTION);
                    return;
                }
            } else {
                // This is a locally initiated stream (Push)
                boolean exclusive = httpHeadersFrame.isExclusive();
                int dependency = httpHeadersFrame.getDependency();
                int weight = httpHeadersFrame.getWeight();
                if (!acceptStream(streamId, exclusive, dependency, weight)) {
                    promise.setFailure(PROTOCOL_EXCEPTION);
                    return;
                }
            }

            // Close the local side of the stream if this is the last frame
            if (httpHeadersFrame.isLast()) {
                halfCloseStream(streamId, false, promise);
            }

            // Encoding and writing happen under the same lock so that header blocks
            // are written in the order they were encoded.
            synchronized (httpHeaderBlockEncoder) {
                ByteBuf frame = httpFrameEncoder.encodeHeadersFrame(
                        httpHeadersFrame.getStreamId(),
                        httpHeadersFrame.isLast(),
                        httpHeadersFrame.isExclusive(),
                        httpHeadersFrame.getDependency(),
                        httpHeadersFrame.getWeight(),
                        httpHeaderBlockEncoder.encode(ctx, httpHeadersFrame)
                );
                // Writes of compressed data must occur in order
                ctx.write(frame, promise);
            }

        } else if (msg instanceof HttpPriorityFrame) {

            HttpPriorityFrame httpPriorityFrame = (HttpPriorityFrame) msg;
            int streamId = httpPriorityFrame.getStreamId();
            boolean exclusive = httpPriorityFrame.isExclusive();
            int dependency = httpPriorityFrame.getDependency();
            int weight = httpPriorityFrame.getWeight();
            // Apply the new priority locally before announcing it to the peer.
            setPriority(streamId, exclusive, dependency, weight);
            ByteBuf frame = httpFrameEncoder.encodePriorityFrame(
                    streamId,
                    exclusive,
                    dependency,
                    weight
            );
            ctx.write(frame, promise);

        } else if (msg instanceof HttpRstStreamFrame) {

            HttpRstStreamFrame httpRstStreamFrame = (HttpRstStreamFrame) msg;
            // The stream is removed locally before the reset is written.
            removeStream(httpRstStreamFrame.getStreamId(), promise);
            ByteBuf frame = httpFrameEncoder.encodeRstStreamFrame(
                    httpRstStreamFrame.getStreamId(),
                    httpRstStreamFrame.getErrorCode().getCode());
            ctx.write(frame, promise);

        } else if (msg instanceof HttpSettingsFrame) {

            // TODO(jpinner) currently cannot have more than one settings frame outstanding at a time

            HttpSettingsFrame httpSettingsFrame = (HttpSettingsFrame) msg;
            if (httpSettingsFrame.isAck()) {
                // Cannot send an acknowledgement frame
                promise.setFailure(PROTOCOL_EXCEPTION);
                return;
            }

            // For each setting below, a negative value is ignored
            // (presumably meaning the setting is absent from the frame -- TODO confirm).

            int newHeaderTableSize =
                    httpSettingsFrame.getValue(HttpSettingsFrame.SETTINGS_HEADER_TABLE_SIZE);
            if (newHeaderTableSize >= 0) {
                headerTableSize = newHeaderTableSize;
                changeDecoderHeaderTableSize = true;
            }

            int newConcurrentStreams =
                    httpSettingsFrame.getValue(HttpSettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
            if (newConcurrentStreams >= 0) {
                localConcurrentStreams = newConcurrentStreams;
            }

            int newInitialWindowSize =
                    httpSettingsFrame.getValue(HttpSettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
            if (newInitialWindowSize >= 0) {
                updateInitialReceiveWindowSize(newInitialWindowSize);
            }

            ByteBuf frame = httpFrameEncoder.encodeSettingsFrame(httpSettingsFrame);
            ctx.write(frame, promise);

        } else if (msg instanceof HttpPushPromiseFrame) {

            if (!pushEnabled) {
                promise.setFailure(PROTOCOL_EXCEPTION);
                return;
            }

            // Encoding and writing happen under the same lock so that header blocks
            // are written in the order they were encoded.
            synchronized (httpHeaderBlockEncoder) {
                HttpPushPromiseFrame httpPushPromiseFrame = (HttpPushPromiseFrame) msg;
                ByteBuf frame = httpFrameEncoder.encodePushPromiseFrame(
                        httpPushPromiseFrame.getStreamId(),
                        httpPushPromiseFrame.getPromisedStreamId(),
                        httpHeaderBlockEncoder.encode(ctx, httpPushPromiseFrame)
                );
                // Writes of compressed data must occur in order
                ctx.write(frame, promise);
            }

        } else if (msg instanceof HttpPingFrame) {

            HttpPingFrame httpPingFrame = (HttpPingFrame) msg;
            if (httpPingFrame.isPong()) {
                // Cannot send a PONG frame
                promise.setFailure(PROTOCOL_EXCEPTION);
            } else {
                ByteBuf frame = httpFrameEncoder.encodePingFrame(httpPingFrame.getData(), false);
                ctx.write(frame, promise);
            }

        } else if (msg instanceof HttpGoAwayFrame) {

            // Why is this being sent? Intercept it and fail the write.
            // Should have sent a CLOSE ChannelStateEvent
            promise.setFailure(PROTOCOL_EXCEPTION);

        } else if (msg instanceof HttpWindowUpdateFrame) {

            HttpWindowUpdateFrame httpWindowUpdateFrame = (HttpWindowUpdateFrame) msg;
            int streamId = httpWindowUpdateFrame.getStreamId();

            // Rejected when handleStreamWindowUpdates is set, and always rejected for
            // the connection-level stream id.
            if (handleStreamWindowUpdates || streamId == HTTP_CONNECTION_STREAM_ID) {
                // Why is this being sent? Intercept it and fail the write.
                promise.setFailure(PROTOCOL_EXCEPTION);
            } else {
                int windowSizeIncrement = httpWindowUpdateFrame.getWindowSizeIncrement();
                // Apply the increment to the local receive window before writing the frame.
                httpConnection.updateReceiveWindowSize(streamId, windowSizeIncrement);
                ByteBuf frame = httpFrameEncoder.encodeWindowUpdateFrame(streamId, windowSizeIncrement);
                ctx.write(frame, promise);
            }

        } else {

            // Not an HTTP/2 frame type handled here; forward unchanged.
            ctx.write(msg, promise);
        }
    }