netty http2設計深刻講解

前言

近期接到一個任務,把netty http2引入到項目裏面。據說過http2,還真沒有本身玩過。對看過這篇博客的你們說句: 抱歉。原本想很好的說說http2的。寫着寫着,發現要寫的東西太多了,有一些內容根本就很差寫。可是netty http2.0的主要內容,本章博客已經全面的講述了,須要讀者有使用經歷,閱讀點源碼。html

瞭解下http2.0

時代在發展,使用http協議的人愈來愈多。http1.1的弊端慢慢都被顯現出來。java

  1. 瀏覽器方式一些網站頻繁發送請求,形成一家獨大其餘網站沒法使用。或者全部網站都頻發發送請求形成用戶體驗差等等問題。限制每一個url同時併發數量
  2. 提升請求的響應速度。只有一個鏈接,只有一次tcp三次握手或者tls的7次握手。一個http1.1請求所用的時間,http2.0能夠處理三到四個請求。
  3. 提升服務端與客服端的性能(尤爲是大型互聯網公司流量很大,若是使用http2.0,能夠減小一半的http服務器)

協商

緣由 一

http客服端不知道http服務端是否支持http2.0。反過來 http服務端也不知道http客服端是否支持http2.0。爲何出現這種現象,讓全部的http服務端與http客服端直接從http1.1過分到http2.0是不可能的事情。甚至在大點的公司內部直接從http1.1直接過分到http2.0也是一件不現實的事情,那麼出現一件麻煩的事情有http1客服端,也有http2客服端。有http2服務端,也有http1服務端。這種兩個維度,四種狀況的共存現象。web

緣由二

有人會問,只支持http1.1很差嗎? 已經支持http2,.0的client確定不會放棄http2.0優秀的性能與特性,能使用使用http2.0,就要使用。算法

解決

那麼http2.0的設計者爲了解決這種麻煩的東西。推出瞭解決方案:協商。promise

https 1.1 與https.20的協商

https1.1與https2.0的協商是基於ALPN機制。ALPNS是基於TLS實現。在創建TLS連接的時候,客服端會 在TLS協議裏面加入本身支持的協議,服務端在客服端支持的協議裏面選中一個本身最合適的。而後把選中的協議通知給客服端。若是客戶端沒有發送支持的http協議,服務端會默認使用http1.1瀏覽器

http1.1與http2.0的協商

http沒有TLS協議,沒法基於TLS傳遞協議。協議制定者使用了Upgrade機制。客戶端發送一個空請求,請求裏面包含該Upgrade,Connection,HTTP2-Settings請求頭。服務端從Upgrade取出支持的協議而後響應請求,在響應的請求頭裏麪包含Upgrade,Connection。這樣協商就成功了。下面是http1.1與http2.0的協商流程服務器

請求頭示例:架構

GET / HTTP/1.1
Host: example.com
Connection: Upgrade, HTTP2-Settings
Upgrade: h2c
HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>

若是服務端不支持 HTTP/2,它會忽略 Upgrade 字段,直接返回 HTTP/1.1 響應,例如:併發

HTTP/1.1 200 OK
Content-Length: 243
Content-Type: text/html

若是服務端支持 HTTP/2,那就能夠迴應 101 狀態碼及對應頭部:app

HTTP/1.1 100 Switching Protocols
Connection: Upgrade
Upgrade: h2c
小結
  1. https 1.1 與https.2.0的協商 與 http1.1與http2.0的協商 是兩套設計方案。https 1.1 與https.2.0 TLS幫你作了。http1.1與http2.0的協商須要本身作。
  2. 如今的趨勢,客服端與服務端都須要同時支持http1.1,http2.0,https1.1,https2.0。真是一件很麻煩的事情

netty http2

netty 的http2模塊設計的很是好,實在是很繞,就一個http2的包。實在有點亂。按照功能劃分的話http2應該有四個模塊。

  1. http2核心模塊
  2. http1.1與http2協商模塊
  3. http2幀處理模塊
  4. http2協議轉http1協議模塊

