Netty Channel分類與新鏈接接入

Netty新鏈接接入流程

1.檢測新連接->2.建立NioSocketChannel->3.分配線程及註冊selector->4.向selector註冊讀事件java

檢測新鏈接

步驟

  • processSelectedKey(key,channel)入口
    • NioMessageUnsafe.read()
      • doReadMessages() while循環
        • javaChannel().accept()

分析

  • processSelectedKey(key,channel)入口
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            // 省略代碼...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 調用與ServerSocketChannel綁定的
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
複製代碼

doReadMessages()算法

@Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 接收客戶端鏈接
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                // netty建立本身的客戶端channel
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            //省略代碼...
        }

        return 0;
    }
複製代碼

建立NioSocketChannel

步驟

  • new NioSocketChannel(parent,ch) 入口
    • AbstractNioByteChannel(p,ch,op_read)
      • configureBlocking(false)&save op
      • create id,unsafe,pipeline
    • new NioSocketChannelConfig()
      • setTcpNoDelay(true)

分析

  • new NioSocketChannel(parent,ch) 入口
public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        // 建立channel對應的config
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        // 指定channel關注讀事件
        super(parent, ch, SelectionKey.OP_READ);
    }
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            //省略代碼..
        }
    }
複製代碼
  • 其餘過程服務端channel建立相似,咱們說下這點setTcpNoDelay(true)
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
        try {
            // 若是不是安卓就true
            javaSocket.setTcpNoDelay(tcpNoDelay);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }
    當開啓nagle算法時,客戶端首先發送大小爲1字節的第一個分組,隨後其它分組到達發送緩衝區,因爲上一個分組的應答尚未收到,因此TCP會先緩存新來的這4個小分組,並將其從新分組,組成一個大小爲8(2+3+1+2)字節的」較大的」小分組。當第一個小分組的應答收到後,客戶端將這個8字節的分組發送。總共發送的報文段(分組)個數爲2。當傳輸數據存在大量交互數據時,nagle算法能夠有效減小網絡中的報文段個數
    /**
     * Disable Nagle's algorithm for this connection. Written data * to the network is not buffered pending acknowledgement of * previously written data. */ @Native public final static int TCP_NODELAY = 0x0001; 複製代碼

Netty中的Channel的分類

步驟

  • NioServerSocketChannel
  • NioSocketChannel

分析

  • 一個簡化的類圖

以前建立Channel過程當中,客戶端和服務端channel有公共成員變量。它們的類繼承關係如圖: promise

新鏈接NioEventLoop分配和selector註冊

步驟

服務端Channel的pipeline構成 Head --> ServerBootstrapAcceptor --> tail緩存

  • ServerBootstrapAcceptor
    • 添加childHandler
    • 設置options和attrs
    • Chooser選擇NioEventLopp並註冊selector

分析

  • 在服務端channel建立的時候會在pipeline中添加個ServerBootstrapAcceptor。
p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
複製代碼
  • 建立ServerBootstrapAcceptor 過程最終要的一點就是初始化childGroup,由於一會要從中選擇EventLoop的其中一個,並將channel註冊上去
  • 上面的doReadMessages對於鏈接事件,其實讀到的是NioSocketChannel
int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        // 關鍵點在這,這句話的意思是  serverSocketChannel.pipeline().fireChannelRead(nioSocketChannel)
        pipeline.fireChannelRead(readBuf.get(i));
    }

    // serverAcceptor的channelRead方法 
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 省略代碼 ... 
        // child 也就是咱們上面說的 nioSocketChannel
        final Channel child = (Channel) msg;
        // 那麼這句話的意思就是 把nioSocketChannel註冊到EventLoop的其中一個的selector上
        childGroup.register(child).addListener(...);
    }
複製代碼
  • NioSocketChannel註冊過程
public ChannelFuture register(Channel channel) {
        // next()其實就是咱們以前在EventLoop中提到的EventLoop選擇的問題,這塊是個輪詢詳情請看EventLoop那篇文章
        return next().register(channel);
    }
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // 這塊實際上是調用與客戶端Channel所對應的unsafe
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    // 一直跟到最後的register.
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 這塊尚未監聽具體的事件
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
複製代碼
  • 設置selector的interestOps
// readInterestOp上邊設置的是OP_READ,也就是監聽讀事件
    protected void doBeginRead() throws Exception {
        ...
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
複製代碼
相關文章
相關標籤/搜索