高清思惟導圖原件(xmind/pdf/jpg
)能夠關注公衆號:一枝花算不算浪漫
回覆netty01
便可。java
前言
上一篇文章講了NIO
相關的知識點,相比於傳統IO
,NIO
已經作得很優雅了,爲何咱們還要使用Netty
?ios
上篇文章最後留了不少坑,講了NIO
使用的弊端,也是爲了引出Netty
而設立的,這篇文章咱們就來好好揭開Netty
的神祕面紗。編程
本篇文章的目的很簡單,但願看事後你能看懂Netty
的示例代碼,針對於簡單的網絡通訊,本身也能用Netty
手寫一個開發應用出來!bootstrap
一個簡單的Netty示例
如下是一個簡單聊天室Server端的程序,代碼參考自:http://www.imooc.com/read/82/article/2166
設計模式
代碼有點長,主要核心代碼是在main()
方法中,這裏代碼也但願你們看懂,後面也會一步步剖析。緩存
PS:我是用mac
系統,直接在終端輸入telnet 127.0.0.1 8007
便可啓動一個聊天框,若是提示找不到telnet
命令,能夠經過brew
進行安裝,具體步驟請自行百度。安全
/** * @Description netty簡易聊天室 * * @Author 一枝花算不算浪漫 * @Date 2020/8/10 6:52 上午 */ public final class NettyChatServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // 1. EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 2. 服務端引導器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 3. 設置線bootStrap信息 serverBootstrap.group(bossGroup, workerGroup) // 4. 設置ServerSocketChannel的類型 .channel(NioServerSocketChannel.class) // 5. 設置參數 .option(ChannelOption.SO_BACKLOG, 100) // 6. 設置ServerSocketChannel對應的Handler,只能設置一個 .handler(new LoggingHandler(LogLevel.INFO)) // 7. 設置SocketChannel對應的Handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 能夠添加多個子Handler p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } }); // 8. 綁定端口 ChannelFuture f = serverBootstrap.bind(PORT).sync(); // 9. 等待服務端監聽端口關閉,這裏會阻塞主線程 f.channel().closeFuture().sync(); } finally { // 10. 優雅地關閉兩個線程池 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("one conn active: " + ctx.channel()); // channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的 ChatHolder.join((SocketChannel) ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String content = new String(bytes, StandardCharsets.UTF_8); System.out.println(content); if (content.equals("quit\r\n")) { ctx.channel().close(); } else { ChatHolder.propagate((SocketChannel) ctx.channel(), content); } } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("one conn inactive: " + ctx.channel()); ChatHolder.quit((SocketChannel) ctx.channel()); } } private static class ChatHolder { static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>(); /** * 加入羣聊 */ static void join(SocketChannel socketChannel) { // 有人加入就給他分配一個id String userId = "用戶"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); send(socketChannel, "您的id爲:" + userId + "\n\r"); for (SocketChannel channel : USER_MAP.keySet()) { send(channel, userId + " 加入了羣聊" + "\n\r"); } // 將當前用戶加入到map中 USER_MAP.put(socketChannel, userId); } /** * 退出羣聊 */ static void quit(SocketChannel socketChannel) { String userId = USER_MAP.get(socketChannel); send(socketChannel, "您退出了羣聊" + "\n\r"); USER_MAP.remove(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + " 退出了羣聊" + "\n\r"); } } } /** * 擴散說話的內容 */ public static void propagate(SocketChannel socketChannel, String content) { String userId = USER_MAP.get(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + ": " + content); } } } /** * 發送消息 */ static void send(SocketChannel socketChannel, String msg) { try { ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length); writeBuffer.writeCharSequence(msg, Charset.defaultCharset()); socketChannel.writeAndFlush(writeBuffer); } catch (Exception e) { e.printStackTrace(); } } } }
代碼有點長,執行完的效果如上圖所示,下面全部內容都是圍繞着如何看懂
以及如何寫出
這樣的代碼來展開的,但願你看完 也能輕鬆手寫Netty
服務端代碼~。經過簡單demo開發讓你們體驗了Netty
實現相比NIO
確實要簡單的多,但優勢不限於此,只須要知道選擇Netty就對了。服務器
Netty核心組件
對應着文章開頭的思惟導圖,咱們知道Netty
的核心組件主要有:網絡
- Bootstrap && ServerBootstrap
- EventLoopGroup
- EventLoop
- ByteBuf
- Channel
- ChannelHandler
- ChannelFuture
- ChannelPipeline
- ChannelHandlerContext
類圖以下:架構
Bootstrap & ServerBootstrap
一看到BootStrap
你們就應該想到啓動類、引導類這樣的詞彙,以前分析過EurekaServer項目啓動類時介紹過EurekaBootstrap
, 他的做用就是上下文初始化、配置初始化。
在Netty
中咱們也有相似的類,Bootstrap
和ServerBootstrap
它們都是Netty
程序的引導類,主要用於配置各類參數,並啓動整個Netty
服務,咱們看下文章開頭的示例代碼:
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
Bootstrap
和ServerBootstrap
是針對於Client
和Server
端定義的兩套啓動類,區別以下:
Bootstrap
是客戶端引導類,而ServerBootstrap
是服務端引導類。Bootstrap
一般使用connect()
方法鏈接到遠程的主機和端口,做爲一個TCP客戶端
。ServerBootstrap
一般使用bind()
方法綁定本地的端口,等待客戶端來鏈接。ServerBootstrap
能夠處理Accept
事件,這裏面childHandler
是用來處理Channel
請求的,咱們能夠查看chaildHandler()
方法的註解:
Bootstrap
客戶端引導只須要一個EventLoopGroup
,可是一個ServerBootstrap
一般須要兩個(上面的boosGroup
和workerGroup
)。
EventLoopGroup && EventLoop
EventLoopGroup
及EventLoop
這兩個類名稱定義的很奇怪,對於初學者來講每每沒法經過名稱來了解其中的含義,包括我也是這樣。
EventLoopGroup
能夠理解爲一個線程池,對於服務端程序,咱們通常會綁定兩個線程池,一個用於處理 Accept
事件,一個用於處理讀寫事件,看下EventLoop
系列的類目錄:
經過上面的類圖,咱們才恍然大悟,個人親孃咧,這不就是一個線程池嘛?(名字氣的犄角拐彎的真是難認)
EventLoopGroup
是EventLoop
的集合,一個EventLoopGroup
包含一個或者多個EventLoop
。咱們能夠將EventLoop
看作EventLoopGroup
線程池中的一個個工做線程。
至於這裏爲何要用到兩個線程池,具體的其實能夠參考Reactor
設計模式,這裏暫時不作過多的講解。
- 一個 EventLoopGroup 包含一個或多個 EventLoop ,即 EventLoopGroup : EventLoop = 1 : n
- 一個 EventLoop 在它的生命週期內,只能與一個 Thread 綁定,即 EventLoop : Thread = 1 : 1
- 全部有 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理,從而保證線程安全,即 Thread : EventLoop = 1 : 1
- 一個 Channel 在它的生命週期內只能註冊到一個 EventLoop 上,即 Channel : EventLoop = n : 1
- 一個 EventLoop 可被分配至一個或多個 Channel ,即 EventLoop : Channel = 1 : n
當一個鏈接到達時,Netty
就會建立一個 Channel
,而後從 EventLoopGroup
中分配一個 EventLoop
來給這個 Channel
綁定上,在該 Channel
的整個生命週期中都是有這個綁定的 EventLoop
來服務的。
ByteBuf
在Java NIO
中咱們有 ByteBuffer
緩衝池,對於它的操做咱們應該印象深入,往Buffer
中寫數據時咱們須要關注寫入的位置,切換成讀模式時咱們還要切換讀寫狀態,否則將會出現大問題。
針對於NIO
中超級難用的Buffer
類, Netty
提供了ByteBuf
來替代。ByteBuf
聲明瞭兩個指針:一個讀指針,一個寫指針,使得讀寫操做進行分離,簡化buffer
的操做流程。
另外Netty
提供了發幾種ByteBuf
的實現以供咱們選擇,ByteBuf
能夠分爲:
Pooled
和Unpooled
池化和非池化- Heap 和 Direct,堆內存和堆外內存,NIO中建立Buffer也能夠指定
- Safe 和 Unsafe,安全和非安全
對於這麼多種建立Buffer
的方式該怎麼選擇呢?Netty
也爲咱們處理好了,咱們能夠直接使用(真是暖男Ntetty
):
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf buffer = allocator.buffer(length);
使用這種方式,Netty將最大努力的使用池化、Unsafe、對外內存的方式爲咱們建立buffer。
Channel
提起Channel
並不陌生,上一篇講NIO
的三大組件提到過,最多見的就是java.nio.SocketChannel
和java.nio.ServerSocketChannel
,他們用於非阻塞的I/0操做。相似於NIO
的Channel
,Netty提供了本身的Channel
和其子類實現,用於異步I/0操做和其餘相關的操做。
在 Netty
中, Channel
是一個 Socket
鏈接的抽象, 它爲用戶提供了關於底層 Socket
狀態(是不是鏈接仍是斷開) 以及對 Socket
的讀寫等操做。每當 Netty
創建了一個鏈接後, 都會有一個對應的 Channel
實例。而且,有父子channel
的概念。 服務器鏈接監聽的channel
,也叫 parent channel
。 對應於每個 Socket
鏈接的channel
,也叫 child channel
。
既然channel
是 Netty 抽象出來的網絡 I/O 讀寫相關的接口,爲何不使用 JDK NIO
原生的 Channel
而要另起爐竈呢,主要緣由以下:
JDK
的SocketChannel
和ServersocketChannel
沒有統一的Channel
接口供業務開發者使用,對一於用戶而言,沒有統一的操做視圖,使用起來並不方便。JDK
的SocketChannel
和ScrversockctChannel
的主要職責就是網絡 I/O 操做,因爲他們是SPI
類接口,由具體的虛擬機廠家來提供,因此經過繼承 SPI 功能直接實現ServersocketChannel
和SocketChannel
來擴展其工做量和從新Channel
功類是差很少的。- Netty 的
ChannelPipeline Channel
須要夠跟 Netty 的總體架構融合在一塊兒,例如 I/O 模型、基的定製模型,以及基於元數據描述配置化的 TCP 參數等,這些JDK SocketChannel
和ServersocketChannel
都沒有提供,須要從新封裝。 - 自定義的
Channel
,功實現更加靈活。
基於上述 4 緣由,它的設計原理比較簡單, Netty 從新設計了 Channel
接口,而且給予了不少不一樣的實現。可是功能卻比較繁雜,主要的設計理念以下:
- 在
Channel
接口層,相關聯的其餘操做封裝起來,採用Facade
模式進行統一封裝,將網絡 I/O 操做、網絡 I/O 統一對外提供。 Channel
接口的定義儘可能大而全,統一的視圖,由不一樣子類實現不一樣的功能,公共功能在抽象父類中實現,最大程度上實現接口的重用。- 具體實現採用聚合而非包含的方式,將相關的功類聚合在
Channel
中,由Channel
統一負責分配和調度,功能實現更加靈活。
Channel
的實現類很是多,繼承關係複雜,從學習的角度咱們抽取最重要的兩個 NioServerSocketChannel
和 NioSocketChannel
。
服務端 NioServerSocketChannel
的繼承關係類圖以下:
客戶端 NioSocketChannel
的繼承關係類圖以下:
後面文章源碼系列會具體分析,這裏就不進一步闡述分析了。
ChannelHandler
ChannelHandler
是Netty
中最經常使用的組件。ChannelHandler
主要用來處理各類事件,這裏的事件很普遍,好比能夠是鏈接、數據接收、異常、數據轉換等。
ChannelHandler
有兩個核心子類 ChannelInboundHandler
和 ChannelOutboundHandler
,其中 ChannelInboundHandler
用於接收、處理入站( Inbound
)的數據和事件,而 ChannelOutboundHandler
則相反,用於接收、處理出站( Outbound
)的數據和事件。
ChannelInboundHandler
ChannelInboundHandler
處理入站數據以及各類狀態變化,當Channel
狀態發生改變會調用ChannelInboundHandler
中的一些生命週期方法.這些方法與Channel
的生命密切相關。
入站數據,就是進入socket
的數據。下面展現一些該接口的生命週期API
:
當某個 ChannelInboundHandler
的實現重寫 channelRead()
方法時,它將負責顯式地釋放與池化的 ByteBuf
實例相關的內存。 Netty 爲此提供了一個實用方法ReferenceCountUtil.release()
。
@Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }
這種方式還挺繁瑣的,Netty提供了一個SimpleChannelInboundHandler
,重寫channelRead0()
方法,就能夠在調用過程當中會自動釋放資源.
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // 不用調用ReferenceCountUtil.release(msg)也會釋放資源 } }
ChannelOutboundHandler
出站操做和數據將由 ChannelOutboundHandler
處理。它的方法將被 Channel
、 ChannelPipeline
以及 ChannelHandlerContext
調用。 ChannelOutboundHandler
的一個強大的功能是能夠按需推遲操做或者事件,這使得能夠經過一些複雜的方法來處理請求。例如, 若是到遠程節點的寫入被暫停了, 那麼你能夠推遲沖刷操做並在稍後繼續。
ChannelPromise
與ChannelFuture
: ChannelOutboundHandler
中的大部分方法都須要一個ChannelPromise
參數, 以便在操做完成時獲得通知。 ChannelPromise
是ChannelFuture
的一個子類,其定義了一些可寫的方法,如setSuccess()
和setFailure()
,從而使ChannelFuture
不可變。
ChannelHandlerAdapter
ChannelHandlerAdapter
顧名思義,就是handler
的適配器。你須要知道什麼是適配器模式,假設有一個A接口,咱們須要A的subclass
實現功能,可是B類中正好有咱們須要的功能,不想複製粘貼B中的方法和屬性了,那麼能夠寫一個適配器類Adpter
繼承B實現A,這樣一來Adapter
是A的子類而且能直接使用B中的方法,這種模式就是適配器模式。
就好比Netty中的SslHandler
類,想使用ByteToMessageDecoder
中的方法進行解碼,可是必須是ChannelHandler
子類對象才能加入到ChannelPipeline
中,經過以下簽名和其實現細節(SslHandler
實現細節就不貼了)就可以做爲一個handler
去處理消息了。
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
ChannelHandlerAdapter
提供了一些實用方法isSharable()
若是其對應的實現被標註爲 Sharable
, 那麼這個方法將返回 true
, 表示它能夠被添加到多個 ChannelPipeline
中 。若是想在本身的ChannelHandler
中使用這些適配器類,只須要擴展他們,重寫那些想要自定義的方法便可。
ChannelPipeline
每個新建立的 Channel
都將會被分配一個新的 ChannelPipeline
。這項關聯是永久性的; Channel
既不能附加另一個 ChannelPipeline
,也不能分離其當前的。在 Netty 組件的生命週期中,這是一項固定的操做,不須要開發人員的任何干預。
Netty 的 ChannelHandler
爲處理器提供了基本的抽象, 目前你能夠認爲每一個 ChannelHandler
的實例都相似於一種爲了響應特定事件而被執行的回調。從應用程序開發人員的角度來看, 它充當了全部處理入站和出站數據的應用程序邏輯的攔截載體。ChannelPipeline
提供了 ChannelHandler
鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的 API
。當 Channel
被建立時,它會被自動地分配到它專屬的 ChannelPipeline
。
ChannelHandler
安裝到 ChannelPipeline
中的過程以下所示:
- 一個
ChannelInitializer
的實現被註冊到了ServerBootstrap
中 - 當
ChannelInitializer.initChannel()
方法被調用時,ChannelInitializer
將在ChannelPipeline
中安裝一組自定義的ChannelHandler
ChannelInitializer
將它本身從ChannelPipeline
中移除
如上圖所示:這是一個同時具備入站和出站 ChannelHandler
的 ChannelPipeline
的佈局,而且印證了咱們以前的關於 ChannelPipeline
主要由一系列的 ChannelHandler
所組成的說法。 ChannelPipeline
還提供了經過 ChannelPipeline
自己傳播事件的方法。若是一個入站事件被觸發,它將被從 ChannelPipeline
的頭部開始一直被傳播到 Channel Pipeline 的尾端。
你可能會說, 從事件途經 ChannelPipeline
的角度來看, ChannelPipeline
的頭部和尾端取決於該事件是入站的仍是出站的。然而 Netty 老是將 ChannelPipeline
的入站口(圖 的左側)做爲頭部,而將出站口(該圖的右側)做爲尾端。 當你完成了經過調用 ChannelPipeline.add*()
方法將入站處理器( ChannelInboundHandler
)和 出 站 處 理 器 ( ChannelOutboundHandler
) 混 合 添 加 到 ChannelPipeline
之 後 , 每 一 個ChannelHandler
從頭部到尾端的順序位置正如同咱們方纔所定義它們的同樣。所以,若是你將圖 6-3 中的處理器( ChannelHandler
)從左到右進行編號,那麼第一個被入站事件看到的 ChannelHandler
將是1,而第一個被出站事件看到的 ChannelHandler
將是 5。
在 ChannelPipeline
傳播事件時,它會測試 ChannelPipeline
中的下一個 Channel Handler 的類型是否和事件的運動方向相匹配。若是不匹配, ChannelPipeline
將跳過該ChannelHandler
並前進到下一個,直到它找到和該事件所指望的方向相匹配的爲止。 (固然, ChannelHandler
也能夠同時實現ChannelInboundHandler
接口和 ChannelOutboundHandler
接口。)
修改ChannelPipeline
修改指的是添加或刪除ChannelHandler
,見代碼示例:
ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); // 先添加一個Handler到ChannelPipeline中 pipeline.addLast("handler1", firstHandler); // 這個Handler放在了first,意味着放在了handler1以前 pipeline.addFirst("handler2", new SecondHandler()); // 這個Handler被放到了last,意味着在handler1以後 pipeline.addLast("handler3", new ThirdHandler()); ... // 經過名稱刪除 pipeline.remove("handler3"); // 經過對象刪除 pipeline.remove(firstHandler); // 名稱"handler2"替換成名稱"handler4",並切handler2的實例替換成了handler4的實例 pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelPipeline
的出入站API
入站API
所示:
[圖片上傳失敗...(image-6037f5-1598167949595)]
出站API
所示:
ChannelPipeline
這個組件上面所講的大體只須要記住這三點便可:
ChannelPipeline
保存了與Channel
相關聯的ChannelHandler
ChannelPipeline
能夠根據須要,經過添加或者刪除ChannelHandler
來動態地修改ChannelPipeline
有着豐富的API
用以被調用,以響應入站和出站事件
ChannelHandlerContext
當 ChannelHandler
被添加到 ChannelPipeline
時,它將會被分配一個 ChannelHandlerContext
,它表明了 ChannelHandler
和 ChannelPipeline
之間的綁定。ChannelHandlerContext
的主要功能是管理它所關聯的ChannelHandler
和在同一個 ChannelPipeline
中的其餘ChannelHandler
之間的交互。
若是調用Channel
或ChannelPipeline
上的方法,會沿着整個ChannelPipeline
傳播,若是調用ChannelHandlerContext
上的相同方法,則會從對應的當前ChannelHandler
進行傳播。
ChannelHandlerContext API
以下表所示:
ChannelHandlerContext
和ChannelHandler
之間的關聯(綁定)是永遠不會改變的,因此緩存對它的引用是安全的;- 如同在本節開頭所解釋的同樣,相對於其餘類的同名方法,
ChannelHandlerContext
的方法將產生更短的事件流, 應該儘量地利用這個特性來得到最大的性能。
與ChannelHandler
、ChannelPipeline
的關聯使用
從ChannelHandlerContext
訪問channel
ChannelHandlerContext ctx = ..; // 獲取channel引用 Channel channel = ctx.channel(); // 經過channel寫入緩衝區 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
從ChannelHandlerContext
訪問ChannelPipeline
ChannelHandlerContext ctx = ..; // 獲取ChannelHandlerContext ChannelPipeline pipeline = ctx.pipeline(); // 經過ChannelPipeline寫入緩衝區 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
有時候咱們不想從頭傳遞數據,想跳過幾個handler
,從某個handler
開始傳遞數據.咱們必須獲取目標handler
以前的handler
關聯的ChannelHandlerContext
。
ChannelHandlerContext ctx = ..; // 直接經過ChannelHandlerContext寫數據,發送到下一個handler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext
的基本使用應該掌握了,可是你真的理解ChannelHandlerContext
,ChannelPipeline
和Channelhandler
之間的關係了嗎?不理解也不要緊,由於源碼之後會幫你理解的更爲深入。
核心組件之間的關係
- 一個
Channel
對應一個ChannelPipeline
- 一個
ChannelPipeline
包含一條雙向的ChannelHandlerContext
鏈 - 一個
ChannelHandlerContext
中包含一個ChannelHandler
- 一個
Channel
會綁定到一個EventLoop
上 - 一個
NioEventLoop
維護了一個Selector(
使用的是 Java 原生的 Selector) - 一個
NioEventLoop
至關於一個線程
粘包拆包問題
粘包拆包問題是處於網絡比較底層的問題,在數據鏈路層、網絡層以及傳輸層都有可能發生。咱們平常的網絡應用開發大都在傳輸層進行,因爲UDP
有消息保護邊界,不會發生粘包拆包問題,而所以粘包拆包問題只發生在TCP
協議中。具體講TCP
是個」流"協議,只有流的概念,沒有包的概念,對於業務上層數據的具體含義和邊界並不瞭解,它只會根據TCP
緩衝區的實際狀況進行包的劃分。因此在業務上認爲,一個完整的包可能會被TCP
拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP
粘包和拆包問題。
問題舉例說明
下面針對客戶端分別發送了兩個數據表Packet1
和Packet2
給服務端的時候,TCP
粘包和拆包會出現的狀況進行列舉說明:
(1)第一種狀況,服務端分兩次正常收到兩個獨立數據包,即沒有發生拆包和粘包的現象;
(2)第二種狀況,接收端只收到一個數據包,因爲TCP
是不會出現丟包的,因此這一個數據包中包含了客戶端發送的兩個數據包的信息,這種現象即爲粘包。這種狀況因爲接收端不知道這兩個數據包的界限,因此對於服務接收端來講很難處理。
(3)第三種狀況,服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的Packet1
和Packet2
包的部份內容,第二次讀取到了Packet2
的剩餘內容,這被稱爲TCP拆包;
(4)第四種狀況,服務端分兩次讀取到了兩個數據包,第一次讀取到了部分的Packet1
內容,第二次讀取到了Packet1
剩餘內容和Packet2
的整包。
若是此時服務端TCP接收滑窗很是小,而數據包Packet1
和Packet2
比較大,頗有可能服務端須要分屢次才能將兩個包接收徹底,期間發生屢次拆包。以上列舉狀況的背後緣由分別以下:
- 應用程序寫入的數據大於套接字緩衝區大小,這將會發生拆包。
- 應用程序寫入數據小於套接字緩衝區大小,網卡將應用屢次寫入的數據發送到網絡上,這將會發生粘包。
- 進行
MSS
(最大報文長度)大小的TCP
分段,當TCP
報文長度-TCP
頭部長度>MSS
的時候將發生拆包。 - 接收方法不及時讀取套接字緩衝區數據,這將發生粘包。
如何基於Netty處理粘包、拆包問題
因爲底層的TCP
沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,能夠概括以下:
- 消息定長,例如每一個報文的大小爲固定長度200字節,若是不夠,空位補空格;
- 在包尾增長回車換行符進行分割,例如
FTP
協議; - 將消息分爲消息頭和消息體,消息頭中包含表示消息總長度的字段,一般設計思路爲消息頭的第一個字段使用
int32
來表示消息的總長度; - 更復雜的應用層協議。
以前Netty示例中其實並無考慮讀半包問題,這在功能測試每每沒有問題,可是一旦請求數過多或者發送大報文以後,就會存在該問題。若是代碼沒有考慮,每每就會出現解碼錯位或者錯誤,致使程序不能正常工做,下面看看Netty是如何根據主流的解決方案進行抽象實現來幫忙解決這一問題的。
以下表所示,Netty爲了找出消息的邊界,採用封幀方式:
方式 | 解碼 | 編碼 |
---|---|---|
固定長度 | FixedLengthFrameDecoder |
簡單 |
分隔符 | DelimiterBasedFrameDecoder |
簡單 |
專門的 length 字段 | LengthFieldBasedFrameDecoder |
LengthFieldPrepender |
注意到,Netty提供了對應的解碼器來解決對應的問題,有了這些解碼器,用戶不須要本身對讀取的報文進行人工解碼,也不須要考慮TCP的粘包和半包問題。爲何這麼說呢?下面列舉一個包尾增長分隔符的例子:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.atomic.AtomicInteger; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:15 * @Version: 1.0 * @Description:入站處理器 */ @ChannelHandler.Sharable public class DelimiterServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0); /*** 服務端讀取到網絡數據後的處理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 服務端讀取完成網絡數據後的處理*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); } /*** 發生異常後的處理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import java.net.InetSocketAddress; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:17 * @Version: 1.0 * @Description:服務端 */ public class DelimiterEchoServer { public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997; public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("服務器即將啓動"); delimiterEchoServer.start(); } public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*線程組*/ try { ServerBootstrap b = new ServerBootstrap();/*服務端啓動必須*/ b.group(group)/*將線程組傳入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO進行網絡傳輸*/ .localAddress(new InetSocketAddress(PORT))/*指定服務器監聽端口*/ /*服務端每接收到一個鏈接請求,就會新啓一個socket通訊,也就是channel, 因此下面這段代碼的做用就是爲這個子channel增長handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*異步綁定到服務器,sync()會阻塞直到完成*/ System.out.println("服務器啓動完成,等待客戶端的鏈接和數據....."); f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/ } finally { group.shutdownGracefully().sync();/*優雅關閉線程組*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); //服務端收到數據包後通過DelimiterBasedFrameDecoder即分隔符基礎框架解碼器解碼爲一個個帶有分隔符的數據包。 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } } }
添加到ChannelPipeline
的DelimiterBasedFrameDecoder
用於對使用分隔符結尾的消息進行自動解碼,固然還有沒有用到的FixedLengthFrameDecoder
用於對固定長度的消息進行自動解碼等解碼器。正如上門的代碼使用案例,有了Netty提供的幾碼器能夠輕鬆地完成對不少消息的自動解碼,並且不須要考慮TCP粘包/拆包致使的讀半包問題,極大地提高了開發效率。
Netty示例代碼詳解
相信看完上面的鋪墊,你對Netty編碼有了必定的瞭解了,下面再來總體梳理一遍吧。
一、設置EventLoopGroup
線程組(Reactor
線程組)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
上面咱們說過Netty
中使用Reactor
模式,bossGroup
表示服務器鏈接監聽線程組,專門接受 Accept
新的客戶端client
鏈接。另外一個workerGroup
表示處理每一鏈接的數據收發的線程組,來處理消息的讀寫事件。
二、服務端引導器
ServerBootstrap serverBootstrap = new ServerBootstrap();
集成全部配置,用來啓動Netty
服務端。
三、設置ServerBootstrap
信息
serverBootstrap.group(bossGroup, workerGroup);
將兩個線程組設置到ServerBootstrap
中。
四、設置ServerSocketChannel
類型
serverBootstrap.channel(NioServerSocketChannel.class);
設置通道的IO
類型,Netty
不止支持Java NIO
,也支持阻塞式IO
,例如OIO
OioServerSocketChannel.class)
五、設置參數
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
經過option()
方法能夠設置不少參數,這裏SO_BACKLOG
標識服務端接受鏈接的隊列長度,若是隊列已滿,客戶端鏈接將被拒絕。默認值,Windows
爲200,其餘爲128,這裏設置的是100。
六、設置Handler
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
設置 ServerSocketChannel
對應的Handler
,這裏只能設置一個,它會在SocketChannel
創建起來以前執行。
七、設置子Handler
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
Netty
中提供了一種能夠設置多個Handler
的途徑,即便用ChannelInitializer
方式。ChannelPipeline
是Netty
處理請求的責任鏈,這是一個ChannelHandler
的鏈表,而ChannelHandler
就是用來處理網絡請求的內容的。
每個channel
,都有一個處理器流水線。裝配child channel
流水線,調用childHandler()
方法,傳遞一個ChannelInitializer
的實例。
在 child channel
建立成功,開始通道初始化的時候,在bootstrap啓動器中配置的ChannelInitializer
實例就會被調用。
這個時候,才真正的執行去執行 initChannel
初始化方法,開始通道流水線裝配。
流水線裝配,主要是在流水線pipeline
的後面,增長負責數據讀寫、處理業務邏輯的handler
。
處理器 ChannelHandler
用來處理網絡請求內容,有ChannelInboundHandler
和ChannelOutboundHandler
兩種,ChannlPipeline
會從頭至尾順序調用ChannelInboundHandler
處理網絡請求內容,從尾到頭調用ChannelOutboundHandler
處理網絡請求內容
八、綁定端口號
ChannelFuture f = serverBootstrap.bind(PORT).sync();
綁定端口號
九、等待服務端端口號關閉
f.channel().closeFuture().sync();
等待服務端監聽端口關閉,sync()
會阻塞主線程,內部調用的是 Object
的 wait()
方法
十、關閉EventLoopGroup線程組
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
總結
這篇文章主要是從一個demo
做爲引子,而後介紹了Netty
的包結構、Reactor
模型、編程規範等等,目的很簡單,但願你可以讀懂這段demo
並寫出來。
後面開始繼續Netty
源碼解析部分,敬請期待。
參考資料
- 《Netty in Action》書籍
- 慕課Netty專欄
- 掘金閃電俠Netty小冊
- 芋道源碼Netty專欄
- Github[fork from krcys]
感謝Netty專欄做者們優秀的文章內容~