http2核心模塊

做爲核心的模塊主要是負責http2協議的解碼與編碼,幀的解析與處理,http2請求頭子模塊,streamId的管理,http2連接管理

Http2ConnectionHandler

Http2ConnectionHandler 是 netty核心設計hadler的實現,也是http2模塊的出發點。

負責http2模塊的組合
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
                                     Http2Settings initialSettings) {
        this.initialSettings = checkNotNull(initialSettings, "initialSettings");
        this.decoder = checkNotNull(decoder, "decoder");
        this.encoder = checkNotNull(encoder, "encoder");
        if (encoder.connection() != decoder.connection()) {
            throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
        }
    }
負責http2協議下handler生命週期處理
@Override
 public void flush(ChannelHandlerContext ctx) {
	 try {
		 // Trigger pending writes in the remote flow controller.
		 encoder.flowController().writePendingBytes();
		 ctx.flush();
	 } catch (Http2Exception e) {
		 onError(ctx, true, e);
	 } catch (Throwable cause) {
	 	onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
	 }
 }

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Initialize the encoder, decoder, flow controllers, and internal state.
        encoder.lifecycleManager(this);
        decoder.lifecycleManager(this);
        encoder.flowController().channelHandlerContext(ctx);
        decoder.flowController().channelHandlerContext(ctx);
        byteDecoder = new PrefaceDecoder(ctx);
    }

    @Override
    protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
        if (byteDecoder != null) {
            byteDecoder.handlerRemoved(ctx);
            byteDecoder = null;
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (byteDecoder == null) {
            byteDecoder = new PrefaceDecoder(ctx);//當連接建立的時候建立PrefaceDecoder對象
        }
        byteDecoder.channelActive(ctx);
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Call super class first, as this may result in decode being called.
        super.channelInactive(ctx);
        if (byteDecoder != null) {
            byteDecoder.channelInactive(ctx);
            byteDecoder = null;
        }
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
        // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
        try {
            if (ctx.channel().isWritable()) {
                flush(ctx);
            }
            encoder.flowController().channelWritabilityChanged();
        } finally {
            super.channelWritabilityChanged(ctx);
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byteDecoder.decode(ctx, in, out);
    }
負責http2 Lifecycle Manager(http2生命週期的管理)
public interface Http2LifecycleManager {

    void closeStreamLocal(Http2Stream stream, ChannelFuture future);

    void closeStreamRemote(Http2Stream stream, ChannelFuture future);

    void closeStream(Http2Stream stream, ChannelFuture future);

    ChannelFuture resetStream(ChannelHandlerContext ctx, int streamId, long errorCode,
            ChannelPromise promise);

    ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
            ByteBuf debugData, ChannelPromise promise);

closeStreamLocal

關閉本地stream。local stream是指客服端發送headers幀與data幀到服務端,客服端會建立一個local stream。一樣服務端發送headers幀與data幀給客服端,服務端也會建立一個 local stream

closeStreamRemote

關閉遠程stream。 remote stream是值當客服端接受服務端發的headers幀與data幀 ,客服端會建立一個remote stream。一樣服務端接受到客服端發送的headers幀與data幀,服務端也會建立一個 remote stream

closeStream

當接受到 resetStream 幀的時候就用調用改方法。發送方發送一個錯誤的流,想後悔的時候,就發送resetStream幀這個後悔藥

resetStream

對resetStream幀進行處理

負責校驗協議行爲
public void onHttpClientUpgrade() throws Http2Exception {
        if (connection().isServer()) {
            throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
        }
        if (!prefaceSent()) {
            // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
            // calling this method.
            throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
        }
        if (decoder.prefaceReceived()) {
            throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
        }

        // Create a local stream used for the HTTP cleartext upgrade.
        connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
    }

    /**
     * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
     * @param settings the settings for the remote endpoint.
     */
    public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
        if (!connection().isServer()) {
            throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
        }
        if (!prefaceSent()) {
            // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
            // calling this method.
            throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
        }
        if (decoder.prefaceReceived()) {
            throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
        }

        // Apply the settings but no ACK is necessary.
        encoder.remoteSettings(settings);

        // Create a stream in the half-closed state.
        connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
    }
