NIO
的那些事咱們在前段時間學習了IO
和NIO
的一些概念性的東西,而且寫了一些簡單的例子進行實踐,雖然簡單,但基本上覆蓋了NIO
的一些最基本的概念了。
若是還沒看過的,若是翻一下以前的文章瞭解一下,或者看一下網上的其餘文章。html
JAVA
的NIO
的那些痛既然咱們學過NIO
,那咱們以JAVA
的NIO
來舉個例子,說明一下咱們使用NIO
的一些基本流程:java
ServerSocketChannel
(Server
端)或SocketChannel
(Client
端),監聽對應的端口或鏈接對應的端口configureBlocking(false)
爲非阻塞register
註冊要監聽的描述符Selector.open
打開Selector
Selector.select
獲得已就緒的SelectionKey
SelectionKey
進行相應的處理這裏我把以前的某些步驟合併了,可能跟以前有前面的文章有點不一致,但整體步驟是同樣的。程序員
其實,上面的步驟咱們大能夠了解到,咱們真正須要關注的步驟只是第6步,或者說是咱們真正要處理IO
事件的一些邏輯,其餘的都是一些通用流程而已。spring
既然如此,咱們真的有必要把時間花費在這些通用的地方嗎?bootstrap
偷懶的程序員確定不想這樣作,因此有人開發了mina
和netty
一類的NIO
框架,旨在把程序員從這些煩雜的通用流程中釋放出來,而是隻關注真正的業務邏輯,把這些交由框架去作處理。服務器
mina
和netty
的做者都是同一我的(Trustin Lee,牛人老是各類牛)。
mybatis
但鑑於netty
基本上已是事實上的NIO
標準框架了,而且社區一直比較活躍,而mina
已經歸檔好久了,都已經沒更新不少年了。爲了不精力太過度散(實際上是我沒學習過mina
,不懂-_- ),咱們這裏不討論mina
,直接學習netty
,裏面有不少值得咱們學習的東西。多線程
在開始介紹netty
相關的知識前,咱們來了解一下線程模型相關的一些知識,這裏參考了不少網上的一些文章,加以本身整理了一下,但願可以給一些看其餘文章不清楚的朋友一些不同的理解。架構
圖片來自: https://www.jianshu.com/p/738...
這裏的單線程指的是分派線程和工做線程都在同一個線程,能夠看回咱們的JAVA
的NIO
示例代碼,這裏爲了方便,咱們也貼在下面:框架
public class MyServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); String str = ""; while(!Thread.currentThread().isInterrupted()) { //這裏是一直阻塞,直到有描述符就緒 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //鏈接創建 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } //鏈接可讀,這時能夠直接讀 else if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); try { int num = socketChannel.read(readBuffer); str = new String(readBuffer.array(), 0, num); System.out.println("received message:" + str); } catch (IOException e) { e.printStackTrace(); } } } } } }
咱們能夠看到在咱們nio
的例子中,咱們沒有明確使用多線程,這裏就是使用了單線程來處理的。
它有什麼好處呢?
實現簡單。這是固然的,全部不涉及到多線程的代碼都是相對比較簡單的,注意,是 相對
有優勢的同時確定有缺點,那麼這種單線程有什麼缺點呢:
性能相對比較差。只有一個線程進行請求的處理,也就是隻有一個線程處理
CPU的描述符,假設同一時間有不少信號都就緒了,而且咱們讀到
IO
數據後的真正處理邏輯可能比較複雜,那麼全部的請求都須要等待當前的請求處理完成後才能處理其餘的。這也就致使了它的性能相對(這裏的相對是對比其餘多線程的處理方式)比較弱。
圖片來自: https://www.jianshu.com/p/738...
這裏的多線程指的是處理邏輯的多線程,對應到咱們的NIO
代碼邏輯裏面就是對SelectionKey
的處理是多線程的,咱們直接看代碼會直觀點:
public class MyServerMultipleThread { @SuppressWarnings("Duplicates") public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(!Thread.currentThread().isInterrupted()) { //這裏是一直阻塞,直到有描述符就緒 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //鏈接創建 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //鏈接可讀,這時能夠直接讀 else if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int num = socketChannel.read(readBuffer); new Thread(() -> { String str = new String(readBuffer.array(), 0, num); System.out.println("received message:" + str); }).start(); } } } } }
這裏咱們能夠看到,在進行SelectionKey
遍歷讀完數據後真正處理的時候,咱們新起了一個新的線程進行NIO
的相關處理。
固然,這裏的只是一個示例,真正寫代碼的時候不該該這樣無限制的新起線程,而是應該使用線程池,更合理的使用線程,避免線程數量太多,致使 CPU切換太頻繁,這樣反而起不到優化性能的做用。
圖片來自: https://www.jianshu.com/p/738...
一看到這圖,估計不少人頭都大了,這都什麼鬼,這麼複雜啊。
實際能夠簡單一點理解:
注意,這裏的多線程不包括accept
請求,accept
仍是由單個線程進行分發。
咱們直接看一下代碼會比較容易理解
public class MyServerMultipleThread2 { @SuppressWarnings("Duplicates") public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(!Thread.currentThread().isInterrupted()) { selector.select(); //這裏是一直阻塞,直到有描述符就緒 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //鏈接創建 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //鏈接可讀,這時能夠直接讀 else if (key.isReadable()) { new Thread(() -> { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int[] num = new int[]{0}; try { num[0] = socketChannel.read(readBuffer); new Thread(() -> { String str = new String(readBuffer.array(), 0, num[0]); System.out.println("received message:" + str); }).start(); } catch (IOException e) { e.printStackTrace(); } }).start(); key.channel().register(key.selector(), SelectionKey.OP_WRITE); } } } } }
這裏我沒有參考一些網上比較複雜的作法,可能實現起來不大一致,但相對容易理解一點。
accept
請求時,仍是由單前線程單獨處理READ
或WRITE
等請求時,咱們新起一個線程去作處理,而且在真正的處理邏輯時,仍是跟上面的多線程邏輯同樣,是新起一個線程去作處理。READ
或WRITE
時,注意,須要從新註冊相應的WRITE
或READ
事件——由於新起線程後,當前SelectionKey
的信號仍是READ
,若是咱們不作修改,會致使當前的線程會重複屢次處理。具體你們能夠下來試試,把後面的register
去掉,看一下會出現什麼狀況。咱們看到,上面的線程模型,都以性能提高爲目的,一步步去進行優化,但同時咱們也看到了,代碼是愈來愈複雜,使得咱們在維護咱們真正的邏輯時,有點像是大海撈針,真正的代碼邏輯就那麼一點,而不少都是一些模板代碼。
爲了解決這些問題,就須要引出咱們的框架了,框架正是爲了幫咱們去約定好一些通用的邏輯而出現的,好比spring
,幫我作好了IOC
和AOP
等的一些邏輯,這些不須要咱們去額外關注;而mybatis
幫咱們作好了ORM
相關的一些處理, DB映射等,這些流程化的東西都已經固化了;而咱們這裏要說的netty
,它幫咱們把NIO
這些線程模型相關的東西幫咱們作了不少的優化和抽取,咱們再也不須要管這些流程化的東西,只須要寫咱們本身的邏輯。
netty
出場netty
做爲一個高性能的NIO
框架,基本上已是事實上的NIO
標準了,包括dubbo
,zookeeper
等內部都比較大量地使用了netty
。或者說具體點,這些框架可以有這麼好的性能,大部分功勞要歸結到netty
身上。
netty
基礎知識看例子前咱們先來補充一些基礎知識。netty
有幾個重要概念:
ChannelHandler
channel
的事件處理器,裏面封裝了針對當前channel
的生命週期的方法
ChannelInBoundHandler
channel
的READ
請求處理器,裏面封裝了當前channel
的對於接收請求相關的生命週期方法
ChannelOutBoundHandler
channel
的WRITE
請求處理器,裏面封裝了當前channel
的對象發出請求的生命週期方法。
ChannelPipeline
此類是netty
架構中比較重要的一個類,它使用了 責任鏈模式,把請求從ChannelHandler
中一個個的日後傳遞,最終到達咱們的業務Handler
。關於Pipeline
的詳細描述,咱們後面再詳細看看。
ByteBuf
netty
封裝了本身的ByteBuf
,與JDK
自帶的ByteBuffer
的最主要的區別是它有兩個指針,一個供讀readerIndex
,一個供寫writerIndex
。而至於該類的一些詳細信息,你們能夠看一下它的JavaDoc
,寫得很是詳細。
關於OutBound
和上面的InBound
的區別,你們能夠簡單地區分一下,In
就是請求進入,對應的就是READ
,Out
就是請求發出,對應的就是WRITE
。
基本的概念瞭解清楚了,那咱們來看一下簡單的例子。
其實netty
最好的文檔是它的官網文檔。咱們就仍是以相似官方源碼裏面的一個example
來學習一下,實現的功能很簡單:
Client
鏈接成功後傳一句話給Server
,Server
回覆收到。
server
端ServerHandler
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from client:" + msg); ctx.writeAndFlush("I received your message:" + msg + System.lineSeparator()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
咱們的代碼比較簡單,打印收到的文本,而且再發回一條語句。咱們能夠看到咱們輸出的時候加多了一個 換行符——System.lineSeparator()
,這是爲何呢?
這裏涉及到另一個TCP/IP
一個比較重要的問題, 拆包和粘包,這裏咱們先不細說,後面我會有專門的文章來講一下 拆包和粘包還有一系列TCP/IP
相關的知識,這是很是大的一塊了。咱們如今就先簡單的知道,加這個 換行符是爲了讓 Handler知道咱們的消息從哪裏結束。
Server
public class MyNettyServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = null; try { channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
這裏涉及到比較多的知識點,總體結構咱們先無論它,咱們重要先關注一下:
線程池
這裏定義了兩個線程組進行處理,BossGroup
和WorkerGroup
,對應咱們上面的 多線程模型,緣由是netty
並不使用 主從多線程模型——這個咱們之後的文章有機會再細說。
ServerBootStrap
netty
工具類,有助於編寫服務器的相關代碼,而Client
端對應的就是Bootstrap
了。
pipeline
的添加ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyServerHandler());
這裏把4個Handler
添加到Pipeline
的末尾,至於爲何是末尾,相應看到後面的pipeline
的解析的時候你們就會知道了。
我這裏大概描述一下幾個Handler
的做用:
LineBasedFrameDecoder
根據換行符\n
或\r\n
進行內容的分割——即 拆包
StringDecoder
把接收到的內容解析爲
String
字符串
StringEncoder
把發出的內容解析爲
String
字符串
MyNettyServerHandler
咱們的真正邏輯處理類,這個應該是在前面的幾個處理完成後再進行。咱們在後面的pipeline
執行順序中能夠看到爲何這樣添加。
後面的Client
中的Handler
也能夠參考上面的。
Client
端ClientHandler
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("helloworld" + System.lineSeparator()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from server:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
這裏咱們的代碼也比較簡單,就是鏈接成功的時候發條helloworld
過去服務端,而後再從服務端讀到返回的內容。咱們就不細說了。
public class MyNettyClient { public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); try { ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
對比上面的Server
代碼,這裏的區別,最大的就是咱們只有一個EventLoopGroup
,由於Client
端並不須要接收請求,因此並不須要所謂的BossGroup
。
一切就緒後,咱們能夠跑一下看看運行狀況:
先運行server
,再運行client
server
能夠看到client
能夠看到
這表示咱們已經使用netty
寫了一個基本能夠用的NIO
程序了。
ChannelPipeline
詳解ChannelPipeline
做爲netty
的一個底層重要組成部分,ChannelHandler
都須要依靠它進行調度,重要性不言而喻。那咱們如今就一塊兒來看看ChannelPipeline
到底是怎麼調度的。
查看ChannelPipeline
的JavaDoc
咱們能夠看到這樣一串描述(牛人寫描述都是特別認真的)。
大概的意思就是這樣的:
InBoundHandler
的添加順序,從前日後執行。OutBoundHandler
的添加順序,從後往前執行。另外,文檔中又舉了一個例子:
咱們套用一下咱們的Server
例子來分析一下:LineBasedFrameDecoder
,StringDecoder
,StringEncoder
,MyNettyServerHandler
當咱們收到消息時,須要執行的Handler
的順序爲:LineBasedFrameDecoder
,StringDecoder
,MyNettyServerHandler
當咱們發出消息時,須要執行的OutboundHandler
的順序爲:StringEncoder
.
基於上面的分析,咱們就能夠分析爲何咱們前面的例子能夠獲得那樣的結果。
這篇文章,咱們從一開始的線程模型到後面的netty
的示例,這些種種都是爲了性能的提升去作的一些優化。在當前大數據的趨勢下,更多須要咱們把性能去作到極致。
後面,咱們會再根據netty
中的一些最佳實踐來分析它是怎麼解析粘包和拆分的。
https://www.jianshu.com/p/738095702b75
https://netty.io/wiki/user-guide-for-4.x.html