知識圖譜html
只須要在netty的pipeLine中配置HttpRequestDecoder和HttpObjectAggregator。java
1:若是把解析這塊理解是一個黑盒的話,則輸入是ByteBuf,輸出是FullHttpRequest。經過該對象即可獲取到全部與http協議有關的信息。react
2:HttpRequestDecoder先經過RequestLine和Header解析成HttpRequest對象,傳入到HttpObjectAggregator。而後再經過body解析出httpContent對象,傳入到HttpObjectAggregator。當HttpObjectAggregator發現是LastHttpContent,則表明http協議解析完成,封裝FullHttpRequest。nginx
3:對於body內容的讀取涉及到Content-Length和trunked兩種方式。兩種方式只是在解析協議時處理的不一致,最終輸出是一致的。git
大概用Netty的,不管新手仍是老手,都知道它是一個「網絡通信框架」。所謂框架,基本上都是一個做用:基於底層API,提供更便捷的編程模型。那麼」通信框架」到底作了什麼事情呢?回答這個問題並不太容易,咱們不妨反過來看看,不使用Netty,直接基於NIO編寫網絡程序,你須要作什麼(以Server端TCP鏈接爲例,這裏咱們使用Reactor模型):程序員
創建線程是一個比較耗時的操做,同時維護線程自己也有一些開銷,因此咱們會須要多線程機制,幸虧JDK已經有很方便的多線程框架了,這裏咱們不須要花不少心思。github
此外,由於TCP鏈接的特性,咱們還要使用鏈接池來進行管理:算法
想一想就以爲很複雜了!實際上,基於NIO直接實現這部分東西,即便是老手也容易出現錯誤,而使用Netty以後,你只須要關注邏輯處理部分就能夠了。數據庫
這裏咱們引用Netty的example包裏的一個例子,一個簡單的EchoServer,它接受客戶端輸入,並將輸入原樣返回。其主要代碼以下:編程
public void run() { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(port)); }
這裏EchoServerHandler
是其業務邏輯的實現者,大體代碼以下:
public class EchoServerHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. e.getChannel().write(e.getMessage()); } }
仍是挺簡單的,不是嗎?
完成了以上一段代碼,咱們算是與Netty進行了第一次親密接觸。若是想深刻學習呢?
閱讀源碼是瞭解一個開源工具很是好的手段,可是Java世界的框架大多追求大而全,功能完備,若是逐個閱讀,不免迷失方向,Netty也並不例外。相反,抓住幾個重點對象,理解其領域概念及設計思想,從而理清其脈絡,至關於打通了任督二脈,之後的閱讀就再也不困難了。
理解Netty的關鍵點在哪呢?我以爲,除了NIO的相關知識,另外一個就是事件驅動的設計思想。什麼叫事件驅動?咱們回頭看看EchoServerHandler
的代碼,其中的參數:public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
,MessageEvent就是一個事件。這個事件攜帶了一些信息,例如這裏e.getMessage()
就是消息的內容,而EchoServerHandler
則描述了處理這種事件的方式。一旦某個事件觸發,相應的Handler則會被調用,並進行處理。這種事件機制在UI編程裏普遍應用,而Netty則將其應用到了網絡編程領域。
在Netty裏,全部事件都來自ChannelEvent
接口,這些事件涵蓋監聽端口、創建鏈接、讀寫數據等網絡通信的各個階段。而事件的處理者就是ChannelHandler
,這樣,不可是業務邏輯,連網絡通信流程中底層的處理,均可以經過實現ChannelHandler
來完成了。事實上,Netty內部的鏈接處理、協議編解碼、超時等機制,都是經過handler完成的。當博主弄明白其中的奧妙時,不得不佩服這種設計!
下圖描述了Netty進行事件處理的流程。Channel
是鏈接的通道,是ChannelEvent的產生者,而ChannelPipeline
能夠理解爲ChannelHandler的集合。
理解了Netty的事件驅動機制,咱們如今能夠來研究Netty的各個模塊了。Netty的包結構以下:
org
└── jboss
└── netty
├── bootstrap 配置並啓動服務的類
├── buffer 緩衝相關類,對NIO Buffer作了一些封裝
├── channel 核心部分,處理鏈接
├── container 鏈接其餘容器的代碼
├── example 使用示例
├── handler 基於handler的擴展部分,實現協議編解碼等附加功能
├── logging 日誌
└── util 工具類
在這裏面,channel
和handler
兩部分比較複雜。咱們不妨與Netty官方的結構圖對照一下,來了解其功能。
具體的解釋能夠看這裏:http://netty.io/3.7/guide/#architecture。圖中能夠看到,除了以前說到的事件驅動機制以外,Netty的核心功能還包括兩部分:
Zero-Copy-Capable Rich Byte Buffer
零拷貝的Buffer。爲何叫零拷貝?由於在數據傳輸時,最終處理的數據會須要對單個傳輸層的報文,進行組合或者拆分。NIO原生的ByteBuffer沒法作到這件事,而Netty經過提供Composite(組合)和Slice(切分)兩種Buffer來實現零拷貝。這部分代碼在org.jboss.netty.buffer
包中。 這裏須要額外注意,不要和操做系統級別的Zero-Copy混淆了, 操做系統中的零拷貝主要是用戶空間和內核空間之間的數據拷貝, NIO中經過DirectBuffer作了實現.
Universal Communication API
統一的通信API。這個是針對Java的Old I/O和New I/O,使用了不一樣的API而言。Netty則提供了統一的API(org.jboss.netty.channel.Channel
)來封裝這兩種I/O模型。這部分代碼在org.jboss.netty.channel
包中。
此外,Protocol Support功能經過handler機制實現。
接下來的文章,咱們會根據模塊,詳細的對Netty源碼進行分析。
參考資料:
Netty 3.7 User Guide http://netty.io/3.7/guide/
What is Netty? http://ayedo.github.io/netty/2013/06/19/what-is-netty.html
上一篇文章咱們概要介紹了Netty的原理及結構,下面幾篇文章咱們開始對Netty的各個模塊進行比較詳細的分析。Netty的結構最底層是buffer機制,這部分也相對獨立,咱們就先從buffer講起。
buffer中文名又叫緩衝區,按照維基百科的解釋,是」在數據傳輸時,在內存裏開闢的一塊臨時保存數據的區域」。它實際上是一種化同步爲異步的機制,能夠解決數據傳輸的速率不對等以及不穩定的問題。
根據這個定義,咱們能夠知道涉及I/O(特別是I/O寫)的地方,基本會有Buffer了。就Java來講,咱們很是熟悉的Old I/O–InputStream
&OutputStream
系列API,基本都是在內部使用到了buffer。Java課程老師就教過,必須調用OutputStream.flush()
,才能保證數據寫入生效!
而NIO中則直接將buffer這個概念封裝成了對象,其中最經常使用的大概是ByteBuffer了。因而使用方式變爲了:將數據寫入Buffer,flip()一下,而後將數據讀出來。因而,buffer的概念更加深刻人心了!
Netty中的buffer也不例外。不一樣的是,Netty的buffer專爲網絡通信而生,因此它又叫ChannelBuffer(好吧其實沒有什麼因果關係…)。咱們下面就來說講Netty中得buffer。固然,關於Netty,咱們必須講講它的所謂」Zero-Copy-Capable」機制。
TCP/IP協議是目前的主流網絡協議。它是一個多層協議,最下層是物理層,最上層是應用層(HTTP協議等),而作Java應用開發,通常只接觸TCP以上,即傳輸層和應用層的內容。這也是Netty的主要應用場景。
TCP報文有個比較大的特色,就是它傳輸的時候,會先把應用層的數據項拆開成字節,而後按照本身的傳輸須要,選擇合適數量的字節進行傳輸。什麼叫」本身的傳輸須要」?首先TCP包有最大長度限制,那麼太大的數據項確定是要拆開的。其次由於TCP以及下層協議會附加一些協議頭信息,若是數據項過小,那麼可能報文大部分都是沒有價值的頭信息,這樣傳輸是很不划算的。所以有了收集必定數量的小數據,並打包傳輸的Nagle算法(這個東東在HTTP協議裏會很討厭,Netty裏能夠用setOption(「tcpNoDelay」, true)關掉它)。
這麼說可能太學院派了一點,咱們舉個例子吧:
發送時,咱們這樣分3次寫入(‘ | ‘表示兩個buffer的分隔): |
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
接收時,可能變成了這樣:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
很好懂吧?但是,說了這麼多,跟buffer有個什麼關係呢?別急,咱們來看下面一部分。
咱們先回到以前的messageReceived
方法:
public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes()); e.getChannel().write(e.getMessage()); }
這裏MessageEvent.getMessage()
默認的返回值是一個ChannelBuffer
。咱們知道,業務中須要的」Message」,實際上是一條應用層級別的完整消息,而通常的buffer工做在傳輸層,與」Message」是不能對應上的。那麼這個ChannelBuffer是什麼呢?
來一個官方給的圖,我想這個答案就很明顯了:
這裏能夠看到,TCP層HTTP報文被分紅了兩個ChannelBuffer,這兩個Buffer對咱們上層的邏輯(HTTP處理)是沒有意義的。可是兩個ChannelBuffer被組合起來,就成爲了一個有意義的HTTP報文,這個報文對應的ChannelBuffer,纔是能稱之爲」Message」的東西。這裏用到了一個詞」Virtual Buffer」,也就是所謂的」Zero-Copy-Capable Byte Buffer」了。頓時以爲豁然開朗了有沒有!
我這裏總結一下,若是說NIO的Buffer和Netty的ChannelBuffer最大的區別的話,就是前者僅僅是傳輸上的Buffer,然後者實際上是傳輸Buffer和抽象後的邏輯Buffer的結合。延伸開來講,NIO僅僅是一個網絡傳輸框架,而Netty是一個網絡應用框架,包括網絡以及應用的分層結構。
固然,在Netty裏,默認使用ChannelBuffer
表示」Message」,不失爲一個比較實用的方法,可是MessageEvent.getMessage()
是能夠存放一個POJO的,這樣子抽象程度又高了一些,這個咱們在之後講到ChannelPipeline
的時候會說到。
好了,終於來到了代碼實現部分。之因此囉嗦了這麼多,由於我以爲,關於」Zero-Copy-Capable Rich Byte Buffer」,理解爲何須要它,比理解它是怎麼實現的,可能要更重要一點。
我想可能不少朋友跟我同樣,喜歡」順藤摸瓜」式讀代碼–找到一個入口,而後順着查看它的調用,直到理解清楚。很幸運,ChannelBuffers
(注意有s!)就是這樣一根」藤」,它是全部ChannelBuffer實現類的入口,它提供了不少靜態的工具方法來建立不一樣的Buffer,靠「順藤摸瓜」式讀代碼方式,大體能把各類ChannelBuffer的實現類摸個遍。先列一下ChannelBuffer相關類圖。
此外還有WrappedChannelBuffer
系列也是繼承自AbstractChannelBuffer
,圖放到了後面。
開始覺得Netty的ChannelBuffer是對NIO ByteBuffer的一個封裝,其實不是的,它是把ByteBuffer從新實現了一遍。
以最經常使用的HeapChannelBuffer
爲例,其底層也是一個byte[],與ByteBuffer不一樣的是,它是能夠同時進行讀和寫的,而不須要使用flip()進行讀寫切換。ChannelBuffer讀寫的核心代碼在AbstactChannelBuffer
裏,這裏經過readerIndex和writerIndex兩個整數,分別指向當前讀的位置和當前寫的位置,而且,readerIndex老是小於writerIndex的。貼兩段代碼,讓你們能看的更明白一點:
public void writeByte(int value) { setByte(writerIndex ++, value); } public byte readByte() { if (readerIndex == writerIndex) { throw new IndexOutOfBoundsException("Readable byte limit exceeded: " + readerIndex); } return getByte(readerIndex ++); } public int writableBytes() { return capacity() - writerIndex; } public int readableBytes() { return writerIndex - readerIndex; }
我卻是以爲這樣的方式很是天然,比單指針與flip()要更加好理解一些。AbstactChannelBuffer還有兩個相應的mark指針markedReaderIndex
和markedWriterIndex
,跟NIO的原理是同樣的,這裏再也不贅述了。
在建立Buffer時,咱們注意到了這樣一個方法:public static ChannelBuffer buffer(ByteOrder endianness, int capacity);
,其中ByteOrder
是什麼意思呢?
這裏有個很基礎的概念:字節序(ByteOrder/Endianness)。它規定了多餘一個字節的數字(int啊long什麼的),如何在內存中表示。BIG_ENDIAN(大端序)表示高位在前,整型數12
會被存儲爲0 0 0 12
四字節,而LITTLE_ENDIAN則正好相反。可能搞C/C++的程序員對這個會比較熟悉,而Javaer則比較陌生一點,由於Java已經把內存給管理好了。可是在網絡編程方面,根據協議的不一樣,不一樣的字節序也可能會被用到。目前大部分協議仍是採用大端序,可參考RFC1700。
瞭解了這些知識,咱們也很容易就知道爲何會有BigEndianHeapChannelBuffer
和LittleEndianHeapChannelBuffer
了!
DynamicChannelBuffer是一個很方便的Buffer,之因此叫Dynamic是由於它的長度會根據內容的長度來擴充,你能夠像使用ArrayList同樣,無須關心其容量。實現自動擴容的核心在於ensureWritableBytes
方法,算法很簡單:在寫入前作容量檢查,容量不夠時,新建一個容量x2的buffer,跟ArrayList的擴容是相同的。貼一段代碼吧(爲了代碼易懂,這裏我刪掉了一些邊界檢查,只保留主邏輯):
public void writeByte(int value) { ensureWritableBytes(1); super.writeByte(value); } public void ensureWritableBytes(int minWritableBytes) { if (minWritableBytes <= writableBytes()) { return; } int newCapacity = capacity(); int minNewCapacity = writerIndex() + minWritableBytes; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } ChannelBuffer newBuffer = factory().getBuffer(order(), newCapacity); newBuffer.writeBytes(buffer, 0, writerIndex()); buffer = newBuffer; }
CompositeChannelBuffer
是由多個ChannelBuffer組合而成的,能夠看作一個總體進行讀寫。這裏有一個技巧:CompositeChannelBuffer並不會開闢新的內存並直接複製全部ChannelBuffer內容,而是直接保存了全部ChannelBuffer的引用,並在子ChannelBuffer裏進行讀寫,從而實現了」Zero-Copy-Capable」了。來段簡略版的代碼吧:
public class CompositeChannelBuffer{ //components保存全部內部ChannelBuffer private ChannelBuffer[] components; //indices記錄在整個CompositeChannelBuffer中,每一個components的起始位置 private int[] indices; //緩存上一次讀寫的componentId private int lastAccessedComponentId; public byte getByte(int index) { //經過indices中記錄的位置索引到對應第幾個子Buffer int componentId = componentId(index); return components[componentId].getByte(index - indices[componentId]); } public void setByte(int index, int value) { int componentId = componentId(index); components[componentId].setByte(index - indices[componentId], value); } }
查找componentId的算法再次不做介紹了,你們本身實現起來也不會太難。值得一提的是,基於ChannelBuffer連續讀寫的特性,使用了順序查找(而不是二分查找),而且用lastAccessedComponentId
來進行緩存。
前面說ChannelBuffer是本身的實現的,其實只說對了一半。ByteBufferBackedChannelBuffer
就是封裝了NIO ByteBuffer的類,用於實現堆外內存的Buffer(使用NIO的DirectByteBuffer
)。固然,其實它也能夠放其餘的ByteBuffer的實現類。代碼實現就不說了,也沒啥可說的。
WrappedChannelBuffer
都是幾個對已有ChannelBuffer進行包裝,完成特定功能的類。代碼不貼了,實現都比較簡單,列一下功能吧。
類名 | 入口 | 功能 |
SlicedChannelBuffer | ChannelBuffer.slice() ChannelBuffer.slice(int,int) |
某個ChannelBuffer的一部分 |
TruncatedChannelBuffer | ChannelBuffer.slice() ChannelBuffer.slice(int,int) |
某個ChannelBuffer的一部分, 能夠理解爲其實位置爲0的SlicedChannelBuffer |
DuplicatedChannelBuffer | ChannelBuffer.duplicate() | 與某個ChannelBuffer使用一樣的存儲, 區別是有本身的index |
ReadOnlyChannelBuffer | ChannelBuffers .unmodifiableBuffer(ChannelBuffer) | 只讀,你懂的 |
能夠看到,關於實現方面,Netty 3.7的buffer相關內容仍是比較簡單的,也沒有太多費腦細胞的地方。
而Netty 4.0以後就不一樣了。4.0,ChannelBuffer更名ByteBuf,成了單獨項目buffer,而且爲了性能優化,加入了BufferPool之類的機制,已經變得比較複雜了(本質倒沒怎麼變)。性能優化是個很複雜的事情,研究源碼時,建議先避開這些東西,除非你對算法情有獨鍾。舉個例子,Netty4.0裏爲了優化,將Map換成了Java 8裏6000行的ConcurrentHashMapV8,大家感覺一下…
參考資料:
Channel是理解和使用Netty的核心。Channel的涉及內容較多,這裏我使用由淺入深的介紹方法。在這篇文章中,咱們主要介紹Channel部分中Pipeline實現機制。爲了不枯燥,借用一下《盜夢空間》的「夢境」概念,但願你們喜歡。
在Netty裏,Channel
是通信的載體,而ChannelHandler
負責Channel中的邏輯處理。
那麼ChannelPipeline
是什麼呢?我以爲能夠理解爲ChannelHandler的容器:一個Channel包含一個ChannelPipeline,全部ChannelHandler都會註冊到ChannelPipeline中,並按順序組織起來。
在Netty中,ChannelEvent
是數據或者狀態的載體,例如傳輸的數據對應MessageEvent
,狀態的改變對應ChannelStateEvent
。當對Channel進行操做時,會產生一個ChannelEvent,併發送到ChannelPipeline
。ChannelPipeline會選擇一個ChannelHandler進行處理。這個ChannelHandler處理以後,可能會產生新的ChannelEvent,並流轉到下一個ChannelHandler。
例如,一個數據最開始是一個MessageEvent
,它附帶了一個未解碼的原始二進制消息ChannelBuffer
,而後某個Handler將其解碼成了一個數據對象,並生成了一個新的MessageEvent
,並傳遞給下一步進行處理。
到了這裏,能夠看到,其實Channel的核心流程位於ChannelPipeline
中。因而咱們進入ChannelPipeline的深層夢境裏,來看看它具體的實現。
Netty的ChannelPipeline包含兩條線路:Upstream和Downstream。Upstream對應上行,接收到的消息、被動的狀態改變,都屬於Upstream。Downstream則對應下行,發送的消息、主動的狀態改變,都屬於Downstream。ChannelPipeline
接口包含了兩個重要的方法:sendUpstream(ChannelEvent e)
和sendDownstream(ChannelEvent e)
,就分別對應了Upstream和Downstream。
對應的,ChannelPipeline裏包含的ChannelHandler也包含兩類:ChannelUpstreamHandler
和ChannelDownstreamHandler
。每條線路的Handler是互相獨立的。它們都很簡單的只包含一個方法:ChannelUpstreamHandler.handleUpstream
和ChannelDownstreamHandler.handleDownstream
。
Netty官方的javadoc裏有一張圖(ChannelPipeline
接口裏),很是形象的說明了這個機制(我對原圖進行了一點修改,加上了ChannelSink
,由於我以爲這部分對理解代碼流程會有些幫助):
什麼叫ChannelSink
呢?ChannelSink包含一個重要方法ChannelSink.eventSunk
,能夠接受任意ChannelEvent。」sink」的意思是」下沉」,那麼」ChannelSink」好像能夠理解爲」Channel下沉的地方」?實際上,它的做用確實是這樣,也能夠換個說法:」處於末尾的萬能Handler」。最初讀到這裏,也有些困惑,這麼理解以後,就感受簡單許多。只有Downstream包含ChannelSink
,這裏會作一些創建鏈接、綁定端口等重要操做。爲何UploadStream沒有ChannelSink呢?我只能認爲,一方面,不符合」sink」的意義,另外一方面,也沒有什麼處理好作的吧!
這裏有個值得注意的地方:在一條「流」裏,一個ChannelEvent
並不會主動的」流」經全部的Handler,而是由上一個Handler顯式的調用ChannelPipeline.sendUp(Down)stream
產生,並交給下一個Handler處理。也就是說,每一個Handler接收到一個ChannelEvent,並處理結束後,若是須要繼續處理,那麼它須要調用sendUp(Down)stream
新發起一個事件。若是它再也不發起事件,那麼處理就到此結束,即便它後面仍然有Handler沒有執行。這個機制能夠保證最大的靈活性,固然對Handler的前後順序也有了更嚴格的要求。
順便說一句,在Netty 3.x裏,這個機制會致使大量的ChannelEvent對象建立,所以Netty 4.x版本對此進行了改進。twitter的finagle框架實踐中,就提到從Netty 3.x升級到Netty 4.x,能夠大大下降GC開銷。有興趣的能夠看看這篇文章:https://blog.twitter.com/2013/netty-4-at-twitter-reduced-gc-overhead
下面咱們從代碼層面來對這裏面發生的事情進行深刻分析,這部分涉及到一些細節,須要打開項目源碼,對照來看,會比較有收穫。
ChannelPipeline
的主要的實現代碼在DefaultChannelPipeline
類裏。列一下DefaultChannelPipeline的主要字段:
public class DefaultChannelPipeline implements ChannelPipeline { private volatile Channel channel; private volatile ChannelSink sink; private volatile DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail; private final Map<String, DefaultChannelHandlerContext> name2ctx = new HashMap<String, DefaultChannelHandlerContext>(4); }
這裏須要介紹一下ChannelHandlerContext
這個接口。顧名思義,ChannelHandlerContext保存了Netty與Handler相關的的上下文信息。而我們這裏的DefaultChannelHandlerContext
,則是對ChannelHandler
的一個包裝。一個DefaultChannelHandlerContext
內部,除了包含一個ChannelHandler
,還保存了」next」和」prev」兩個指針,從而造成一個雙向鏈表。
所以,在DefaultChannelPipeline
中,咱們看到的是對DefaultChannelHandlerContext
的引用,而不是對ChannelHandler
的直接引用。這裏包含」head」和」tail」兩個引用,分別指向鏈表的頭和尾。而name2ctx則是一個按名字索引DefaultChannelHandlerContext用戶的一個map,主要在按照名稱刪除或者添加ChannelHandler時使用。
前面提到了,ChannelPipeline
接口的兩個重要的方法:sendUpstream(ChannelEvent e)
和sendDownstream(ChannelEvent e)
。全部事件的發起都是基於這兩個方法進行的。Channels
類有一系列fireChannelBound
之類的fireXXXX
方法,其實都是對這兩個方法的facade包裝。
下面來看一下這兩個方法的實現。先看sendUpstream(對代碼作了一些簡化,保留主邏輯):
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext head = getActualUpstreamContext(this.head); head.getHandler().handleUpstream(head, e); } private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) { DefaultChannelHandlerContext realCtx = ctx; while (!realCtx.canHandleUpstream()) { realCtx = realCtx.next; if (realCtx == null) { return null; } } return realCtx; }
這裏最終調用了ChannelUpstreamHandler.handleUpstream
來處理這個ChannelEvent。有意思的是,這裏咱們看不到任何」將Handler向後移一位」的操做,可是咱們總不能每次都用同一個Handler來進行處理啊?實際上,咱們更爲經常使用的是ChannelHandlerContext.handleUpstream
方法(實現是DefaultChannelHandlerContext.sendUpstream
方法):
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext next = getActualUpstreamContext(this.next); DefaultChannelPipeline.this.sendUpstream(next, e); }
能夠看到,這裏最終仍然調用了ChannelPipeline.sendUpstream
方法,可是它會將Handler指針後移。
咱們接下來看看DefaultChannelHandlerContext.sendDownstream
:
public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev); if (prev == null) { try { getSink().eventSunk(DefaultChannelPipeline.this, e); } catch (Throwable t) { notifyHandlerException(e, t); } } else { DefaultChannelPipeline.this.sendDownstream(prev, e); } }
與sendUpstream好像不大相同哦?這裏有兩點:一是到達末尾時,就如夢境二所說,會調用ChannelSink進行處理;二是這裏指針是往前移的,因此咱們知道了:
UpstreamHandler是從前日後執行的,DownstreamHandler是從後往前執行的。在ChannelPipeline裏添加時須要注意順序了!
DefaultChannelPipeline裏還有些機制,像添加/刪除/替換Handler,以及ChannelPipelineFactory
等,比較好理解,就不細說了。
好了,深刻分析完代碼,有點頭暈了,咱們回到最開始的地方,來想想,Netty的Pipeline機制解決了什麼問題?
我認爲至少有兩點:
一是提供了ChannelHandler的編程模型,基於ChannelHandler開發業務邏輯,基本不須要關心網絡通信方面的事情,專一於編碼/解碼/邏輯處理就能夠了。Handler也是比較方便的開發模式,在不少框架中都有用到。
二是實現了所謂的」Universal Asynchronous API」。這也是Netty官方標榜的一個功能。用過OIO和NIO的都知道,這兩套API風格相差極大,要從一個遷移到另外一個成本是很大的。即便是NIO,異步和同步編程差距也很大。而Netty屏蔽了OIO和NIO的API差別,經過Channel提供對外接口,並經過ChannelPipeline將其鏈接起來,所以替換起來很是簡單。
理清了ChannelPipeline的主流程,咱們對Channel部分的大體結構算是弄清楚了。但是到了這裏,咱們依然對一個鏈接具體怎麼處理沒有什麼概念,下篇文章,咱們會分析一下,在Netty中,捷徑如何處理鏈接的創建、數據的傳輸這些事情。
PS: Pipeline這部分拖了兩個月,終於寫完了。中間寫的實在緩慢,寫個高質量(至少是自認爲吧!)的文章不容易,可是仍不忍心這部分就此爛尾。中間參考了一些優秀的文章,還本身使用netty開發了一些應用。之後這類文章,仍是要集中時間來寫無缺了。
參考資料:
時隔好久終於又更新了!以前一直遲遲未動也是由於積累不夠,後面比較難下手。過年期間@李林鋒hw發佈了一個Netty5.0架構剖析和源碼解讀 http://vdisk.weibo.com/s/C9LV9iVqH13rW/1391437855,看完也是收穫很多。前面的文章咱們分析了Netty的結構,此次我們來分析最錯綜複雜的一部分-Netty中的多線程以及NIO的應用。
理清NIO與Netty的關係以前,咱們必須先要來看看Reactor模式。Netty是一個典型的多線程的Reactor模式的使用,理解了這部分,在宏觀上理解Netty的NIO及多線程部分就不會有什麼困難了。
本篇文章依然針對Netty 3.7,不過由於也看過一點Netty 5的源碼,因此會有一點介紹。
Reactor是一種普遍應用在服務器端開發的設計模式。Reactor中文大多譯爲「反應堆」,我當初接觸這個概念的時候,就感受很厲害,是否是它的原理就跟「核反應」差很少?後來才知道其實沒有什麼關係,從Reactor的兄弟「Proactor」(多譯爲前攝器)就能看得出來,這兩個詞的中文翻譯其實都不是太好,不夠形象。實際上,Reactor模式又有別名「Dispatcher」或者「Notifier」,我以爲這兩個都更加能代表它的本質。
那麼,Reactor模式到底是個什麼東西呢?這要從事件驅動的開發方式提及。咱們知道,對於應用服務器,一個主要規律就是,CPU的處理速度是要遠遠快於IO速度的,若是CPU爲了IO操做(例如從Socket讀取一段數據)而阻塞顯然是不划算的。好一點的方法是分爲多進程或者線程去進行處理,可是這樣會帶來一些進程切換的開銷,試想一個進程一個數據讀了500ms,期間進程切換到它3次,可是CPU卻什麼都不能幹,就這麼切換走了,是否是也不划算?
這時先驅們找到了事件驅動,或者叫回調的方式,來完成這件事情。這種方式就是,應用業務向一箇中間人註冊一個回調(event handler),當IO就緒後,就這個中間人產生一個事件,並通知此handler進行處理。這種回調的方式,也體現了「好萊塢原則」(Hollywood principle)-「Don’t call us, we’ll call you」,在咱們熟悉的IoC中也有用到。看來軟件開發真是互通的!
好了,咱們如今來看Reactor模式。在前面事件驅動的例子裏有個問題:咱們如何知道IO就緒這個事件,誰來充當這個中間人?Reactor模式的答案是:由一個不斷等待和循環的單獨進程(線程)來作這件事,它接受全部handler的註冊,並負責先操做系統查詢IO是否就緒,在就緒後就調用指定handler進行處理,這個角色的名字就叫作Reactor。
Java中的NIO能夠很好的和Reactor模式結合。關於NIO中的Reactor模式,我想沒有什麼資料能比Doug Lea大神(不知道Doug Lea?看看JDK集合包和併發包的做者吧)在《Scalable IO in Java》解釋的更簡潔和全面了。NIO中Reactor的核心是Selector
,我寫了一個簡單的Reactor示例,這裏我貼一個核心的Reactor的循環(這種循環結構又叫作EventLoop
),剩餘代碼在learning-src目錄下。
public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next())); selected.clear(); } } catch (IOException ex) { /* ... */ } }
前面提到了Proactor模式,這又是什麼呢?簡單來講,Reactor模式裏,操做系統只負責通知IO就緒,具體的IO操做(例如讀寫)仍然是要在業務進程裏阻塞的去作的,而Proactor模式則更進一步,由操做系統將IO操做執行好(例如讀取,會將數據直接讀到內存buffer中),而handler只負責處理本身的邏輯,真正作到了IO與程序處理異步執行。因此咱們通常又說Reactor是同步IO,Proactor是異步IO。
關於阻塞和非阻塞、異步和非異步,以及UNIX底層的機制,你們能夠看看這篇文章IO - 同步,異步,阻塞,非阻塞 (亡羊補牢篇),以及陶輝(《深刻理解nginx》的做者)《高性能網絡編程》的系列。
講了一堆Reactor,咱們回到Netty。在《Scalable IO in Java》中講到了一種多線程下的Reactor模式。在這個模式裏,mainReactor只有一個,負責響應client的鏈接請求,並創建鏈接,它使用一個NIO Selector;subReactor能夠有一個或者多個,每一個subReactor都會在一個獨立線程中執行,而且維護一個獨立的NIO Selector。
這樣的好處很明顯,由於subReactor也會執行一些比較耗時的IO操做,例如消息的讀寫,使用多個線程去執行,則更加有利於發揮CPU的運算能力,減小IO等待時間。
好了,瞭解了多線程下的Reactor模式,咱們來看看Netty吧(如下部分主要針對NIO,OIO部分更加簡單一點,不重複介紹了)。Netty裏對應mainReactor的角色叫作「Boss」,而對應subReactor的角色叫作」Worker」。Boss負責分配請求,Worker負責執行,好像也很貼切!以TCP的Server端爲例,這兩個對應的實現類分別爲NioServerBoss
和NioWorker
(Server和Client的Worker沒有區別,由於創建鏈接以後,雙方就是對等的進行傳輸了)。
Netty 3.7中Reactor的EventLoop在AbstractNioSelector.run()
中,它實現了Runnable
接口。這個類是Netty NIO部分的核心。它的邏輯很是複雜,其中還包括一些對JDK Bug的處理(例如rebuildSelector
),剛開始讀的時候不須要深刻那麼細節。我精簡了大部分代碼,保留主幹以下:
abstract class AbstractNioSelector implements NioSelector { //NIO Selector protected volatile Selector selector; //內部任務隊列 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); //selector循環 public void run() { for (;;) { try { //處理內部任務隊列 processTaskQueue(); //處理selector事件對應邏輯 process(selector); } catch (Throwable t) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } } private void processTaskQueue() { for (;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run(); } } protected abstract void process(Selector selector) throws IOException; }
其中process是主要的處理事件的邏輯,例如在AbstractNioWorker
中,處理邏輯以下:
protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if (!read(k)) { // Connection already closed - no need to handle write. continue; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } } catch (CancelledKeyException e) { close(k); } if (cleanUpCancelledKeys()) { break; // break the loop to avoid ConcurrentModificationException } } }
這不就是第二部分提到的selector經典用法了麼?
在Netty 4.0以後,做者以爲NioSelector
這個叫法,以及區分NioBoss
和NioWorker
的作法稍微繁瑣了點,乾脆就將這些合併成了NioEventLoop
,今後這兩個角色就不作區分了。我卻是以爲新版本的會更優雅一點。
下面咱們來看Netty的多線程部分。一旦對應的Boss或者Worker啓動,就會分配給它們一個線程去一直執行。對應的概念爲BossPool
和WorkerPool
。對於每一個NioServerSocketChannel
,Boss的Reactor有一個線程,而Worker的線程數由Worker線程池大小決定,可是默認最大不會超過CPU核數*2,固然,這個參數能夠經過NioServerSocketChannelFactory
構造函數的參數來設置。
public NioServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { this(bossExecutor, 1, workerExecutor, workerCount); }
最後咱們比較關心一個問題,咱們以前ChannlePipeline
中的ChannleHandler是在哪一個線程執行的呢?答案是在Worker線程裏執行的,而且會阻塞Worker的EventLoop。例如,在NioWorker
中,讀取消息完畢以後,會觸發MessageReceived
事件,這會使得Pipeline中的handler都獲得執行。
protected boolean read(SelectionKey k) { .... if (readBytes > 0) { // Fire the event. fireMessageReceived(channel, buffer); } return true; }
能夠看到,對於處理事件較長的業務,並不太適合直接放到ChannelHandler中執行。那麼怎麼處理呢?咱們在Handler部分會進行介紹。
參考資料:
題圖來自:http://www.worldindustrialreporter.com/france-gives-green-light-to-tokamak-fusion-reactor/
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
Netty是業界最流行的NIO框架之一,它的健壯性、功能、性能、可定製性和可擴展性在同類框架中都是數一數二的,它已經獲得成百上千的商用項目驗證,例如Hadoop的RPC框架avro使用Netty做爲底層通訊框架;不少其餘業界主流的RPC框架,也使用Netty來構建高性能的異步通訊能力。
API使用簡單,開發門檻低;
功能強大,預置了多種編解碼功能,支持多種主流協議;
定製能力強,能夠經過ChannelHandler對通訊框架進行靈活地擴展;
性能高,經過與其餘業界主流的NIO框架對比,Netty的綜合性能最優;
成熟、穩定,Netty修復了已經發現的全部JDK NIO BUG,業務開發人員不須要再爲NIO的BUG而煩惱;
社區活躍,版本迭代週期短,發現的BUG能夠被及時修復,同時,更多的新功能會加入;
經歷了大規模的商業應用考驗,質量獲得驗證。在互聯網、大數據、網絡遊戲、企業應用、電信軟件等衆多行業獲得成功商用,證實了它已經徹底可以知足不一樣行業的商業應用了。
NIO的類庫和API繁雜,使用麻煩,你須要熟練掌握Selector、 ServerSocketChannel、SocketChannel、ByteBuffer等。
須要具有其餘的額外技能作鋪墊,例如熟悉Java多線程編程。這是由於 NIO編程涉及到Reactor模式,你必須對多線程和網路編程很是熟悉,才能編寫出高質量的NIO程序。
可靠性能力補齊,工做量和難度都很是大。例如客戶端面臨斷連重連、網 絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等問題,NIO 編程的特色是功能開發相對容易,可是可靠性能力補齊的工做量和難度都 很是大。
JDK NIO的BUG,例如臭名昭著的epoll bug,它會致使Selector空輪詢, 最終致使CPU 100%。官方聲稱在JDK1.6版本的update18修復了該問題,但 是直到JDK1.7版本該問題仍舊存在,只不過該BUG發生機率下降了一些而 已,它並無被根本解決。
Netty的線程模型能夠根據用戶設定的參數支持Reactor單線程模型、多線程模型和主從Reactor多線程模型。
咱們能夠從Netty服務端建立過程來了解Netty的線程模型:
在建立ServerBootstrap類實例前,先建立兩個EventLoopGroup,它們其實是兩個獨立的Reactor線程池,bossGroup負責接收客戶端的鏈接,workerGroup負責處理IO相關的讀寫操做,或者執行系統task、定時task等。
用於接收客戶端請求的線程池職責以下:
處理IO操做的線程池職責以下:
經過調整兩個EventLoopGroup的線程數、是否共享線程池等方式,Netty的Reactor線程模型能夠在單線程、多線程和主從多線程間切換,用戶能夠根據實際狀況靈活配置。
EventLoop/EventLoopGroup
爲了提升性能,Netty在不少地方採用了無鎖化設計。例如在IO線程的內部進行串行操做,避免多線程競爭致使的性能降低。儘管串行化設計看上去CPU利用率不高,併發程度不夠,可是經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的設計相比一個隊列——多個工做線程的模型性能更優。
Netty的Reactor模型設計以下:
Netty的NioEventLoop讀取到消息以後,調用ChannelPipeline的fireChannelRead方法,只要用戶不主動切換線程,就一直由NioEventLoop調用用戶的Handler,期間不進行線程切換。這種串行化的處理方式避免了多線程操做致使的鎖競爭,從性能角度看是最優的。
Netty多線程編程的最佳實踐以下:
ChannelHandler相似於Servlet中的filter,它負責對IO事件或者IO操做進行攔截和處理。ChannelHandler能夠選擇性地對感興趣的事件進行攔截和處理,也能夠透傳和終止事件的傳遞。基於ChannelHandler,用戶能夠方便地定製本身的業務邏輯,如日誌打印、編解碼、性能統計等。Netty自己也提供了不少有用的ChannelHandler的實現類供用戶使用。
功能說明
大多數的ChannelHandler只關心特定的一個或幾個事件,對其進行攔截和處理,而對其不關心的事件則直接交給下一個ChannelHandler。這就致使一個問題:用戶在實現ChannelHandler接口時必須實現ChannelHandler的全部方法,包括其不關心的事件處理方法,這就致使了代碼的冗餘。爲了解決這個問題,Netty提供了ChannelHandler的實現基類ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類。這兩個類分別攔截和處理inbound事件和outbound事件,它們對全部的事件都直接透傳,用戶能夠覆蓋父類中感興趣的事件的處理方法,而對其餘事件則直接繼承父類的實現,這樣就能夠保持代碼的簡潔和清晰。
部分自帶handler:
ChannelTrafficShapingHandler/GlobalTrafficShapingHandler:Netty提供的流量整形相關的Handler,包括以Channel爲單位的流量整形和全局的流量整形。
ChannelPipeline其實是一個ChannelHandler的容器,它負責ChannelHandler的管理和事件的攔截與調度。
Netty中的ChannelPipeline和ChannelHandler相似於J2EE中的Filter過濾器機制,這類攔截器機制其實是責任鏈模式的一種變形,主要目的是方便事件的攔截和用戶業務邏輯的定製。Netty將Channel的數據管道抽象爲ChannelPipeline,消息在ChannelPipeline中間流動和傳遞。ChannelPipeline持有一個包含一系列事件攔截器ChannelHandler的鏈表,由ChannelHandler負責對事件進行攔截和處理。用戶能夠方便的增長和刪除ChannelHandler來達到定製業務邏輯的目的,而不須要對現有的ChannelHandler進行修改,實現對開放封閉原則的支持。
ChannelPipeline的主要特性
ChannelPipeline支持在運行時動態地添加或刪除ChannelHandler,好比能夠根據系統時間判斷是否處於業務高峯期,而後動態地添加或刪除擁塞控制的邏輯。還有一點就是ChannelPipeline對Handler的添加和刪除操做都是線程安全的,這意味着多個業務線程能夠併發的修改ChannelPipeline而不會出現併發問題。可是ChannelHandler不是線程安全的,用戶須要本身保證ChannelHandler的線程安全。
ByteBuf
ByteBuf相似於JDK裏的ByteBuffer,但JDK裏的ByteBuffer有幾個侷限性,如固定長度、須要手動flip等,因此Netty提供了相似ByteBuffer的實現ByteBuf。
與ByteBuffer 相似,ByteBuf提供如下幾類基本功能:
ByteBuf與ByteBuffer的不一樣之處主要有如下兩點:
因爲NIO操做中的參數都是ByteBuffer,因此ByteBuf內部包含一個ByteBuffer的引用,用來表示對應的ByteBuffer。
Bootstrap是用於啓動Netty的輔助類,提供一系列方法設置啓動參數。由於Bootstrap須要設置的各項信息不少,包括線程池、TCP選項、ChannelHandler等,因此這裏採用builder模式實現。
預置了多種編解碼功能,支持多種主流協議