nio的好夥伴——netty

NIO的那些事

咱們在前段時間學習了IONIO的一些概念性的東西,而且寫了一些簡單的例子進行實踐,雖然簡單,但基本上覆蓋了NIO的一些最基本的概念了。
若是還沒看過的,若是翻一下以前的文章瞭解一下,或者看一下網上的其餘文章。html

JAVANIO的那些痛

既然咱們學過NIO,那咱們以JAVANIO來舉個例子,說明一下咱們使用NIO的一些基本流程:java

  1. 打開ServerSocketChannel(Server端)或SocketChannel(Client端),監聽對應的端口或鏈接對應的端口
  2. 設置configureBlocking(false)爲非阻塞
  3. 經過register註冊要監聽的描述符
  4. 經過Selector.open打開Selector
  5. 調用Selector.select獲得已就緒的SelectionKey
  6. 遍歷SelectionKey進行相應的處理

這裏我把以前的某些步驟合併了,可能跟以前有前面的文章有點不一致,但整體步驟是同樣的。程序員

其實,上面的步驟咱們大能夠了解到,咱們真正須要關注的步驟只是第6步,或者說是咱們真正要處理IO事件的一些邏輯,其餘的都是一些通用流程而已。spring

既然如此,咱們真的有必要把時間花費在這些通用的地方嗎?bootstrap

偷懶的程序員確定不想這樣作,因此有人開發了minanetty一類的NIO框架,旨在把程序員從這些煩雜的通用流程中釋放出來,而是隻關注真正的業務邏輯,把這些交由框架去作處理。服務器

minanetty的做者都是同一我的(Trustin Lee,牛人老是各類牛)。
006ARE9vgy1ftcrabdjzwj303b040jrk.jpgmybatis

但鑑於netty基本上已是事實上的NIO標準框架了,而且社區一直比較活躍,而mina已經歸檔好久了,都已經沒更新不少年了。爲了不精力太過度散(實際上是我沒學習過mina,不懂-_- ),咱們這裏不討論mina,直接學習netty,裏面有不少值得咱們學習的東西。多線程

前置知識

線程模型

在開始介紹netty相關的知識前,咱們來了解一下線程模型相關的一些知識,這裏參考了不少網上的一些文章,加以本身整理了一下,但願可以給一些看其餘文章不清楚的朋友一些不同的理解。架構

單線程模型

單線程.png

圖片來自: https://www.jianshu.com/p/738...

這裏的單線程指的是分派線程和工做線程都在同一個線程,能夠看回咱們的JAVANIO示例代碼,這裏爲了方便,咱們也貼在下面:框架

public class MyServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        String str = "";
        while(!Thread.currentThread().isInterrupted()) {
            //這裏是一直阻塞,直到有描述符就緒
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //鏈接創建
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (ClosedChannelException e) {
                        e.printStackTrace();
                    }
                }
                //鏈接可讀,這時能夠直接讀
                else if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    try {
                        int num = socketChannel.read(readBuffer);
                        str = new String(readBuffer.array(), 0, num);
                        System.out.println("received message:" + str);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

}

咱們能夠看到在咱們nio的例子中,咱們沒有明確使用多線程,這裏就是使用了單線程來處理的。
它有什麼好處呢?

實現簡單。這是固然的,全部不涉及到多線程的代碼都是相對比較簡單的,注意,是 相對

有優勢的同時確定有缺點,那麼這種單線程有什麼缺點呢:

性能相對比較差。只有一個線程進行請求的處理,也就是隻有一個線程處理 CPU的描述符,假設同一時間有不少信號都就緒了,而且咱們讀到 IO數據後的真正處理邏輯可能比較複雜,那麼全部的請求都須要等待當前的請求處理完成後才能處理其餘的。這也就致使了它的性能相對(這裏的相對是對比其餘多線程的處理方式)比較弱。

多線程模型

多線程.png

圖片來自: https://www.jianshu.com/p/738...

這裏的多線程指的是處理邏輯的多線程,對應到咱們的NIO代碼邏輯裏面就是對SelectionKey的處理是多線程的,咱們直接看代碼會直觀點:

public class MyServerMultipleThread {

    @SuppressWarnings("Duplicates")
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(!Thread.currentThread().isInterrupted()) {
            //這裏是一直阻塞,直到有描述符就緒
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //鏈接創建
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //鏈接可讀,這時能夠直接讀
                else if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    int num = socketChannel.read(readBuffer);
                    new Thread(() -> {
                        String str = new String(readBuffer.array(), 0, num);
                        System.out.println("received message:" + str);
                    }).start();
                }
            }
        }
    }

}

這裏咱們能夠看到,在進行SelectionKey遍歷讀完數據後真正處理的時候,咱們新起了一個新的線程進行NIO的相關處理。

