歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java
Netty是對Nio的一個封裝,關於網絡的全部操做都是經過事件的方式完成的。例如鏈接建立、read事件、write事件都是經過Nio來完成 的。那netty是怎麼啓動監聽的呢? 在什麼地方啓動的呢?此處不爲你們設置懸念,一次性告訴你們。經過循環掃描的方式來實現監聽的。具體的方法類位於NioEventLoop的run方法中 (趕忙進去看看吧!! 淺顯易懂)。
git
下面是netty的acceptor線程建立鏈接的代碼。位於類NioEventLoop的processSelectedKey中(至於 processSelectedKey是怎麼被調用的,本身看看調用鏈就好了(eclipse用ctrl+Shift+H就能夠查看到選中方法的調用 鏈))。github
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { //獲得當前的key關注的事件 int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //一個剛剛建立的NioServersocketChannel感興趣的事件是0。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//能夠讀取操做 --對於serverSocket來講就是acceptor事件、對於socketChannel來講就是read事件 //INFO: channel類型爲io.netty.channel.socket.nio.NioSocketChannel unsafe類型爲io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe Object obj = k.attachment();//獲得NioServerSocketChannel或者NioSocketChannel if(obj instanceof NioServerSocketChannel){ System.out.println(obj.getClass().getName()+ " 開始接收鏈接"); }else{ System.out.println(obj.getClass().getName()+ " 開始接收字節"); } //不一樣的socketChannel對於那個的unsafe是不一樣的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe unsafe.read();//對於接受連接或者read興趣都會添加進入read操做調用serverSocket->NioMessageUnsafe if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) {//對於半包消息進行輸出操做 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }
這裏咱們以Read事件的處理(NioByteUnsafe)爲線索進行講解。後續會有基於byte的unsafe進行講解的(Unsafe不知道爲啥要這 麼叫,本人也感到挺費解的,不過如今看來感受就是一個工具對象。不要從名稱上害怕它)。下面來看NioByteUnsafe(該類是AbstractNioByteChannel的一個內部類)的read方法進行講 解。直接講代碼(後面也會有圖形講解,方便你們理解):算法
public void read() { //獲得config對象、pipeline對象 final ChannelConfig config = config(); //獲得對應的管道對象 final ChannelPipeline pipeline = pipeline(); //實際的內存分配器--- final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { //建立一個allocHandle對象--AdaptiveRecvByteBufAllocator //RecvByteBufAllocator負責內存分配的算法問題 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } if (!config.isAutoRead()) { removeReadOp(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int byteBufCapacity = allocHandle.guess(); int totalReadAmount = 0; do { //多是 direct或者 heap 從與當前socket相關的allocator獲得byteBuf數組 // byteBuf =allocHandle.allocate(allocator); //每次從內核中讀取數據netty都會分配內存 byteBuf = allocator.ioBuffer(byteBufCapacity); //得到能夠寫入的容量的大小 int writable = byteBuf.writableBytes(); //分一個多大的內存就從socket中讀取多大的數據 int localReadAmount = doReadBytes(byteBuf);//從socket中讀取數據到bytebuf中 if (localReadAmount <= 0) {//發生了讀取事件,可是讀取的長度是負數, // not was read release the buffer byteBuf.release();//釋放到Thread Cache中 close = localReadAmount < 0;//是否進行關閉,關鍵要看讀取到的數據的長度是否爲-1; break; } //發起讀取事件---若是是第一次積累數據的話,那麼就會將當前的bytebuf做爲累積對象,供繼續使用 pipeline.fireChannelRead(byteBuf); byteBuf = null;//由pipeline進行byteBuf的釋放 //避免內存溢出, if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead);//每次讀取的消息的數量都會有限制的,也就說,每次處理read事件的消息量是能夠配置的 //讀取完成---處理完一次 讀取事件 pipeline.fireChannelReadComplete(); //對本次讀取的數據量進行記錄,便於下一次爲當前的Channel分配合適大小的buffer allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } } }
//上述代碼段說明: /** config.getRecvByteBufAllocator().newHandle(); 負責內存分配算法 而 ByteBufAllocator 負責具體的內存分配-分配到堆仍是直接內存 */
這就是對一個read的處理基本流程,就是將從socket中讀取到的放入到分配器分配的bytebuf,而後將其傳入到pipeline.fireChannelRead(byteBuf);中,至於在pipeline是怎樣的傳遞的,咱們從這個方法中是沒法查看到的。這也是咱們這篇文章的主要內容(別的內存也很重要哦!關鍵是我已經添加了不少註釋了!)。就是要看看在獲得bytebuf後,pipeline是怎麼處理傳入進去的bytebuf的。咱們來對pipeline.fireChannelRead(byteBuf);窮追(ctrl+shift+H eclipse)到具體的實現,數組
咱們發現,最終會調用到的ChannelHandler接口的網絡
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
ChannelHandler有不少咱們具體選用哪個呢?動動腦子就知道,咱們pipeline中存儲的都是ChannelHandler,有哪些個Handler,就要看咱們在啓動代碼中是怎樣設置了。來看看個人啓動代碼(精簡版,沒有寫全,因此這裏看不懂得話,建議你寫個Netty的小demo).上代碼:less
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // .option(ChannelOption.ALLOCATOR, )//設置內存分配器 .option(ChannelOption.SO_SNDBUF, 1024)//發送緩衝器 .option(ChannelOption.SO_RCVBUF, 1024) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收緩衝器 .handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel對應的ChannelPipeline的handler .childHandler(new ChannelInitializer<SocketChannel>() {//客戶端新接入的鏈接socketChannel對應的ChannelPipeline的Handler @Override public void initChannel(SocketChannel ch) { SocketChannelConfig config=ch.config(); ChannelPipeline p = ch.pipeline(); p .addLast(new LineBasedFrameDecoder(30))//也會將回車符刪除掉--是以換行符做爲分隔的 .addLast(new DiscardServerHandler()); } });
由此能夠看到,這裏第一個被調用的ChannelHandler是LineBasedFrameDecoder。看看LineBasedFrameDecoder是怎麼實現ChannelRead方法的。翻看了弗雷以後,咱們終於找到了channelRead方法。由此能夠看到,在AbstractNioByteChannel的read方法中的pipeline.fireChannelRead(byteBuf);按照個人啓動代碼(雖說是按照個人,可是按照大家的也是這樣,由於byte在經過網絡接收以後,都要進行decode,第一個通過的channelHandler確定是ByteToMessageDecoder,不信,你看看本身的啓動代碼試試),最終調用的是ByteToMessageDecoder.channelRead() ,上代碼:eclipse
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { //緩衝區的大小沒有超過須要寫入的數據的大小 if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes());//擴展緩衝區--查看實現後,就是經過分配一個更大的,而後複製一下字節數據 } cumulation.writeBytes(data);//將數據寫入到積累對象中 data.release();//釋放bytebuffer(heap或者direct)--經過引用的方式進行釋放緩衝區(至於什麼是引用方式釋放,咱們會有一個特定的章節進行講解) } //收集完畢以後解析收集到的字符串---一般調用子類的方法實現,在具體實現中,用out來承載解析出來的msg callDecode(ctx, cumulation, out);//實現的時候,不要釋放咱們的累積對象cumulation } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) {//若是累積對象爲null或者沒有可讀內容的話,那麼就將累積對象釋放掉(由於空了或者爲null了) cumulation.release(); cumulation = null; } int size = out.size();//代碼 11 decodeWasNull = size == 0; //針對解析後的out結果中的msg的對象,將解析出來的message(具體的類型,請本身看實現.是怎樣作的)傳遞到pipeline中。 for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle();//代碼 22 } } else { ctx.fireChannelRead(msg); } }
提示: 一個pipeline,爲某個socketChannel全部,也就是說pipeline裏的channelHandler,也是爲某個socketchannel所享用的。不會出現多個線程共享一個channelHanler的狀況(咱們可讓他們共享一個handler,可是咱們得保證這個共享的handler是一個無狀態的handler,例如咱們如今就要講解的ByteToMessageDecoder就是一個有狀態的handler,因此就不能共享,就要在每次初始化socketChannel的pipeline時,都要從新new一個ByteToMessageDecoder,不信你們,能夠能夠看一下ByteToMessageDecoder的實現。我直接粘貼代碼吧!!(看看個人註釋哦)以下:).socket
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { ByteBuf cumulation;//由於單詞cumulation --累積 意思,也就是,這個成員對象,就是用來做爲半包的累積存儲的對象來使用的 private boolean singleDecode; private boolean decodeWasNull; private boolean first; }
下面咱們看一下callDecode()是怎樣完成的,上代碼,ide
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) {//傳入的字節是否有可讀數據 int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) {//若是此handler被移除 break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
上面的代碼,很容易易理解,就是進行必要的校驗,其中最惹人眼的就是decode()方法,而decode方法該類中是抽象方法:
/** * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input * {@link ByteBuf} has nothing to read anymore, till nothing was read from the input {@link ByteBuf} or till * this method returns {@code null}. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @param out the {@link List} to which decoded messages should be added * @throws Exception is thrown if an error accour */ protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
咱們來看一下具體實現有哪些?見下圖
咱們發現ByteToMessageDecoder的的decode子類實現有好多,咱們爲了講解的方便咱們選擇使用,FixedLengthFrameDecoder做爲研究對象。至於別的decoder你們有時間本身去看一下吧1!!(很簡單的,不要懼怕).
上代碼(FixedLengthFrameDecoder.decode方法):
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded);//out是外界傳入的一個用來盛放解析出來的message對象的容器 } }
此處調用了自有的decode方法,上代碼:
這裏咱們看到若是能夠讀取的數據長度沒有要求的長度搞的話,那麼就會以傳入的ByteBuf參數(其實這裏就是那個累積對象)爲基礎,構建一個新的ByteBuf。
下面咱們這裏特地大概翻譯下in.readBytes(frameLength)方法的註釋是怎樣的狀況.
ByteBuf io.netty.buffer.ByteBuf.readBytes(int length)
Transfers this buffer's data to a newly created buffer starting at the current
readerIndex
and increases thereaderIndex
by the number of the transferred bytes (=length
). The returned buffer'sreaderIndex
andwriterIndex
are0
andlength
respectively.
Returns:
the newly created buffer which contains the transferred bytes 該方法返回的是新建立的盛有傳輸數據的直接緩衝對象
將當前的ByteBuf對象的數據傳輸到一個剛剛建立的ByteBuf,就是從readerindex開始,而後增長ReaderIndex的值,增長length個字節數。返回的字節的的readerindex和writerindex分別是0和length。
經過閱讀上面的註釋的閱讀,咱們能夠看到,就FixedLengthFrameDecoder解析器來講,其實累積對象對readerIndex進行了改變。也就是說,累積對象能夠讀取的數據的數據量是發生變化的(咱們能夠在源代碼中看一下在decode先後,readerindex是否發生了變化,觀察一下就知道了。這個很簡單哦,看一下我是怎麼知道這一點的,見下圖)。至於除了FixedLengthFrameDecoder以外的別的decoder是否也改變了readerindex,你們能夠去具體查看一下代碼(不過我我的以爲確定都是這麼作滴!!)。
----------------------------------------------------------------------------------------------------------
累積對象的內存釋放問題講解完了(其實很簡單,就是把readerindex改變了一下,具體長度就看解析出來的message的長度了,哈哈)。
講到這裏,會涉及到一個解析出來的message在被pipeline中的其它handler處理完畢後的內存釋放問題。怎麼解決? 何時釋放這些message佔用的空間呢?
咱們從上面代碼11 和代碼22之間的代碼能夠看出,就是在將子類解析出來的msg,傳入到後續的( 由於當前的decoder Handler負責將大的ByteBuf累積對象轉換成小的後續handler能夠理解的msg對象,數據這個msg對象是個什麼類型,就要看子類是怎麼將什麼類型的msg放入到out盛放容器的了 )handler中。由此能夠看出: 對於一個socketChannel,其message的處理順序不會出現錯亂。永遠都是先處理完前一個,而後纔是後一個,由於這是在一個線程裏依次處理全部的msg的。
message是在何時釋放呢? 仍是看 代碼11 和代碼22之間的代碼 你們能夠本身去看看代碼。我發現就是被丟了,被JVM回收了,沒有重複利用。我我的以爲能夠重複利用。關於這個問題,你們回去本身理解一下吧!!有時間得的話,我也會專門將一下的。畢竟這篇文章是講Netty read事件處理的。不是將netty內存分配的。放心我不會忘記這個問題的。我會在後續的文章中講解的。歡迎你們吐槽!!!
本文是本人學習Netty後的我的總結的分享,須要說明的是本人僅僅表明我的的見解,不是什麼官方權威的版本,若是有不當之處,還請賜教!!歡迎吐槽!!