管理http2協議中的鏈接前言

鏈接前言????? 很術語化。其實就是協商成功以後,客戶端能夠開始發送各類 HTTP/2 幀,但第一個幀必須是 Magic 幀(內容固定爲 PRI * HTTP/2.0rnrnSMrnrn),作爲協議升級的最終確認。鏈接前言模塊由父類BaseDecoder與子類PrefaceDecoder,FrameDecoder組成。鏈接前言由PrefaceDecoder管理。

客服端發送前言

@Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // The channel just became active - send the connection preface to the remote endpoint.
            sendPreface(ctx);
        }
		private void sendPreface(ChannelHandlerContext ctx) throws Exception {
            if (prefaceSent || !ctx.channel().isActive()) {
                return;
            }

            prefaceSent = true;

            final boolean isClient = !connection().isServer();
            if (isClient) {
                // Clients must send the preface string as the first bytes on the connection.
                ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }

            // Both client and server must send their initial settings.
            encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
                    ChannelFutureListener.CLOSE_ON_FAILURE);

            if (isClient) {
                // If this handler is extended by the user and we directly fire the userEvent from this context then
                // the user will not see the event. We should fire the event starting with this handler so this class
                // (and extending classes) have a chance to process the event.
                userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
            }
        }

服務端校驗鏈接前言

@Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            try {
                if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
                    // After the preface is read, it is time to hand over control to the post initialized decoder.
                    byteDecoder = new FrameDecoder();
                    byteDecoder.decode(ctx, in, out);
                }
            } catch (Throwable e) {
                onError(ctx, false, e);
            }
        }
		        private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
            if (clientPrefaceString == null) {
                return true;
            }

            int prefaceRemaining = clientPrefaceString.readableBytes();
            int bytesRead = min(in.readableBytes(), prefaceRemaining);

            // If the input so far doesn't match the preface, break the connection.
            if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
                                                      clientPrefaceString, clientPrefaceString.readerIndex(),
                                                      bytesRead)) {
                int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
                int http1Index =
                    ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
                if (http1Index != -1) {
                    String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
                    throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
                }
                String receivedBytes = hexDump(in, in.readerIndex(),
                                               min(in.readableBytes(), clientPrefaceString.readableBytes()));
                throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
                                                      "Hex dump for received bytes: %s", receivedBytes);
            }
            in.skipBytes(bytesRead);
            clientPrefaceString.skipBytes(bytesRead);

            if (!clientPrefaceString.isReadable()) {
                // Entire preface has been read.
                clientPrefaceString.release();
                clientPrefaceString = null;
                return true;
            }
            return false;
        }
        private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
            if (in.readableBytes() < 5) {
                // Need more data before we can see the frame type for the first frame.
                return false;
            }

            short frameType = in.getUnsignedByte(in.readerIndex() + 3);
            short flags = in.getUnsignedByte(in.readerIndex() + 4);
            if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
                throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
                                                      "Hex dump for first 5 bytes: %s",
                                      hexDump(in, in.readerIndex(), 5));
            }
            return true;
        }

麻煩的設計,只是爲解耦而已。代碼層次明顯。這就是架構......

http2連接管理 , streamId的管理,流量管理,權重管理

http2的編碼與解碼

解碼實現類爲DefaultHttp2ConnectionEncoder,看下很容易理解的。DefaultHttp2ConnectionEncoder內部有兩個成員變量lifecycleManager(http2生命週期)與Http2FrameWriter(協議生成實現類)。

goAway與resetStream幀的處理方式,是由lifecycleManager(http2生命週期)

@Override
    public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
            ChannelPromise promise) {
        return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
    }
	 @Override
    public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
            ChannelPromise promise) {
        // Delegate to the lifecycle manager for proper updating of connection state.
        return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
    }

