歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java
Netty是對Nio的一個封裝,關於網絡的全部操做都是經過事件的方式完成的。例如鏈接建立、read事件、write事件都是經過Nio來完成的。那netty是怎麼啓動監聽的呢? 在什麼地方啓動的呢?此處不爲你們設置懸念,一次性告訴你們。經過循環掃描的方式來實現監聽的。具體的方法類位於NioEventLoop的run方法中(趕忙進去看看吧!! 淺顯易懂)。
git
下面是netty的acceptor線程建立鏈接的代碼。位於類NioEventLoop的processSelectedKey中(至於processSelectedKey是怎麼被調用的,本身看看調用鏈就好了(eclipse用ctrl+Shift+H就能夠查看到選中方法的調用鏈))。github
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { //獲得當前的key關注的事件 int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //一個剛剛建立的NioServersocketChannel感興趣的事件是0。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//能夠讀取操做 --對於serverSocket來講就是acceptor事件、對於socketChannel來講就是read事件 //INFO: channel類型爲io.netty.channel.socket.nio.NioSocketChannel unsafe類型爲io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe Object obj = k.attachment();//獲得NioServerSocketChannel或者NioSocketChannel if(obj instanceof NioServerSocketChannel){ System.out.println(obj.getClass().getName()+ " 開始接收鏈接"); }else{ System.out.println(obj.getClass().getName()+ " 開始接收字節"); } //不一樣的socketChannel對於那個的unsafe是不一樣的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe unsafe.read();//對於接受連接或者read興趣都會添加進入read操做調用serverSocket->NioMessageUnsafe if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) {//對於半包消息進行輸出操做 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }[object Object]
這裏咱們以鏈接的創建(NioMessageUnsafe)爲線索進行講解。後續會有基於byte的unsafe進行講解的(Unsafe不知道爲啥要這麼叫,本人也感到挺費解的,不過如今看來感受就是一個工具對象。不要從名稱上害怕它)。下面來看NioMessageUnsafe的read方法進行講解。直接講代碼(後面也會有圖形講解,方便你們理解):bootstrap
@Override public void read() { assert eventLoop().inEventLoop(); if (!config().isAutoRead()) { removeReadOp(); } final ChannelConfig config = config(); //獲得本次方法調用能夠接收的鏈接數目 final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final boolean autoRead = config.isAutoRead(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; try { for (;;) { //將msg從讀取出來(SocketChannel-(common msg); serverSocketChannel(socketChannel msg)) int localRead = doReadMessages(readBuf);//readBuf僅僅是在本方法中起到緩衝統計的做用。不要多想哦!! if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //每次讀取的message的個數--- if (readBuf.size() >= maxMessagesPerRead | !autoRead) { break;//避免一次性建立過多的鏈接個數 } } } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { //對於server端來講,第一個handler爲io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor ////針對全部的channel都會執行一個read操做;對於ServerSocketChannel ServerSocketChannel對應的pipeline的fireChannelRead方法 //由於ServerSocketChannel的pipeline的第一個handler pipeline.fireChannelRead(readBuf.get(i));//1.1 代碼 } readBuf.clear();//清空到的鏈接緩存 pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException) { // ServerChannel should not be closed even on IOException because it can often continue // accepting incoming connections. (e.g. too many open files) closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); } pipeline.fireExceptionCaught(exception); } if (closed) { if (isOpen()) { close(voidPromise()); } } } }
下面讓咱們來看看上面這段代碼中提到的promise
int localRead = doReadMessages(readBuf);//doReadMessages是個抽象方法
上代碼:緩存
@Override protected int doReadMessages(List<Object> buf) throws Exception {//對於NioServerSocketChannel,它的讀取操做,就是接受客戶端的連接和建立NioSocketChannel SocketChannel ch = javaChannel().accept();//獲得java遠程的socketChannel對象。不要認爲此處會阻塞,不會的由於connec事件發生了。因此會當即返回 try { if (ch != null) { //對javachannel().accept()返回的java原生socket進行包裝,包裝成netty的NioSocketChannel對象-將全部綁定socketChannel的處理線程 buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); return 1;//每次僅僅處理一個,而且將獲得鏈接對象放入到buf列表對象中進行保存 } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
在上面咱們看到將java原聲的socket包裝成了NioSocketChannel。下面咱們粘貼一下NioSocketChannel的構造方法。服務器
public NioSocketChannel(Channel parent, EventLoop eventLoop, SocketChannel socket) { super(parent, eventLoop, socket);//使用到了AbstractNioByteChannel構造方法 config = new DefaultSocketChannelConfig(this, socket.socket()); } //下面代碼是父類AbstractNioByteChannel的構造方法: protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) { super(parent, eventLoop, ch, SelectionKey.OP_READ);//注意此處關注的事件爲read事件哦。 } //在往上走,看看是怎樣的一種狀況 protected AbstractChannel(Channel parent, EventLoop eventLoop) { this.parent = parent; this.eventLoop = validate(eventLoop);//注意 此處代碼意味着初始化socketChannel的時候,就已經對未來的work thread進行綁定了哦 unsafe = newUnsafe();//此處爲抽象方法,深刻到具體實現爲new NioByteUnsafe(); pipeline = new DefaultChannelPipeline(this);//由此能夠看出,在當發生message read事件的時候,就會爲每個socketChannel建立 }
到此咱們已經講解完了一個NioSocketChannel的建立過程。
無論對應一個NioServerSocketChanel仍是NioSocketChannel,對他們的pipeline進行初始化是很重要的。對於NioServerSocketChanel來講,其pipeline能夠用來對剛剛建立出來的NioSocketChannel進行初始化(後面當即會講到).對於NioSocketChannel來講,其pipeline,會對接收到bytes進行解碼轉換成業務層能夠徹底解析的對象。那麼何時將必要的NioSocketChannel的pipeline進行操做,添加必要的handler呢。這個咱們就要回到上面的代碼'1.1 代碼' 處。咱們再粘貼一下代碼:
網絡
for (int i = 0; i < size; i ++) { //對於server端來講,第一個handler爲io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor ////針對全部的channel都會執行一個read操做;對於ServerSocketChannel ServerSocketChannel對應的pipeline的fireChannelRead方法 //由於ServerSocketChannel的pipeline的第一個handler pipeline.fireChannelRead(readBuf.get(i));//1.1 代碼 }
上面的這個pipeline是NioServerSocketChanel的pipeline。其handler和 普通的爲NioSocketChannel準備的pipeline是不一樣的。NioServerSocketChanel的pipeline填充是在serverBootstrap bind的時候進行建立而且填充的(之後咱們會講解)。在這裏咱們就來看看app
pipeline.fireChannelRead(readBuf.get(i));//1.1 代碼
這一行代碼是怎麼操做的。搭配ide工具(我使用的是eclipse)。能夠追蹤到該方法調用追蹤會調用到接口pipeline的channelRead的方法上。該接口實現有不少。可是咱們此處只須要關注ServerBootstrapAcceptor(至於爲何關注它,本身能夠看一下serverBootstrap的bind方法的實現,由於對於NioServerSocketChanel的pipeline的第一個handler就是ServerBootstrapAcceptor)。上代碼:eclipse
public void channelRead(ChannelHandlerContext ctx, Object msg) { //獲得傳遞過來的channel對象 Channel child = (Channel) msg; //每個child對應的管道都應該是同樣的,將啓動時傳入的childHandler加入到客戶端SocketChannel的Pipeline中 child.pipeline().addLast(childHandler);//代碼 1.3 在此處對剛剛建立出來的socketChannel添加netty服務端的經常使用代碼的'childHandler(new ChannelInitializer<SocketChannel>()’哦 for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e);//設置客戶端SocketChannel的TCP參數 } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } //設置每個SocketChannel的屬性設置 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } //調用註冊複用器 --注意調用的child的的unsafe方法哦。也就是調用的是Channel 的unsafe方法哦!! child.unsafe().register(child.newPromise());//註冊socektChannel到多路複用器 代碼 1.2 }
此處僅僅重點關注一下register方法。查看調用鏈會發現最終調用到以下代碼段:
public final void register(final ChannelPromise promise) { if (eventLoop.inEventLoop()) {//若是由其它線程發起,則封裝成一個Task放入消息隊列中異步執行 register0(promise); } else { try {//由於是服務器/客戶端第一次執行,因此會執行到這裏 eventLoop.execute(new Runnable() { public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } } }
register0(promise); 此處重點關注,至於這麼多if else是爲啥,咱們會出一個特別的章節進行講解
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } doRegister(); registered = true; promise.setSuccess(); //開始接收鏈接---用來將初始化serverSocketChannel()的pipeline pipeline.fireChannelRegistered(); if (isActive()) {//是否主動讀取,在unsafe中設置本身感興趣的事件 此處也很重要哦!! pipeline.fireChannelActive();//開始讀取操做 剛剛接收的socketChannel是自動讀取的話,那就就會用該socketchanneld對應的eventLoop裏的selector對讀事件進行監聽了 } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } } }
注意:上面代碼的pipeline是NioServerSocketchannel的pipeline哦。搭配ide工具(我使用的是eclipse)。能夠追蹤到fireChannelRegistered調用會調用到以下圖的調用中
若是你們寫過netty的服務端的代碼模版的話,會ChanneInitailer是很熟悉的(至於說爲何ChannelInitializer,請你們查看代碼1.3處)這個serverChannel的pipeline裏的handler就是用來初始化socketChannel的pipeline的(能夠用來初始化handler是哪些、Allocator等,後面會有特定章節(也會有視頻)進行講解)。粘貼一下ChannelInitializer的channelRegistered方法:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { //ServerSocket會對應pipeline,其中只一個一個handler就是用來接收鏈接 initChannel((C) ctx.channel()); pipeline.remove(this);//從SocketChannel中刪除頻道初始化器--爲啥說是從SocketChannel刪除啊?去看看上面代碼 1.2吧!!特別是調用鏈. ctx.fireChannelRegistered();//移除後再去調用一下 success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) {//查看是否含有指定的初始化handler,若是pipeline中仍是含有的話,那麼就移除 context()方法相似於contains方法 pipeline.remove(this); } if (!success) { ctx.close(); } } }
這裏咱們關注一下initChannel實現(netty在此處使用到了模版模式)。你們對這個很熟悉的。這裏僅僅粘貼一下你們已經很熟悉的代碼模版。
public static void main(String[] args) throws Exception { final ByteBufAllocator allocator =new PooledByteBufAllocator(true) ; //處理網絡鏈接---接受請求 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //進行socketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(2); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // .option(ChannelOption.ALLOCATOR, )//設置內存分配器 .option(ChannelOption.SO_SNDBUF, 1024)//發送緩衝器 .option(ChannelOption.SO_RCVBUF, 1024) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收緩衝器 .handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel對應的ChannelPipeline的handler .childHandler(new ChannelInitializer<SocketChannel>() {//客戶端新接入的鏈接socketChannel對應的ChannelPipeline的Handler @Override public void initChannel(SocketChannel ch) { SocketChannelConfig config=ch.config(); //設置緩存的分配器--每個socket都對應一個Allocator // config.setAllocator(AllocatorContext.getInstance()); //一個Allocator有多個threadCache,可是一個thread只能與一個threadCache進行綁定,綁定成功後,就不能再次改變了 //是否會出現一個Allocator下的treadCache個數不能被全部的thread 平均分配 //ThreadCache --用來針對某個線程下的內存分配,若是全部的線程對象共用一個 config.setAllocator(allocator); ChannelPipeline p = ch.pipeline(); p // .addLast(new LineBasedFrameDecoder(30))//也會將回車符刪除掉--是以換行符做爲分隔的 //若是在讀取了maxLength個字符以後仍是沒有讀取到結束分隔符的話就會跑出異常(防止異常碼流確實分隔符致使的內存溢出,這是netty解碼器的可好性保護) .addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$".getBytes()))) // .addLast(new FixedLengthFrameDecoder(10)) // .addLast(new LineBasedFrameDecoder(2000)) // .addLast(new DiscardServerHandler()); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(PORT).sync(); System.out.println("ChannelFuture f = b.bind(PORT).sync();"); // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
講到這裏咱們都講完了鏈接的創建過程主要就是pipeline的初始化還有就是事件的監聽狀況。
爲了你們更好的理解,特地繪製鏈接創建的流程圖,會有少許代碼的哦!!
本文是本人學習Netty後的我的總結的分享,須要說明的是本人僅僅表明我的的見解,不是什麼官方權威的版本,若是有不當之處,還請賜教!!歡迎吐槽!!