Netty5鏈接建立過程_源碼講解

歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java

Netty是對Nio的一個封裝,關於網絡的全部操做都是經過事件的方式完成的。例如鏈接建立、read事件、write事件都是經過Nio來完成的。那netty是怎麼啓動監聽的呢? 在什麼地方啓動的呢?此處不爲你們設置懸念,一次性告訴你們。經過循環掃描的方式來實現監聽的。具體的方法類位於NioEventLoop的run方法中(趕忙進去看看吧!! 淺顯易懂)。
git

下面是netty的acceptor線程建立鏈接的代碼。位於類NioEventLoop的processSelectedKey中(至於processSelectedKey是怎麼被調用的,本身看看調用鏈就好了(eclipse用ctrl+Shift+H就能夠查看到選中方法的調用鏈))。github

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            //獲得當前的key關注的事件
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //一個剛剛建立的NioServersocketChannel感興趣的事件是0。
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//能夠讀取操做  --對於serverSocket來講就是acceptor事件、對於socketChannel來講就是read事件 
                //INFO: channel類型爲io.netty.channel.socket.nio.NioSocketChannel unsafe類型爲io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe
                Object obj = k.attachment();//獲得NioServerSocketChannel或者NioSocketChannel
                if(obj instanceof NioServerSocketChannel){
                    System.out.println(obj.getClass().getName()+ " 開始接收鏈接");
                }else{
                    System.out.println(obj.getClass().getName()+ " 開始接收字節");
                }
                //不一樣的socketChannel對於那個的unsafe是不一樣的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe
                unsafe.read();//對於接受連接或者read興趣都會添加進入read操做調用serverSocket->NioMessageUnsafe
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//對於半包消息進行輸出操做
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }[object Object]

