近期接到一個任務,把netty http2引入到項目裏面。據說過http2,還真沒有本身玩過。對看過這篇博客的你們說句: 抱歉。原本想很好的說說http2的。寫着寫着,發現要寫的東西太多了,有一些內容根本就很差寫。可是netty http2.0的主要內容,本章博客已經全面的講述了,須要讀者有使用經歷,閱讀點源碼。html
時代在發展,使用http協議的人愈來愈多。http1.1的弊端慢慢都被顯現出來。java
http客服端不知道http服務端是否支持http2.0。反過來 http服務端也不知道http客服端是否支持http2.0。爲何出現這種現象,讓全部的http服務端與http客服端直接從http1.1過分到http2.0是不可能的事情。甚至在大點的公司內部直接從http1.1直接過分到http2.0也是一件不現實的事情,那麼出現一件麻煩的事情有http1客服端,也有http2客服端。有http2服務端,也有http1服務端。這種兩個維度,四種狀況的共存現象。web
有人會問,只支持http1.1很差嗎? 已經支持http2,.0的client確定不會放棄http2.0優秀的性能與特性,能使用使用http2.0,就要使用。算法
那麼http2.0的設計者爲了解決這種麻煩的東西。推出瞭解決方案:協商。promise
https1.1與https2.0的協商是基於ALPN機制。ALPNS是基於TLS實現。在創建TLS連接的時候,客服端會 在TLS協議裏面加入本身支持的協議,服務端在客服端支持的協議裏面選中一個本身最合適的。而後把選中的協議通知給客服端。若是客戶端沒有發送支持的http協議,服務端會默認使用http1.1瀏覽器
http沒有TLS協議,沒法基於TLS傳遞協議。協議制定者使用了Upgrade機制。客戶端發送一個空請求,請求裏面包含該Upgrade,Connection,HTTP2-Settings請求頭。服務端從Upgrade取出支持的協議而後響應請求,在響應的請求頭裏麪包含Upgrade,Connection。這樣協商就成功了。下面是http1.1與http2.0的協商流程服務器
請求頭示例:架構
GET / HTTP/1.1 Host: example.com Connection: Upgrade, HTTP2-Settings Upgrade: h2c HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>
若是服務端不支持 HTTP/2,它會忽略 Upgrade 字段,直接返回 HTTP/1.1 響應,例如:併發
HTTP/1.1 200 OK Content-Length: 243 Content-Type: text/html
若是服務端支持 HTTP/2,那就能夠迴應 101 狀態碼及對應頭部:app
HTTP/1.1 100 Switching Protocols Connection: Upgrade Upgrade: h2c
netty 的http2模塊設計的很是好,實在是很繞,就一個http2的包。實在有點亂。按照功能劃分的話http2應該有四個模塊。
做爲核心的模塊主要是負責http2協議的解碼與編碼,幀的解析與處理,http2請求頭子模塊,streamId的管理,http2連接管理
Http2ConnectionHandler 是 netty核心設計hadler的實現,也是http2模塊的出發點。
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) { this.initialSettings = checkNotNull(initialSettings, "initialSettings"); this.decoder = checkNotNull(decoder, "decoder"); this.encoder = checkNotNull(encoder, "encoder"); if (encoder.connection() != decoder.connection()) { throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object"); } }
@Override public void flush(ChannelHandlerContext ctx) { try { // Trigger pending writes in the remote flow controller. encoder.flowController().writePendingBytes(); ctx.flush(); } catch (Http2Exception e) { onError(ctx, true, e); } catch (Throwable cause) { onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing")); } } public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // Initialize the encoder, decoder, flow controllers, and internal state. encoder.lifecycleManager(this); decoder.lifecycleManager(this); encoder.flowController().channelHandlerContext(ctx); decoder.flowController().channelHandlerContext(ctx); byteDecoder = new PrefaceDecoder(ctx); } @Override protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { if (byteDecoder != null) { byteDecoder.handlerRemoved(ctx); byteDecoder = null; } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (byteDecoder == null) { byteDecoder = new PrefaceDecoder(ctx);//當連接建立的時候建立PrefaceDecoder對象 } byteDecoder.channelActive(ctx); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Call super class first, as this may result in decode being called. super.channelInactive(ctx); if (byteDecoder != null) { byteDecoder.channelInactive(ctx); byteDecoder = null; } } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation. try { if (ctx.channel().isWritable()) { flush(ctx); } encoder.flowController().channelWritabilityChanged(); } finally { super.channelWritabilityChanged(ctx); } } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { byteDecoder.decode(ctx, in, out); }
public interface Http2LifecycleManager { void closeStreamLocal(Http2Stream stream, ChannelFuture future); void closeStreamRemote(Http2Stream stream, ChannelFuture future); void closeStream(Http2Stream stream, ChannelFuture future); ChannelFuture resetStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise); ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise);
closeStreamLocal
關閉本地stream。local stream是指客服端發送headers幀與data幀到服務端,客服端會建立一個local stream。一樣服務端發送headers幀與data幀給客服端,服務端也會建立一個 local stream
closeStreamRemote
關閉遠程stream。 remote stream是值當客服端接受服務端發的headers幀與data幀 ,客服端會建立一個remote stream。一樣服務端接受到客服端發送的headers幀與data幀,服務端也會建立一個 remote stream
closeStream
當接受到 resetStream 幀的時候就用調用改方法。發送方發送一個錯誤的流,想後悔的時候,就發送resetStream幀這個後悔藥
resetStream
對resetStream幀進行處理
public void onHttpClientUpgrade() throws Http2Exception { if (connection().isServer()) { throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server"); } if (!prefaceSent()) { // If the preface was not sent yet it most likely means the handler was not added to the pipeline before // calling this method. throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent"); } if (decoder.prefaceReceived()) { throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received"); } // Create a local stream used for the HTTP cleartext upgrade. connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true); } /** * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2. * @param settings the settings for the remote endpoint. */ public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception { if (!connection().isServer()) { throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client"); } if (!prefaceSent()) { // If the preface was not sent yet it most likely means the handler was not added to the pipeline before // calling this method. throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent"); } if (decoder.prefaceReceived()) { throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received"); } // Apply the settings but no ACK is necessary. encoder.remoteSettings(settings); // Create a stream in the half-closed state. connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true); }
鏈接前言????? 很術語化。其實就是協商成功以後,客戶端能夠開始發送各類 HTTP/2 幀,但第一個幀必須是 Magic 幀(內容固定爲 PRI * HTTP/2.0rnrnSMrnrn),作爲協議升級的最終確認。鏈接前言模塊由父類BaseDecoder與子類PrefaceDecoder,FrameDecoder組成。鏈接前言由PrefaceDecoder管理。
客服端發送前言
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // The channel just became active - send the connection preface to the remote endpoint. sendPreface(ctx); } private void sendPreface(ChannelHandlerContext ctx) throws Exception { if (prefaceSent || !ctx.channel().isActive()) { return; } prefaceSent = true; final boolean isClient = !connection().isServer(); if (isClient) { // Clients must send the preface string as the first bytes on the connection. ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } // Both client and server must send their initial settings. encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); if (isClient) { // If this handler is extended by the user and we directly fire the userEvent from this context then // the user will not see the event. We should fire the event starting with this handler so this class // (and extending classes) have a chance to process the event. userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE); } }
服務端校驗鏈接前言
@Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try { if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) { // After the preface is read, it is time to hand over control to the post initialized decoder. byteDecoder = new FrameDecoder(); byteDecoder.decode(ctx, in, out); } } catch (Throwable e) { onError(ctx, false, e); } } private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception { if (clientPrefaceString == null) { return true; } int prefaceRemaining = clientPrefaceString.readableBytes(); int bytesRead = min(in.readableBytes(), prefaceRemaining); // If the input so far doesn't match the preface, break the connection. if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(), clientPrefaceString, clientPrefaceString.readerIndex(), bytesRead)) { int maxSearch = 1024; // picked because 512 is too little, and 2048 too much int http1Index = ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch))); if (http1Index != -1) { String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII); throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk); } String receivedBytes = hexDump(in, in.readerIndex(), min(in.readableBytes(), clientPrefaceString.readableBytes())); throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " + "Hex dump for received bytes: %s", receivedBytes); } in.skipBytes(bytesRead); clientPrefaceString.skipBytes(bytesRead); if (!clientPrefaceString.isReadable()) { // Entire preface has been read. clientPrefaceString.release(); clientPrefaceString = null; return true; } return false; } private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception { if (in.readableBytes() < 5) { // Need more data before we can see the frame type for the first frame. return false; } short frameType = in.getUnsignedByte(in.readerIndex() + 3); short flags = in.getUnsignedByte(in.readerIndex() + 4); if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) { throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " + "Hex dump for first 5 bytes: %s", hexDump(in, in.readerIndex(), 5)); } return true; }
麻煩的設計,只是爲解耦而已。代碼層次明顯。這就是架構......
解碼實現類爲DefaultHttp2ConnectionEncoder,看下很容易理解的。DefaultHttp2ConnectionEncoder內部有兩個成員變量lifecycleManager(http2生命週期)與Http2FrameWriter(協議生成實現類)。
goAway與resetStream幀的處理方式,是由lifecycleManager(http2生命週期)
@Override public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) { return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise); } @Override public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { // Delegate to the lifecycle manager for proper updating of connection state. return lifecycleManager.resetStream(ctx, streamId, errorCode, promise); }
重點:FlowControlledHeaders與FlowControlledData主要是用於流量控制與權重使用,不屬於編碼範疇
解碼流程爲 Handler(Http2ConnectionHandle) --> Decoder(PrefaceDecoder) --> Decoder(FrameDecoder) --> decoder(Http2ConnectionDecoder) --> frameReader(Http2FrameReader) --> FrameListener(Http2FrameListener) 。按照此調用鏈慢慢看容易看懂了。
frameReader 負責解析協議。 Http2FrameReader 實現類爲DefaultHttp2FrameReader
@Override public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener) throws Http2Exception { if (readError) { input.skipBytes(input.readableBytes()); return; } try { do { if (readingHeaders) { processHeaderState(input);// 解析幀 if (readingHeaders) { // Wait until the entire header has arrived. return; } } processPayloadState(ctx, input, listener);// 處理幀 if (!readingHeaders) { // Wait until the entire payload has arrived. return; } } while (input.isReadable()); } catch (Http2Exception e) { readError = !Http2Exception.isStreamError(e); throw e; } catch (RuntimeException e) { readError = true; throw e; } catch (Throwable cause) { readError = true; PlatformDependent.throwException(cause); } } private void processHeaderState(ByteBuf in) throws Http2Exception { if (in.readableBytes() < FRAME_HEADER_LENGTH) { // Wait until the entire frame header has been read. return; } // Read the header and prepare the unmarshaller to read the frame. payloadLength = in.readUnsignedMedium();// 幀長度 if (payloadLength > maxFrameSize) { throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength, maxFrameSize); } frameType = in.readByte();// 幀類型 flags = new Http2Flags(in.readUnsignedByte()); streamId = readUnsignedInt(in);// 獲得streamId 幀頭的9個字節處理完畢 // We have consumed the data, next time we read we will be expecting to read the frame payload. readingHeaders = false; switch (frameType) {//進行校驗 case DATA: verifyDataFrame(); break; case HEADERS: verifyHeadersFrame(); break; case PRIORITY: verifyPriorityFrame(); break; case RST_STREAM: verifyRstStreamFrame(); break; case SETTINGS: verifySettingsFrame(); break; case PUSH_PROMISE: verifyPushPromiseFrame(); break; case PING: verifyPingFrame(); break; case GO_AWAY: verifyGoAwayFrame(); break; case WINDOW_UPDATE: verifyWindowUpdateFrame(); break; case CONTINUATION: verifyContinuationFrame(); break; default: // Unknown frame type, could be an extension. verifyUnknownFrame(); break; } } private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener) throws Http2Exception { if (in.readableBytes() < payloadLength) { // Wait until the entire payload has been read. return; } // Get a view of the buffer for the size of the payload. ByteBuf payload = in.readSlice(payloadLength); // We have consumed the data, next time we read we will be expecting to read a frame header. readingHeaders = true; // Read the payload and fire the frame event to the listener. switch (frameType) { case DATA: readDataFrame(ctx, payload, listener); break; case HEADERS: readHeadersFrame(ctx, payload, listener); break; case PRIORITY: readPriorityFrame(ctx, payload, listener); break; case RST_STREAM: readRstStreamFrame(ctx, payload, listener); break; case SETTINGS: readSettingsFrame(ctx, payload, listener); break; case PUSH_PROMISE: readPushPromiseFrame(ctx, payload, listener); break; case PING: readPingFrame(ctx, payload.readLong(), listener); break; case GO_AWAY: readGoAwayFrame(ctx, payload, listener); break; case WINDOW_UPDATE: readWindowUpdateFrame(ctx, payload, listener); break; case CONTINUATION: readContinuationFrame(payload, listener); break; default: readUnknownFrame(ctx, payload, listener); break; } }
在http2的設計中請求頭是一個重點。http2要求請求使用HPACK算法進行壓縮。Http2Headers,Http2HeadersDecoder,Http2HeadersEncoder,HPACKDecoder,HPACKEncoder組成。
http2.0只須要一個Http2ConnectionHandler的實現類。目前netty自帶的實現類有Http2FrameCodec,HttpToHttp2ConnectionHandler。
http1.1須要HttpServerCodec與HttpObjectAggregator對象。
https 須要一個SSLHandler
private void configureSsl(SocketChannel ch) { ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), new Http2OrHttpHandler()); }
客戶端在TSL 4次握手的時候已經把客戶端支持的http協議傳給服務端了,當4次握手成功,SSLHandler會產生一個事件傳遞下去。ApplicationProtocolNegotiationHandler會處理這個事件,得到協議而且調用configurePipelineI()方法。
public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler { private static final int MAX_CONTENT_LENGTH = 1024 * 100; protected Http2OrHttpHandler() { super(ApplicationProtocolNames.HTTP_1_1); } @Override protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { ctx.pipeline().addLast(new HttpToHttp2ConnectionHandler(), new HelloWorldHttp1Handler("ALPN Negotiation")); return; } if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { ctx.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(MAX_CONTENT_LENGTH), new HelloWorldHttp1Handler("ALPN Negotiation")); return; } throw new IllegalStateException("unknown protocol: " + protocol); } }
https的協商經過TLS的4次握手解決了,那http如何進行協商了。http使用了Upgrade機制,機制的流程上面已經說明了。你們是否發現一點。Upgrade機制是基於http1.1實現了,這是重點也是最麻煩的地方。當協商成功須要刪除http1.1與upgrade的handler,加入http2.0的hanler。
private static final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() { @Override public UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { return new Http2ServerUpgradeCodec(new HttpToHttp2ConnectionHandler()); } else { return null; } } }; HttpServerCodec sourceCodec = new HttpServerCodec(); HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory); cp.addLast(ourceCodec, upgradeHandler);
HttpServerUpgradeHandler作兩件事,第一個識別Upgrade請求頭開啓server端的Upgrade機制,第二件是刪除http1.1Handler與HttpServerUpgradeHandler,加入http2.0Handler。
private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) { // c得到請求頭upgrade的數據 final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE)); final int numRequestedProtocols = requestedProtocols.size(); UpgradeCodec upgradeCodec = null; CharSequence upgradeProtocol = null; for (int i = 0; i < numRequestedProtocols; i ++) { final CharSequence p = requestedProtocols.get(i);//得到協議 final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);//識別是否支持改協議 if (c != null) { upgradeProtocol = p; upgradeCodec = c; break; } } // 沒有upgradeCodec,不支持。client發過的協議 if (upgradeCodec == null) { return false; } // uconnection請求頭,表示upgrade機制不完整 CharSequence connectionHeader = request.headers().get(HttpHeaderNames.CONNECTION); if (connectionHeader == null) { return false; } Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders(); List<CharSequence> values = splitHeader(connectionHeader); if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) || !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) { return false; } for (CharSequence requiredHeader : requiredHeaders) { if (!request.headers().contains(requiredHeader)) { return false; } } final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);//建立響應,並返回協商後的協議。 // prepareUpgradeResponse解析Http2-setting if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) { return false; } // 建立事件,誰須要處理,誰去處理。 final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request); try { // 返回數據。這裏是一個大坑。 final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse); // 刪除http1.1 handler sourceCodec.upgradeFrom(ctx); // 添加http2.0 handler,這裏會識別上面的大坑 upgradeCodec.upgradeTo(ctx, request); // 刪除本身,即HttpServerUpgradeHandler。那麼netty的handler應該只剩下http2.0handler與業務處理handler了 ctx.pipeline().remove(HttpServerUpgradeHandler.this); // 傳播 事件 ctx.fireUserEventTriggered(event.retain()); writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } finally { event.release(); } return true; }
Http2ServerUpgradeCode有一件很是重要的事情作。就是解析請求頭HTTP2-Settings的值最終交給Http2ConnectionEncoder處理。這個值不簡單,該值包含了http2 客戶端大量的信息。好比權重,流量信息等等。請求頭HTTP2-Settings的值使用了HPACK算法進行壓縮,而後base64編碼。使用base64解碼以後,就至關於http2.0的SETTINGS幀了。
private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, CharSequence settingsHeader) throws Http2Exception { ByteBuf header = ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(settingsHeader), CharsetUtil.UTF_8); try { ByteBuf payload = Base64.decode(header, URL_SAFE);// base64解碼 ByteBuf frame = createSettingsFrame(ctx, payload); return decodeSettings(ctx, frame); } finally { header.release(); } } // 把解碼以後的數據拼接成 http2.0的SETTINGS幀 private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) { ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payload.readableBytes()); writeFrameHeader(frame, payload.readableBytes(), SETTINGS, new Http2Flags(), 0); frame.writeBytes(payload); payload.release(); return frame; } // 下面詳解了Http2Settings裏面數據的做用。 public void remoteSettings(Http2Settings settings) throws Http2Exception { Boolean pushEnabled = settings.pushEnabled(); Http2FrameWriter.Configuration config = configuration(); Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration(); Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy(); if (pushEnabled != null) { if (!connection.isServer() && pushEnabled) { throw connectionError(PROTOCOL_ERROR, "Client received a value of ENABLE_PUSH specified to other than 0"); } connection.remote().allowPushTo(pushEnabled); } Long maxConcurrentStreams = settings.maxConcurrentStreams(); if (maxConcurrentStreams != null) { connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE)); } Long headerTableSize = settings.headerTableSize(); if (headerTableSize != null) { outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE)); } Long maxHeaderListSize = settings.maxHeaderListSize(); if (maxHeaderListSize != null) { outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize); } Integer maxFrameSize = settings.maxFrameSize(); if (maxFrameSize != null) { outboundFrameSizePolicy.maxFrameSize(maxFrameSize); } Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { flowController().initialWindowSize(initialWindowSize); } }
一個簡單的客戶端代碼
public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); HttpClientCodec sourceCodec = new HttpClientCodec(); Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(Http2Handler.newHandler(false)); HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536); pipeline.addLast( sourceCodec, upgradeHandler, new UpgradeRequestHandler(), } private final class UpgradeRequestHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { DefaultFullHttpRequest upgradeRequest =new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); ctx.writeAndFlush(upgradeRequest); ctx.fireChannelActive(); ctx.pipeline().remove(this); } }
客戶端發送一個請求,請求裏面加入三個請求頭。問題1.何時發送請求。2. 誰來發送請求。3. 誰來在請求裏面加入請求頭
在鏈接創建以後立馬發送。鏈接創建事件爲:channelActive。
UpgradeRequestHandler來發。UpgradeRequestHandler是使用者本身實現的,不是netty http2。緣由很簡單,Upgrade的請求是被攔截仍是不被攔截,被攔截業務handler是沒法得請求的。由使用者來決定Upgrade 請求的行爲。netty http2 默認是是被攔截的。
HttpClientUpgradeHandler 負責攔截請求並添加請求頭
private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) { // 添加UPGRADE請求頭 request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol()); // 添加CONNECTIO請求頭 Set<CharSequence> connectionParts = new LinkedHashSet<CharSequence>(2); connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));// 添加Http2-setting請求頭 // Set the CONNECTION header from the set of all protocol-specific headers that were added. StringBuilder builder = new StringBuilder(); for (CharSequence part : connectionParts) { builder.append(part); builder.append(','); } builder.append(HttpHeaderValues.UPGRADE); request.headers().add(HttpHeaderNames.CONNECTION, builder.toString()); }
HttpClientUpgradeHandler 只會對第一次的請求,進行Upgrade操做,以後所有一場啊
該模塊針對http2 九種幀進行的一次封裝,這樣可讓下游Handler能夠直接處理幀,而不須要本身對原始數據進行處理。該模塊是最簡單,最容易理解。請看Http2FrameCodec類就好了。Http2FrameCodec繼承Http2ConnectionHandler。
如下是每種幀對應一個封裝好的實體類。
幀類型 | 對應實體類 |
---|---|
data | DefaultHttp2DataFrame |
headers | DefaultHttp2HeadersFrame |
windowUpdate | DefaultHttp2WindowUpdateFrame |
reset | DefaultHttp2ResetFrame |
pring | DefaultHttp2PingFrame |
settings | DefaultHttp2SettingsFrame |
unknown | DefaultHttp2UnknownFrame |
goAwary | DefaultHttp2GoAwayFrame |
push_promise | 沒有 |
Http2FrameCodec 內部Http2FrameListener的實現
private final class FrameListener implements Http2FrameListener { @Override public void onUnknownFrame( ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) { onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload) .stream(requireStream(streamId)).retain()); } @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings)); } @Override public void onPingRead(ChannelHandlerContext ctx, long data) { onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false)); } @Override public void onPingAckRead(ChannelHandlerContext ctx, long data) { onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true)); } @Override public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) { onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId))); } @Override public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) { if (streamId == 0) { // Ignore connection window updates. return; } onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId))); } @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) { onHeadersRead(ctx, streamId, headers, padding, endStream); } @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) { onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding) .stream(requireStream(streamId))); } @Override public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) { onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding) .stream(requireStream(streamId)).retain()); // We return the bytes in consumeBytes() once the stream channel consumed the bytes. return 0; } @Override public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) { onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain()); } @Override public void onPriorityRead( ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) { // TODO: Maybe handle me } @Override public void onSettingsAckRead(ChannelHandlerContext ctx) { // TODO: Maybe handle me } @Override public void onPushPromiseRead( ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) { // TODO: Maybe handle me } private Http2FrameStream requireStream(int streamId) { Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey); if (stream == null) { throw new IllegalStateException("Stream object required for identifier: " + streamId); } return stream; } }
該模塊解決兩個痛處。一個是http2幀,處理很麻煩,很麻煩,無論如何就算使用【 http2協議處理模塊】使用者都須要進行不少額外的處理,http1.1就很是簡單,好用。第二是向http1.1兼容,目前有大量的http1.1服務端與客戶端。直接使用http2的確不太適合,須要修改大量的代碼。
下面代碼簡單描述了http1.1的對象如何解析成http2協議而且發送出去
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) { ctx.write(msg, promise); return; } boolean release = true; SimpleChannelPromiseAggregator promiseAggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); try { Http2ConnectionEncoder encoder = encoder(); boolean endStream = false; if (msg instanceof HttpMessage) { final HttpMessage httpMsg = (HttpMessage) msg; // Provide the user the opportunity to specify the streamId currentStreamId = getStreamId(httpMsg.headers()); // Convert and write the headers. Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders); endStream = msg instanceof FullHttpMessage && !((FullHttpMessage) msg).content().isReadable(); writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers, endStream, promiseAggregator); } if (!endStream && msg instanceof HttpContent) { boolean isLastContent = false; HttpHeaders trailers = EmptyHttpHeaders.INSTANCE; Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE; if (msg instanceof LastHttpContent) { isLastContent = true; // Convert any trailing headers. final LastHttpContent lastContent = (LastHttpContent) msg; trailers = lastContent.trailingHeaders(); http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders); } // Write the data final ByteBuf content = ((HttpContent) msg).content(); endStream = isLastContent && trailers.isEmpty(); release = false; encoder.writeData(ctx, currentStreamId, content, 0, endStream, promiseAggregator.newPromise()); if (!trailers.isEmpty()) { // Write trailing headers. writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true, promiseAggregator); } } } catch (Throwable t) { onError(ctx, true, t); promiseAggregator.setFailure(t); } finally { if (release) { ReferenceCountUtil.release(msg); } promiseAggregator.doneAllocatingPromises(); } }
HTTP/2 協議規範(很全面文檔) http://www.javashuo.com/article/p-drwzsvds-es.html
關於HTTP2和HTTPS,這些你必需要知道 https://cloud.tencent.com/info/b91491d04eae06df8fd4b055545f10ae.html
談談 HTTP/2 的協議協商機制 http://web.jobbole.com/85646/