重點:FlowControlledHeaders與FlowControlledData主要是用於流量控制與權重使用,不屬於編碼範疇

解碼流程爲 Handler(Http2ConnectionHandle) --> Decoder(PrefaceDecoder) --> Decoder(FrameDecoder) --> decoder(Http2ConnectionDecoder) --> frameReader(Http2FrameReader) --> FrameListener(Http2FrameListener) 。按照此調用鏈慢慢看容易看懂了。

  1. Handler(Http2ConnectionHandle) 負責接受到buf數據
  2. PrefaceDecoder負責校驗鏈接前言
  3. FrameDecoder 負責執行Http2ConnectionDecoder。Http2ConnectionDecoder實現類爲 DefaultHttp2ConnectionDecoder
  4. frameReader 負責解析協議。 Http2FrameReader 實現類爲DefaultHttp2FrameReader
  5. FrameListener負責對幀進行處理。FrameListener的實現類有不少,主要分FrameReadListener與其餘FrameListener。FrameReadListener負責幀進行http2特性維護,維護成功調用其餘FrameListener。由其餘FrameListener進行幀處理。

幀的解析與處理

frameReader 負責解析協議。 Http2FrameReader 實現類爲DefaultHttp2FrameReader

@Override
public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)
        throws Http2Exception {
    if (readError) {
        input.skipBytes(input.readableBytes());
        return;
    }
    try {
        do {
            if (readingHeaders) {
                processHeaderState(input);// 解析幀
                if (readingHeaders) {
                    // Wait until the entire header has arrived.
                    return;
                }
            }
            processPayloadState(ctx, input, listener);// 處理幀
            if (!readingHeaders) {
                // Wait until the entire payload has arrived.
                return;
            }
        } while (input.isReadable());
    } catch (Http2Exception e) {
        readError = !Http2Exception.isStreamError(e);
        throw e;
    } catch (RuntimeException e) {
        readError = true;
        throw e;
    } catch (Throwable cause) {
        readError = true;
        PlatformDependent.throwException(cause);
    }
}

private void processHeaderState(ByteBuf in) throws Http2Exception {
    if (in.readableBytes() < FRAME_HEADER_LENGTH) {
        // Wait until the entire frame header has been read.
        return;
    }

    // Read the header and prepare the unmarshaller to read the frame.
    payloadLength = in.readUnsignedMedium();// 幀長度
    if (payloadLength > maxFrameSize) {
        throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,
                              maxFrameSize);
    }
    frameType = in.readByte();// 幀類型
    flags = new Http2Flags(in.readUnsignedByte());
    streamId = readUnsignedInt(in);// 獲得streamId 幀頭的9個字節處理完畢

    // We have consumed the data, next time we read we will be expecting to read the frame payload.
    readingHeaders = false;
 
    switch (frameType) {//進行校驗
        case DATA:
            verifyDataFrame();
            break;
        case HEADERS:
            verifyHeadersFrame();
            break;
        case PRIORITY:
            verifyPriorityFrame();
            break;
        case RST_STREAM:
            verifyRstStreamFrame();
            break;
        case SETTINGS:
            verifySettingsFrame();
            break;
        case PUSH_PROMISE:
            verifyPushPromiseFrame();
            break;
        case PING:
            verifyPingFrame();
            break;
        case GO_AWAY:
            verifyGoAwayFrame();
            break;
        case WINDOW_UPDATE:
            verifyWindowUpdateFrame();
            break;
        case CONTINUATION:
            verifyContinuationFrame();
            break;
        default:
            // Unknown frame type, could be an extension.
            verifyUnknownFrame();
            break;
    }
}