固然,這裏的只是一個示例,真正寫代碼的時候不該該這樣無限制的新起線程,而是應該使用線程池,更合理的使用線程,避免線程數量太多,致使 CPU切換太頻繁,這樣反而起不到優化性能的做用。

主從多線程模型

主從多線程.png

圖片來自: https://www.jianshu.com/p/738...

一看到這圖,估計不少人頭都大了,這都什麼鬼,這麼複雜啊。
實際能夠簡單一點理解:

  • 原來的接收請求是單線程,如今變成了多線程(線程池)
  • 原來的處理邏輯是單線程,如今是使用多線程(線程池)
注意,這裏的多線程不包括 accept請求, accept仍是由單個線程進行分發。

咱們直接看一下代碼會比較容易理解

public class MyServerMultipleThread2 {

    @SuppressWarnings("Duplicates")
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(!Thread.currentThread().isInterrupted()) {
            selector.select();
            //這裏是一直阻塞,直到有描述符就緒
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //鏈接創建
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //鏈接可讀,這時能夠直接讀
                else if (key.isReadable()) {
                    new Thread(() -> {
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int[] num = new int[]{0};
                        try {
                            num[0] = socketChannel.read(readBuffer);
                            new Thread(() -> {
                                String str = new String(readBuffer.array(), 0, num[0]);
                                System.out.println("received message:" + str);
                            }).start();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }).start();
                    key.channel().register(key.selector(), SelectionKey.OP_WRITE);
                }
            }
        }
    }
}

這裏我沒有參考一些網上比較複雜的作法,可能實現起來不大一致,但相對容易理解一點。

  1. 咱們在接受到accept請求時,仍是由單前線程單獨處理
  2. READWRITE等請求時,咱們新起一個線程去作處理,而且在真正的處理邏輯時,仍是跟上面的多線程邏輯同樣,是新起一個線程去作處理。
  3. 咱們在處理READWRITE時,注意,須要從新註冊相應的WRITEREAD事件——由於新起線程後,當前SelectionKey的信號仍是READ,若是咱們不作修改,會致使當前的線程會重複屢次處理。具體你們能夠下來試試,把後面的register去掉,看一下會出現什麼狀況。

咱們看到,上面的線程模型,都以性能提高爲目的,一步步去進行優化,但同時咱們也看到了,代碼是愈來愈複雜,使得咱們在維護咱們真正的邏輯時,有點像是大海撈針,真正的代碼邏輯就那麼一點,而不少都是一些模板代碼。

爲了解決這些問題,就須要引出咱們的框架了,框架正是爲了幫咱們去約定好一些通用的邏輯而出現的,好比 spring,幫我作好了 IOCAOP等的一些邏輯,這些不須要咱們去額外關注;而 mybatis幫咱們作好了 ORM相關的一些處理, DB映射等,這些流程化的東西都已經固化了;而咱們這裏要說的 netty,它幫咱們把 NIO這些線程模型相關的東西幫咱們作了不少的優化和抽取,咱們再也不須要管這些流程化的東西,只須要寫咱們本身的邏輯。

netty出場

netty做爲一個高性能的NIO框架,基本上已是事實上的NIO標準了,包括dubbozookeeper等內部都比較大量地使用了netty。或者說具體點,這些框架可以有這麼好的性能,大部分功勞要歸結到netty身上。

netty基礎知識

看例子前咱們先來補充一些基礎知識。
netty有幾個重要概念:

  • ChannelHandler
channel的事件處理器,裏面封裝了針對當前 channel的生命週期的方法
  • ChannelInBoundHandler
channelREAD請求處理器,裏面封裝了當前 channel的對於接收請求相關的生命週期方法
  • ChannelOutBoundHandler
channelWRITE請求處理器,裏面封裝了當前 channel的對象發出請求的生命週期方法。
  • ChannelPipeline
此類是 netty架構中比較重要的一個類,它使用了 責任鏈模式,把請求從 ChannelHandler中一個個的日後傳遞,最終到達咱們的業務 Handler。關於 Pipeline的詳細描述,咱們後面再詳細看看。
  • ByteBuf
netty封裝了本身的 ByteBuf,與 JDK自帶的 ByteBuffer的最主要的區別是它有兩個指針,一個供讀 readerIndex,一個供寫 writerIndex。而至於該類的一些詳細信息,你們能夠看一下它的 JavaDoc,寫得很是詳細。

關於OutBound和上面的InBound的區別,你們能夠簡單地區分一下,In就是請求進入,對應的就是READOut就是請求發出,對應的就是WRITE

基本的概念瞭解清楚了,那咱們來看一下簡單的例子。
其實netty最好的文檔是它的官網文檔。咱們就仍是以相似官方源碼裏面的一個example來學習一下,實現的功能很簡單:

Client鏈接成功後傳一句話給 ServerServer回覆收到。

實戰

server

ServerHandler
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client:" + msg);
        ctx.writeAndFlush("I received your message:" + msg + System.lineSeparator());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}
咱們的代碼比較簡單,打印收到的文本,而且再發回一條語句。咱們能夠看到咱們輸出的時候加多了一個 換行符—— System.lineSeparator(),這是爲何呢?
這裏涉及到另一個 TCP/IP一個比較重要的問題, 拆包和粘包,這裏咱們先不細說,後面我會有專門的文章來講一下 拆包和粘包還有一系列 TCP/IP相關的知識,這是很是大的一塊了。咱們如今就先簡單的知道,加這個 換行符是爲了讓 Handler知道咱們的消息從哪裏結束。
Server
public class MyNettyServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(4096));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new MyNettyServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture channelFuture = null;
        try {
            channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

這裏涉及到比較多的知識點,總體結構咱們先無論它,咱們重要先關注一下:

  • 線程池
這裏定義了兩個線程組進行處理, BossGroupWorkerGroup,對應咱們上面的 多線程模型,緣由是 netty並不使用 主從多線程模型——這個咱們之後的文章有機會再細說。
  • ServerBootStrap
netty工具類,有助於編寫服務器的相關代碼,而 Client端對應的就是 Bootstrap了。
  • pipeline的添加
ch.pipeline().addLast(new LineBasedFrameDecoder(4096));                 ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new MyNettyServerHandler());

這裏把4個Handler添加到Pipeline的末尾,至於爲何是末尾,相應看到後面的pipeline的解析的時候你們就會知道了。
我這裏大概描述一下幾個Handler的做用:

  • LineBasedFrameDecoder
根據換行符 \n\r\n進行內容的分割——即 拆包
  • StringDecoder
把接收到的內容解析爲 String字符串
  • StringEncoder
把發出的內容解析爲 String字符串
  • MyNettyServerHandler
咱們的真正邏輯處理類,這個應該是在前面的幾個處理完成後再進行。咱們在後面的 pipeline執行順序中能夠看到爲何這樣添加。
後面的 Client中的 Handler也能夠參考上面的。

Client

ClientHandler
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("helloworld" + System.lineSeparator());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from server:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

這裏咱們的代碼也比較簡單,就是鏈接成功的時候發條helloworld過去服務端,而後再從服務端讀到返回的內容。咱們就不細說了。

Client
public class MyNettyClient {

    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(4096));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new MyNettyClientHandler());
                    }
                })
                .option(ChannelOption.SO_KEEPALIVE, true);

        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

對比上面的Server代碼,這裏的區別,最大的就是咱們只有一個EventLoopGroup,由於Client端並不須要接收請求,因此並不須要所謂的BossGroup

一切就緒後,咱們能夠跑一下看看運行狀況:
先運行server,再運行client
server能夠看到
Jietu20200130-192501@2x.jpg
client能夠看到
Jietu20200130-192447@2x.jpg
這表示咱們已經使用netty寫了一個基本能夠用的NIO程序了。
Jietu20200130-191258@2x.jpg

ChannelPipeline詳解

ChannelPipeline做爲netty的一個底層重要組成部分,ChannelHandler都須要依靠它進行調度,重要性不言而喻。那咱們如今就一塊兒來看看ChannelPipeline到底是怎麼調度的。
查看ChannelPipelineJavaDoc咱們能夠看到這樣一串描述(牛人寫描述都是特別認真的)。
Jietu20200130-191826@2x.jpg
大概的意思就是這樣的:

  1. 請求進來時,按照InBoundHandler的添加順序,從前日後執行。
  2. 請求出去時,按照OutBoundHandler的添加順序,從後往前執行。

另外,文檔中又舉了一個例子:
Jietu20200130-192431@2x.jpg
咱們套用一下咱們的Server例子來分析一下:
LineBasedFrameDecoder,StringDecoder,StringEncoder,MyNettyServerHandler

當咱們收到消息時,須要執行的 Handler的順序爲: LineBasedFrameDecoder, StringDecoder, MyNettyServerHandler
當咱們發出消息時,須要執行的 OutboundHandler的順序爲: StringEncoder.

基於上面的分析,咱們就能夠分析爲何咱們前面的例子能夠獲得那樣的結果。

總結

這篇文章,咱們從一開始的線程模型到後面的netty的示例,這些種種都是爲了性能的提升去作的一些優化。在當前大數據的趨勢下,更多須要咱們把性能去作到極致。
後面,咱們會再根據netty中的一些最佳實踐來分析它是怎麼解析粘包和拆分的。

參考文章

https://www.jianshu.com/p/738095702b75
https://netty.io/wiki/user-guide-for-4.x.html

相關文章
相關標籤/搜索