這裏咱們以鏈接的創建(NioMessageUnsafe)爲線索進行講解。後續會有基於byte的unsafe進行講解的(Unsafe不知道爲啥要這麼叫,本人也感到挺費解的,不過如今看來感受就是一個工具對象。不要從名稱上害怕它)。下面來看NioMessageUnsafe的read方法進行講解。直接講代碼(後面也會有圖形講解,方便你們理解):bootstrap

 @Override
        public void read() {
            assert eventLoop().inEventLoop();
            if (!config().isAutoRead()) {
                removeReadOp();
            }

            final ChannelConfig config = config();
            //獲得本次方法調用能夠接收的鏈接數目
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            final boolean autoRead = config.isAutoRead();
            final ChannelPipeline pipeline = pipeline();
            boolean closed = false;
            Throwable exception = null;
            try {
                for (;;) { 
                    //將msg從讀取出來(SocketChannel-(common msg); serverSocketChannel(socketChannel msg))
                    int localRead = doReadMessages(readBuf);//readBuf僅僅是在本方法中起到緩衝統計的做用。不要多想哦!!
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    //每次讀取的message的個數---
                    if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                        break;//避免一次性建立過多的鏈接個數
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                //對於server端來講,第一個handler爲io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
                ////針對全部的channel都會執行一個read操做;對於ServerSocketChannel ServerSocketChannel對應的pipeline的fireChannelRead方法
                //由於ServerSocketChannel的pipeline的第一個handler
                pipeline.fireChannelRead(readBuf.get(i));//1.1 代碼
            }
            readBuf.clear();//清空到的鏈接緩存
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                if (exception instanceof IOException) {
                    // ServerChannel should not be closed even on IOException because it can often continue
                    // accepting incoming connections. (e.g. too many open files)
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        }
    }

下面讓咱們來看看上面這段代碼中提到的promise

 int localRead = doReadMessages(readBuf);//doReadMessages是個抽象方法

上代碼:緩存

 @Override
    protected int doReadMessages(List<Object> buf) 
throws Exception 
{//對於NioServerSocketChannel,它的讀取操做,就是接受客戶端的連接和建立NioSocketChannel
        SocketChannel ch = javaChannel().accept();//獲得java遠程的socketChannel對象。不要認爲此處會阻塞,不會的由於connec事件發生了。因此會當即返回
        try {
            if (ch != null) {
                //對javachannel().accept()返回的java原生socket進行包裝,包裝成netty的NioSocketChannel對象-將全部綁定socketChannel的處理線程
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;//每次僅僅處理一個,而且將獲得鏈接對象放入到buf列表對象中進行保存
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }

在上面咱們看到將java原聲的socket包裝成了NioSocketChannel。下面咱們粘貼一下NioSocketChannel的構造方法。服務器

 public NioSocketChannel(Channel parent, EventLoop eventLoop, SocketChannel socket) {
        super(parent, eventLoop, socket);//使用到了AbstractNioByteChannel構造方法
        config = new DefaultSocketChannelConfig(this, socket.socket());
    }
//下面代碼是父類AbstractNioByteChannel的構造方法:
  protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {
        super(parent, eventLoop, ch, SelectionKey.OP_READ);//注意此處關注的事件爲read事件哦。
    }
//在往上走,看看是怎樣的一種狀況
protected AbstractChannel(Channel parent, EventLoop eventLoop) {
        this.parent = parent;
        this.eventLoop = validate(eventLoop);//注意  此處代碼意味着初始化socketChannel的時候,就已經對未來的work thread進行綁定了哦
        unsafe = newUnsafe();//此處爲抽象方法,深刻到具體實現爲new NioByteUnsafe();
        pipeline = new DefaultChannelPipeline(this);//由此能夠看出,在當發生message read事件的時候,就會爲每個socketChannel建立
    }

到此咱們已經講解完了一個NioSocketChannel的建立過程。
無論對應一個NioServerSocketChanel仍是NioSocketChannel,對他們的pipeline進行初始化是很重要的。對於NioServerSocketChanel來講,其pipeline能夠用來對剛剛建立出來的NioSocketChannel進行初始化(後面當即會講到).對於NioSocketChannel來講,其pipeline,會對接收到bytes進行解碼轉換成業務層能夠徹底解析的對象。那麼何時將必要的NioSocketChannel的pipeline進行操做,添加必要的handler呢。這個咱們就要回到上面的代碼'1.1 代碼' 處。咱們再粘貼一下代碼:
網絡

  for (int i = 0; i < size; i ++) {
                //對於server端來講,第一個handler爲io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
                ////針對全部的channel都會執行一個read操做;對於ServerSocketChannel ServerSocketChannel對應的pipeline的fireChannelRead方法
                //由於ServerSocketChannel的pipeline的第一個handler
                pipeline.fireChannelRead(readBuf.get(i));//1.1 代碼  
            }

上面的這個pipeline是NioServerSocketChanel的pipeline。其handler和 普通的爲NioSocketChannel準備的pipeline是不一樣的。NioServerSocketChanel的pipeline填充是在serverBootstrap bind的時候進行建立而且填充的(之後咱們會講解)。在這裏咱們就來看看app

pipeline.fireChannelRead(readBuf.get(i));//1.1 代碼

這一行代碼是怎麼操做的。搭配ide工具(我使用的是eclipse)。能夠追蹤到該方法調用追蹤會調用到接口pipeline的channelRead的方法上。該接口實現有不少。可是咱們此處只須要關注ServerBootstrapAcceptor(至於爲何關注它,本身能夠看一下serverBootstrap的bind方法的實現,由於對於NioServerSocketChanel的pipeline的第一個handler就是ServerBootstrapAcceptor)。上代碼:eclipse

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //獲得傳遞過來的channel對象
            Channel child = (Channel) msg;
            //每個child對應的管道都應該是同樣的,將啓動時傳入的childHandler加入到客戶端SocketChannel的Pipeline中
            child.pipeline().addLast(childHandler);//代碼 1.3 在此處對剛剛建立出來的socketChannel添加netty服務端的經常使用代碼的'childHandler(new ChannelInitializer<SocketChannel>()’哦

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);//設置客戶端SocketChannel的TCP參數
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            //設置每個SocketChannel的屬性設置
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            //調用註冊複用器  --注意調用的child的的unsafe方法哦。也就是調用的是Channel 的unsafe方法哦!!
            child.unsafe().register(child.newPromise());//註冊socektChannel到多路複用器  代碼 1.2
        }

此處僅僅重點關注一下register方法。查看調用鏈會發現最終調用到以下代碼段:

 public final void register(final ChannelPromise promise) {
            if (eventLoop.inEventLoop()) {//若是由其它線程發起,則封裝成一個Task放入消息隊列中異步執行
                register0(promise);
            } else {
                try {//由於是服務器/客戶端第一次執行,因此會執行到這裏
                    eventLoop.execute(new Runnable() {
                        
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    promise.setFailure(t);
                }
            }
        }
 register0(promise); 此處重點關注,至於這麼多if else是爲啥,咱們會出一個特別的章節進行講解
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                doRegister();
                registered = true;
                promise.setSuccess();
                //開始接收鏈接---用來將初始化serverSocketChannel()的pipeline
                pipeline.fireChannelRegistered();
                if (isActive()) {//是否主動讀取,在unsafe中設置本身感興趣的事件  此處也很重要哦!!
                    pipeline.fireChannelActive();//開始讀取操做  剛剛接收的socketChannel是自動讀取的話,那就就會用該socketchanneld對應的eventLoop裏的selector對讀事件進行監聽了
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
            }
        }

 注意:上面代碼的pipeline是NioServerSocketchannel的pipeline哦。搭配ide工具(我使用的是eclipse)。能夠追蹤到fireChannelRegistered調用會調用到以下圖的調用中

若是你們寫過netty的服務端的代碼模版的話,會ChanneInitailer是很熟悉的(至於說爲何ChannelInitializer,請你們查看代碼1.3處)這個serverChannel的pipeline裏的handler就是用來初始化socketChannel的pipeline的(能夠用來初始化handler是哪些、Allocator等,後面會有特定章節(也會有視頻)進行講解)。粘貼一下ChannelInitializer的channelRegistered方法:

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            //ServerSocket會對應pipeline,其中只一個一個handler就是用來接收鏈接
            initChannel((C) ctx.channel());
            pipeline.remove(this);//從SocketChannel中刪除頻道初始化器--爲啥說是從SocketChannel刪除啊?去看看上面代碼 1.2吧!!特別是調用鏈.
            ctx.fireChannelRegistered();//移除後再去調用一下
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {//查看是否含有指定的初始化handler,若是pipeline中仍是含有的話,那麼就移除 context()方法相似於contains方法
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }

這裏咱們關注一下initChannel實現(netty在此處使用到了模版模式)。你們對這個很熟悉的。這裏僅僅粘貼一下你們已經很熟悉的代碼模版。

 public static void main(String[] args) throws Exception {
        final ByteBufAllocator allocator =new PooledByteBufAllocator(true) ;
        //處理網絡鏈接---接受請求
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //進行socketChannel的網絡讀寫
        EventLoopGroup workerGroup = new NioEventLoopGroup(2);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
//             .option(ChannelOption.ALLOCATOR, )//設置內存分配器
              .option(ChannelOption.SO_SNDBUF, 1024)//發送緩衝器
              .option(ChannelOption.SO_RCVBUF, 1024)
            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收緩衝器
             .handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel對應的ChannelPipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//客戶端新接入的鏈接socketChannel對應的ChannelPipeline的Handler
                 @Override
                 public void initChannel(SocketChannel ch) {
                     SocketChannelConfig config=ch.config();
                     //設置緩存的分配器--每個socket都對應一個Allocator
//                     config.setAllocator(AllocatorContext.getInstance());
                     //一個Allocator有多個threadCache,可是一個thread只能與一個threadCache進行綁定,綁定成功後,就不能再次改變了
                     //是否會出現一個Allocator下的treadCache個數不能被全部的thread 平均分配
                     //ThreadCache --用來針對某個線程下的內存分配,若是全部的線程對象共用一個
                     config.setAllocator(allocator);
                     
                     ChannelPipeline p = ch.pipeline();
                     p
//                     .addLast(new LineBasedFrameDecoder(30))//也會將回車符刪除掉--是以換行符做爲分隔的
                     //若是在讀取了maxLength個字符以後仍是沒有讀取到結束分隔符的話就會跑出異常(防止異常碼流確實分隔符致使的內存溢出,這是netty解碼器的可好性保護)
                     .addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$".getBytes())))
                     
//                     .addLast(new FixedLengthFrameDecoder(10))
                     
//                     .addLast(new LineBasedFrameDecoder(2000))
//                     
                     .addLast(new DiscardServerHandler());
                 }
             });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(PORT).sync();
            System.out.println("ChannelFuture f = b.bind(PORT).sync();");
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

講到這裏咱們都講完了鏈接的創建過程主要就是pipeline的初始化還有就是事件的監聽狀況。

爲了你們更好的理解,特地繪製鏈接創建的流程圖,會有少許代碼的哦!!

本文是本人學習Netty後的我的總結的分享,須要說明的是本人僅僅表明我的的見解,不是什麼官方權威的版本,若是有不當之處,還請賜教!!歡迎吐槽!!

相關文章
相關標籤/搜索