private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
                throws Http2Exception {
    if (in.readableBytes() < payloadLength) {
        // Wait until the entire payload has been read.
        return;
    }

    // Get a view of the buffer for the size of the payload.
    ByteBuf payload = in.readSlice(payloadLength);

    // We have consumed the data, next time we read we will be expecting to read a frame header.
    readingHeaders = true;

    // Read the payload and fire the frame event to the listener.
    switch (frameType) {
        case DATA:
            readDataFrame(ctx, payload, listener);
            break;
        case HEADERS:
            readHeadersFrame(ctx, payload, listener);
            break;
        case PRIORITY:
            readPriorityFrame(ctx, payload, listener);
            break;
        case RST_STREAM:
            readRstStreamFrame(ctx, payload, listener);
            break;
        case SETTINGS:
            readSettingsFrame(ctx, payload, listener);
            break;
        case PUSH_PROMISE:
            readPushPromiseFrame(ctx, payload, listener);
            break;
        case PING:
            readPingFrame(ctx, payload.readLong(), listener);
            break;
        case GO_AWAY:
            readGoAwayFrame(ctx, payload, listener);
            break;
        case WINDOW_UPDATE:
            readWindowUpdateFrame(ctx, payload, listener);
            break;
        case CONTINUATION:
            readContinuationFrame(payload, listener);
            break;
        default:
            readUnknownFrame(ctx, payload, listener);
            break;
    }
}
http2請求頭子模塊

在http2的設計中請求頭是一個重點。http2要求請求使用HPACK算法進行壓縮。Http2Headers,Http2HeadersDecoder,Http2HeadersEncoder,HPACKDecoder,HPACKEncoder組成。

http1.1與http2協商模塊

http2.0只須要一個Http2ConnectionHandler的實現類。目前netty自帶的實現類有Http2FrameCodec,HttpToHttp2ConnectionHandler。

http1.1須要HttpServerCodec與HttpObjectAggregator對象。

https 須要一個SSLHandler

private void configureSsl(SocketChannel ch) {
        ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), new Http2OrHttpHandler());
  }
https 1.1 與 https 2.0的協商

客戶端在TSL 4次握手的時候已經把客戶端支持的http協議傳給服務端了,當4次握手成功,SSLHandler會產生一個事件傳遞下去。ApplicationProtocolNegotiationHandler會處理這個事件,得到協議而且調用configurePipelineI()方法。

public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler {

    private static final int MAX_CONTENT_LENGTH = 1024 * 100;

    protected Http2OrHttpHandler() {
        super(ApplicationProtocolNames.HTTP_1_1);
    }

    @Override
    protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
        if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
            ctx.pipeline().addLast(new HttpToHttp2ConnectionHandler(),
			           new HelloWorldHttp1Handler("ALPN Negotiation"));
            return;
        }

        if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
            ctx.pipeline().addLast(new HttpServerCodec(),
                                   new HttpObjectAggregator(MAX_CONTENT_LENGTH),
                                   new HelloWorldHttp1Handler("ALPN Negotiation"));
            return;
        }

        throw new IllegalStateException("unknown protocol: " + protocol);
    }
}
http 1.1 與 http 2.0的協商

https的協商經過TLS的4次握手解決了,那http如何進行協商了。http使用了Upgrade機制,機制的流程上面已經說明了。你們是否發現一點。Upgrade機制是基於http1.1實現了,這是重點也是最麻煩的地方。當協商成功須要刪除http1.1與upgrade的handler,加入http2.0的hanler。

  1. http客戶端與服務端都要支持http1.1協議
  2. http客戶端與服務端都要支持Upgrade機制
  3. 進行協商
  4. http客戶端與服務端都要刪除http1.1協議與Upgrade機制的支持
  5. http客戶端與服務端都要加入http2.0協議的支持
服務端如何支持Upgrade機制
private static final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() {
        @Override
        public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
            if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
                return new Http2ServerUpgradeCodec(new HttpToHttp2ConnectionHandler());
            } else {
                return null;
            }
        }
};

HttpServerCodec sourceCodec = new HttpServerCodec();
HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
cp.addLast(ourceCodec, upgradeHandler);
HttpServerUpgradeHandler Upgrade機制處理handler

HttpServerUpgradeHandler作兩件事,第一個識別Upgrade請求頭開啓server端的Upgrade機制,第二件是刪除http1.1Handler與HttpServerUpgradeHandler,加入http2.0Handler。

private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
    // c得到請求頭upgrade的數據
    final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE));
    final int numRequestedProtocols = requestedProtocols.size();
    UpgradeCodec upgradeCodec = null;
    CharSequence upgradeProtocol = null;
    for (int i = 0; i < numRequestedProtocols; i ++) {
        final CharSequence p = requestedProtocols.get(i);//得到協議
        final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);//識別是否支持改協議
        if (c != null) {
            upgradeProtocol = p;
            upgradeCodec = c;
            break;
        }
    }
    // 沒有upgradeCodec,不支持。client發過的協議
    if (upgradeCodec == null) {
        return false;
    }
	// uconnection請求頭,表示upgrade機制不完整
    CharSequence connectionHeader = request.headers().get(HttpHeaderNames.CONNECTION);
    if (connectionHeader == null) {
        return false;
    }

    Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
    List<CharSequence> values = splitHeader(connectionHeader);
    if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) ||
        !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) {
        return false;
    }

    for (CharSequence requiredHeader : requiredHeaders) {
        if (!request.headers().contains(requiredHeader)) {
            return false;
        }
    }

    final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);//建立響應,並返回協商後的協議。
    // prepareUpgradeResponse解析Http2-setting
    if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) {
        return false;
    }
	// 建立事件,誰須要處理,誰去處理。
    final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request);

    try {
        // 返回數據。這裏是一個大坑。
        final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse);
        // 刪除http1.1 handler
        sourceCodec.upgradeFrom(ctx);
		// 添加http2.0 handler,這裏會識別上面的大坑
        upgradeCodec.upgradeTo(ctx, request);
       // 刪除本身,即HttpServerUpgradeHandler。那麼netty的handler應該只剩下http2.0handler與業務處理handler了
        ctx.pipeline().remove(HttpServerUpgradeHandler.this);
        // 傳播 事件
        ctx.fireUserEventTriggered(event.retain());

        writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    } finally {
        event.release();
    }
    return true;
}
Http2ServerUpgradeCode Upgrade協議解析器

Http2ServerUpgradeCode有一件很是重要的事情作。就是解析請求頭HTTP2-Settings的值最終交給Http2ConnectionEncoder處理。這個值不簡單,該值包含了http2 客戶端大量的信息。好比權重,流量信息等等。請求頭HTTP2-Settings的值使用了HPACK算法進行壓縮,而後base64編碼。使用base64解碼以後,就至關於http2.0的SETTINGS幀了。

private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, CharSequence settingsHeader)
            throws Http2Exception {
        ByteBuf header = ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(settingsHeader), CharsetUtil.UTF_8);
        try {
            ByteBuf payload = Base64.decode(header, URL_SAFE);// base64解碼
            ByteBuf frame = createSettingsFrame(ctx, payload);
            return decodeSettings(ctx, frame);
        } finally {
            header.release();
        }
  }
// 把解碼以後的數據拼接成 http2.0的SETTINGS幀
private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) {
        ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payload.readableBytes());
        writeFrameHeader(frame, payload.readableBytes(), SETTINGS, new Http2Flags(), 0);
        frame.writeBytes(payload);
        payload.release();
        return frame;
    }

// 下面詳解了Http2Settings裏面數據的做用。
public void remoteSettings(Http2Settings settings) throws Http2Exception {
    Boolean pushEnabled = settings.pushEnabled();
    Http2FrameWriter.Configuration config = configuration();
    Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
    Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
    if (pushEnabled != null) {
        if (!connection.isServer() && pushEnabled) {
            throw connectionError(PROTOCOL_ERROR,
                "Client received a value of ENABLE_PUSH specified to other than 0");
        }
        connection.remote().allowPushTo(pushEnabled);
    }

    Long maxConcurrentStreams = settings.maxConcurrentStreams();
    if (maxConcurrentStreams != null) {
        connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
    }

    Long headerTableSize = settings.headerTableSize();
    if (headerTableSize != null) {
        outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
    }

    Long maxHeaderListSize = settings.maxHeaderListSize();
    if (maxHeaderListSize != null) {
        outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
    }

    Integer maxFrameSize = settings.maxFrameSize();
    if (maxFrameSize != null) {
        outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
    }

    Integer initialWindowSize = settings.initialWindowSize();
    if (initialWindowSize != null) {
        flowController().initialWindowSize(initialWindowSize);
    }
}
客戶端如何支持而且觸發Upgrade機制

一個簡單的客戶端代碼

public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    HttpClientCodec sourceCodec = new HttpClientCodec();
    Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(Http2Handler.newHandler(false));
    HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536);
    pipeline.addLast(
        sourceCodec,
        upgradeHandler,
        new UpgradeRequestHandler(),
}

private final class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        DefaultFullHttpRequest upgradeRequest =new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
        ctx.writeAndFlush(upgradeRequest);
        ctx.fireChannelActive();
        ctx.pipeline().remove(this);
    }
}

客戶端發送一個請求,請求裏面加入三個請求頭。問題1.何時發送請求。2. 誰來發送請求。3. 誰來在請求裏面加入請求頭

  1. 何時發送請求

在鏈接創建以後立馬發送。鏈接創建事件爲:channelActive。

  1. 誰來發送請求

UpgradeRequestHandler來發。UpgradeRequestHandler是使用者本身實現的,不是netty http2。緣由很簡單,Upgrade的請求是被攔截仍是不被攔截,被攔截業務handler是沒法得請求的。由使用者來決定Upgrade 請求的行爲。netty http2 默認是是被攔截的。

  1. 誰來在請求裏面加入請求頭

HttpClientUpgradeHandler 負責攔截請求並添加請求頭

private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
    // 添加UPGRADE請求頭
    request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());

    // 添加CONNECTIO請求頭
    Set<CharSequence> connectionParts = new LinkedHashSet<CharSequence>(2);
    connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));// 添加Http2-setting請求頭

    // Set the CONNECTION header from the set of all protocol-specific headers that were added.
    StringBuilder builder = new StringBuilder();
    for (CharSequence part : connectionParts) {
        builder.append(part);
        builder.append(',');
    }
    builder.append(HttpHeaderValues.UPGRADE);
    request.headers().add(HttpHeaderNames.CONNECTION, builder.toString());
}

HttpClientUpgradeHandler 只會對第一次的請求,進行Upgrade操做,以後所有一場啊

http2協議處理模塊

該模塊針對http2 九種幀進行的一次封裝,這樣可讓下游Handler能夠直接處理幀,而不須要本身對原始數據進行處理。該模塊是最簡單,最容易理解。請看Http2FrameCodec類就好了。Http2FrameCodec繼承Http2ConnectionHandler。

如下是每種幀對應一個封裝好的實體類。

幀類型 對應實體類
data DefaultHttp2DataFrame
headers DefaultHttp2HeadersFrame
windowUpdate DefaultHttp2WindowUpdateFrame
reset DefaultHttp2ResetFrame
pring DefaultHttp2PingFrame
settings DefaultHttp2SettingsFrame
unknown DefaultHttp2UnknownFrame
goAwary DefaultHttp2GoAwayFrame
push_promise 沒有

Http2FrameCodec 內部Http2FrameListener的實現

private final class FrameListener implements Http2FrameListener {

        @Override
        public void onUnknownFrame(
                ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
            onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
                    .stream(requireStream(streamId)).retain());
        }

        @Override
        public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
            onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
        }

        @Override
        public void onPingRead(ChannelHandlerContext ctx, long data) {
            onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
        }

        @Override
        public void onPingAckRead(ChannelHandlerContext ctx, long data) {
            onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
        }

        @Override
        public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
            onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
        }

        @Override
        public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
            if (streamId == 0) {
                // Ignore connection window updates.
                return;
            }
            onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
        }

        @Override
        public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
                                  Http2Headers headers, int streamDependency, short weight, boolean
                                          exclusive, int padding, boolean endStream) {
            onHeadersRead(ctx, streamId, headers, padding, endStream);
        }

        @Override
        public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
                                  int padding, boolean endOfStream) {
            onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
                                        .stream(requireStream(streamId)));
        }

        @Override
        public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
                              boolean endOfStream) {
            onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
                                        .stream(requireStream(streamId)).retain());
            // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
            return 0;
        }

        @Override
        public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
            onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
        }

        @Override
        public void onPriorityRead(
                ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
            // TODO: Maybe handle me
        }

        @Override
        public void onSettingsAckRead(ChannelHandlerContext ctx) {
            // TODO: Maybe handle me
        }

        @Override
        public void onPushPromiseRead(
                ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding)  {
            // TODO: Maybe handle me
        }

        private Http2FrameStream requireStream(int streamId) {
            Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
            if (stream == null) {
                throw new IllegalStateException("Stream object required for identifier: " + streamId);
            }
            return stream;
        }
    }

http2協議轉http1協議模塊

該模塊解決兩個痛處。一個是http2幀,處理很麻煩,很麻煩,無論如何就算使用【 http2協議處理模塊】使用者都須要進行不少額外的處理,http1.1就很是簡單,好用。第二是向http1.1兼容,目前有大量的http1.1服務端與客戶端。直接使用http2的確不太適合,須要修改大量的代碼。

  1. InboundHttp2ToHttpAdapter(實現轉換的類)以及InboundHttp2ToHttpAdapterBuilder
  2. HttpToHttp2ConnectionHandler(發http1.1的對象解析成http2協議發送出去)以及HttpToHttp2ConnectionHandlerBuilder

下面代碼簡單描述了http1.1的對象如何解析成http2協議而且發送出去

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

        if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) {
            ctx.write(msg, promise);
            return;
        }

        boolean release = true;
        SimpleChannelPromiseAggregator promiseAggregator =
                new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
        try {
            Http2ConnectionEncoder encoder = encoder();
            boolean endStream = false;
            if (msg instanceof HttpMessage) {
                final HttpMessage httpMsg = (HttpMessage) msg;

                // Provide the user the opportunity to specify the streamId
                currentStreamId = getStreamId(httpMsg.headers());

                // Convert and write the headers.
                Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders);
                endStream = msg instanceof FullHttpMessage && !((FullHttpMessage) msg).content().isReadable();
                writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers,
                        endStream, promiseAggregator);
            }

            if (!endStream && msg instanceof HttpContent) {
                boolean isLastContent = false;
                HttpHeaders trailers = EmptyHttpHeaders.INSTANCE;
                Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE;
                if (msg instanceof LastHttpContent) {
                    isLastContent = true;

                    // Convert any trailing headers.
                    final LastHttpContent lastContent = (LastHttpContent) msg;
                    trailers = lastContent.trailingHeaders();
                    http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders);
                }

                // Write the data
                final ByteBuf content = ((HttpContent) msg).content();
                endStream = isLastContent && trailers.isEmpty();
                release = false;
                encoder.writeData(ctx, currentStreamId, content, 0, endStream, promiseAggregator.newPromise());

                if (!trailers.isEmpty()) {
                    // Write trailing headers.
                    writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true, promiseAggregator);
                }
            }
        } catch (Throwable t) {
            onError(ctx, true, t);
            promiseAggregator.setFailure(t);
        } finally {
            if (release) {
                ReferenceCountUtil.release(msg);
            }
            promiseAggregator.doneAllocatingPromises();
        }
    }
  1. Stream轉http1.1對象。 請看Http2StreamFrameToHttpObjectCodec。感受這個類比較雞肋。

參考資料

HTTP/2 協議規範(很全面文檔) http://www.javashuo.com/article/p-drwzsvds-es.html

關於HTTP2和HTTPS,這些你必需要知道 https://cloud.tencent.com/info/b91491d04eae06df8fd4b055545f10ae.html

談談 HTTP/2 的協議協商機制 http://web.jobbole.com/85646/

HTTP2 詳解 https://www.jianshu.com/p/e57ca4fec26f

相關文章
相關標籤/搜索