高清思惟導圖原件(xmind/pdf/jpg
)能夠關注公衆號:一枝花算不算浪漫
回覆netty01
便可。html
上一篇文章講了NIO
相關的知識點,相比於傳統IO
,NIO
已經作得很優雅了,爲何咱們還要使用Netty
?java
上篇文章最後留了不少坑,講了NIO
使用的弊端,也是爲了引出Netty
而設立的,這篇文章咱們就來好好揭開Netty
的神祕面紗。ios
本篇文章的目的很簡單,但願看事後你能看懂Netty
的示例代碼,針對於簡單的網絡通訊,本身也能用Netty
手寫一個開發應用出來!編程
如下是一個簡單聊天室Server端的程序,代碼參考自:http://www.imooc.com/read/82/article/2166
bootstrap
代碼有點長,主要核心代碼是在main()
方法中,這裏代碼也但願你們看懂,後面也會一步步剖析。設計模式
PS:我是用mac
系統,直接在終端輸入telnet 127.0.0.1 8007
便可啓動一個聊天框,若是提示找不到telnet
命令,能夠經過brew
進行安裝,具體步驟請自行百度。api
/** * @Description netty簡易聊天室 * * @Author 一枝花算不算浪漫 * @Date 2020/8/10 6:52 上午 */ public final class NettyChatServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // 1. EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 2. 服務端引導器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 3. 設置線bootStrap信息 serverBootstrap.group(bossGroup, workerGroup) // 4. 設置ServerSocketChannel的類型 .channel(NioServerSocketChannel.class) // 5. 設置參數 .option(ChannelOption.SO_BACKLOG, 100) // 6. 設置ServerSocketChannel對應的Handler,只能設置一個 .handler(new LoggingHandler(LogLevel.INFO)) // 7. 設置SocketChannel對應的Handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 能夠添加多個子Handler p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } }); // 8. 綁定端口 ChannelFuture f = serverBootstrap.bind(PORT).sync(); // 9. 等待服務端監聽端口關閉,這裏會阻塞主線程 f.channel().closeFuture().sync(); } finally { // 10. 優雅地關閉兩個線程池 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("one conn active: " + ctx.channel()); // channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的 ChatHolder.join((SocketChannel) ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String content = new String(bytes, StandardCharsets.UTF_8); System.out.println(content); if (content.equals("quit\r\n")) { ctx.channel().close(); } else { ChatHolder.propagate((SocketChannel) ctx.channel(), content); } } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("one conn inactive: " + ctx.channel()); ChatHolder.quit((SocketChannel) ctx.channel()); } } private static class ChatHolder { static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>(); /** * 加入羣聊 */ static void join(SocketChannel socketChannel) { // 有人加入就給他分配一個id String userId = "用戶"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); send(socketChannel, "您的id爲:" + userId + "\n\r"); for (SocketChannel channel : USER_MAP.keySet()) { send(channel, userId + " 加入了羣聊" + "\n\r"); } // 將當前用戶加入到map中 USER_MAP.put(socketChannel, userId); } /** * 退出羣聊 */ static void quit(SocketChannel socketChannel) { String userId = USER_MAP.get(socketChannel); send(socketChannel, "您退出了羣聊" + "\n\r"); USER_MAP.remove(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + " 退出了羣聊" + "\n\r"); } } } /** * 擴散說話的內容 */ public static void propagate(SocketChannel socketChannel, String content) { String userId = USER_MAP.get(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + ": " + content); } } } /** * 發送消息 */ static void send(SocketChannel socketChannel, String msg) { try { ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length); writeBuffer.writeCharSequence(msg, Charset.defaultCharset()); socketChannel.writeAndFlush(writeBuffer); } catch (Exception e) { e.printStackTrace(); } } } }
代碼有點長,執行完的效果如上圖所示,下面全部內容都是圍繞着如何看懂
以及如何寫出
這樣的代碼來展開的,但願你看完 也能輕鬆手寫Netty
服務端代碼~。經過簡單demo開發讓你們體驗了Netty
實現相比NIO
確實要簡單的多,但優勢不限於此,只須要知道選擇Netty就對了。緩存
對應着文章開頭的思惟導圖,咱們知道Netty
的核心組件主要有:安全
類圖以下:服務器
一看到BootStrap
你們就應該想到啓動類、引導類這樣的詞彙,以前分析過EurekaServer項目啓動類時介紹過EurekaBootstrap
, 他的做用就是上下文初始化、配置初始化。
在Netty
中咱們也有相似的類,Bootstrap
和ServerBootstrap
它們都是Netty
程序的引導類,主要用於配置各類參數,並啓動整個Netty
服務,咱們看下文章開頭的示例代碼:
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
Bootstrap
和ServerBootstrap
是針對於Client
和Server
端定義的兩套啓動類,區別以下:
Bootstrap
是客戶端引導類,而ServerBootstrap
是服務端引導類。Bootstrap
一般使用connect()
方法鏈接到遠程的主機和端口,做爲一個TCP客戶端
。ServerBootstrap
一般使用bind()
方法綁定本地的端口,等待客戶端來鏈接。ServerBootstrap
能夠處理Accept
事件,這裏面childHandler
是用來處理Channel
請求的,咱們能夠查看chaildHandler()
方法的註解:Bootstrap
客戶端引導只須要一個EventLoopGroup
,可是一個ServerBootstrap
一般須要兩個(上面的boosGroup
和workerGroup
)。EventLoopGroup
及EventLoop
這兩個類名稱定義的很奇怪,對於初學者來講每每沒法經過名稱來了解其中的含義,包括我也是這樣。
EventLoopGroup
能夠理解爲一個線程池,對於服務端程序,咱們通常會綁定兩個線程池,一個用於處理 Accept
事件,一個用於處理讀寫事件,看下EventLoop
系列的類目錄:
經過上面的類圖,咱們才恍然大悟,個人親孃咧,這不就是一個線程池嘛?(名字氣的犄角拐彎的真是難認)
EventLoopGroup
是EventLoop
的集合,一個EventLoopGroup
包含一個或者多個EventLoop
。咱們能夠將EventLoop
看作EventLoopGroup
線程池中的一個個工做線程。
至於這裏爲何要用到兩個線程池,具體的其實能夠參考Reactor
設計模式,這裏暫時不作過多的講解。
當一個鏈接到達時,Netty
就會建立一個 Channel
,而後從 EventLoopGroup
中分配一個 EventLoop
來給這個 Channel
綁定上,在該 Channel
的整個生命週期中都是有這個綁定的 EventLoop
來服務的。
在Java NIO
中咱們有 ByteBuffer
緩衝池,對於它的操做咱們應該印象深入,往Buffer
中寫數據時咱們須要關注寫入的位置,切換成讀模式時咱們還要切換讀寫狀態,否則將會出現大問題。
針對於NIO
中超級難用的Buffer
類, Netty
提供了ByteBuf
來替代。ByteBuf
聲明瞭兩個指針:一個讀指針,一個寫指針,使得讀寫操做進行分離,簡化buffer
的操做流程。
另外Netty
提供了發幾種ByteBuf
的實現以供咱們選擇,ByteBuf
能夠分爲:
Pooled
和Unpooled
池化和非池化對於這麼多種建立Buffer
的方式該怎麼選擇呢?Netty
也爲咱們處理好了,咱們能夠直接使用(真是暖男Ntetty
):
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf buffer = allocator.buffer(length);
使用這種方式,Netty將最大努力的使用池化、Unsafe、對外內存的方式爲咱們建立buffer。
提起Channel
並不陌生,上一篇講NIO
的三大組件提到過,最多見的就是java.nio.SocketChannel
和java.nio.ServerSocketChannel
,他們用於非阻塞的I/0操做。相似於NIO
的Channel
,Netty提供了本身的Channel
和其子類實現,用於異步I/0操做和其餘相關的操做。
在 Netty
中, Channel
是一個 Socket
鏈接的抽象, 它爲用戶提供了關於底層 Socket
狀態(是不是鏈接仍是斷開) 以及對 Socket
的讀寫等操做。每當 Netty
創建了一個鏈接後, 都會有一個對應的 Channel
實例。而且,有父子channel
的概念。 服務器鏈接監聽的channel
,也叫 parent channel
。 對應於每個 Socket
鏈接的channel
,也叫 child channel
。
既然channel
是 Netty 抽象出來的網絡 I/O 讀寫相關的接口,爲何不使用 JDK NIO
原生的 Channel
而要另起爐竈呢,主要緣由以下:
JDK
的 SocketChannel
和 ServersocketChannel
沒有統一的 Channel
接口供業務開發者使用,對一於用戶而言,沒有統一的操做視圖,使用起來並不方便。JDK
的 SocketChannel
和 ScrversockctChannel
的主要職責就是網絡 I/O 操做,因爲他們是 SPI
類接口,由具體的虛擬機廠家來提供,因此經過繼承 SPI 功能直接實現 ServersocketChannel
和 SocketChannel
來擴展其工做量和從新 Channel
功類是差很少的。ChannelPipeline Channel
須要夠跟 Netty 的總體架構融合在一塊兒,例如 I/O 模型、基的定製模型,以及基於元數據描述配置化的 TCP 參數等,這些 JDK SocketChannel
和ServersocketChannel
都沒有提供,須要從新封裝。Channel
,功實現更加靈活。基於上述 4 緣由,它的設計原理比較簡單, Netty 從新設計了 Channel
接口,而且給予了不少不一樣的實現。可是功能卻比較繁雜,主要的設計理念以下:
Channel
接口層,相關聯的其餘操做封裝起來,採用 Facade
模式進行統一封裝,將網絡 I/O 操做、網絡 I/O 統一對外提供。Channel
接口的定義儘可能大而全,統一的視圖,由不一樣子類實現不一樣的功能,公共功能在抽象父類中實現,最大程度上實現接口的重用。Channel
中,由 Channel
統一負責分配和調度,功能實現更加靈活。Channel
的實現類很是多,繼承關係複雜,從學習的角度咱們抽取最重要的兩個 NioServerSocketChannel
和 NioSocketChannel
。
服務端 NioServerSocketChannel
的繼承關係類圖以下:
客戶端 NioSocketChannel
的繼承關係類圖以下:
後面文章源碼系列會具體分析,這裏就不進一步闡述分析了。
ChannelHandler
是Netty
中最經常使用的組件。ChannelHandler
主要用來處理各類事件,這裏的事件很普遍,好比能夠是鏈接、數據接收、異常、數據轉換等。
ChannelHandler
有兩個核心子類 ChannelInboundHandler
和 ChannelOutboundHandler
,其中 ChannelInboundHandler
用於接收、處理入站( Inbound
)的數據和事件,而 ChannelOutboundHandler
則相反,用於接收、處理出站( Outbound
)的數據和事件。
ChannelInboundHandler
處理入站數據以及各類狀態變化,當Channel
狀態發生改變會調用ChannelInboundHandler
中的一些生命週期方法.這些方法與Channel
的生命密切相關。
入站數據,就是進入socket
的數據。下面展現一些該接口的生命週期API
:
當某個 ChannelInboundHandler
的實現重寫 channelRead()
方法時,它將負責顯式地釋放與池化的 ByteBuf
實例相關的內存。 Netty 爲此提供了一個實用方法ReferenceCountUtil.release()
。
@Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }
這種方式還挺繁瑣的,Netty提供了一個SimpleChannelInboundHandler
,重寫channelRead0()
方法,就能夠在調用過程當中會自動釋放資源.
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // 不用調用ReferenceCountUtil.release(msg)也會釋放資源 } }
出站操做和數據將由 ChannelOutboundHandler
處理。它的方法將被 Channel
、 ChannelPipeline
以及 ChannelHandlerContext
調用。
ChannelOutboundHandler
的一個強大的功能是能夠按需推遲操做或者事件,這使得能夠經過一些複雜的方法來處理請求。例如, 若是到遠程節點的寫入被暫停了, 那麼你能夠推遲沖刷操做並在稍後繼續。
ChannelPromise
與ChannelFuture
: ChannelOutboundHandler
中的大部分方法都須要一個ChannelPromise
參數, 以便在操做完成時獲得通知。 ChannelPromise
是ChannelFuture
的一個子類,其定義了一些可寫的方法,如setSuccess()
和setFailure()
,從而使ChannelFuture
不可變。
ChannelHandlerAdapter
顧名思義,就是handler
的適配器。你須要知道什麼是適配器模式,假設有一個A接口,咱們須要A的subclass
實現功能,可是B類中正好有咱們須要的功能,不想複製粘貼B中的方法和屬性了,那麼能夠寫一個適配器類Adpter
繼承B實現A,這樣一來Adapter
是A的子類而且能直接使用B中的方法,這種模式就是適配器模式。
就好比Netty中的SslHandler
類,想使用ByteToMessageDecoder
中的方法進行解碼,可是必須是ChannelHandler
子類對象才能加入到ChannelPipeline
中,經過以下簽名和其實現細節(SslHandler
實現細節就不貼了)就可以做爲一個handler
去處理消息了。
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
ChannelHandlerAdapter
提供了一些實用方法isSharable()
若是其對應的實現被標註爲 Sharable
, 那麼這個方法將返回 true
, 表示它能夠被添加到多個 ChannelPipeline
中 。若是想在本身的ChannelHandler
中使用這些適配器類,只須要擴展他們,重寫那些想要自定義的方法便可。
每個新建立的 Channel
都將會被分配一個新的 ChannelPipeline
。這項關聯是永久性的; Channel
既不能附加另一個 ChannelPipeline
,也不能分離其當前的。在 Netty 組件的生命週期中,這是一項固定的操做,不須要開發人員的任何干預。
Netty 的 ChannelHandler
爲處理器提供了基本的抽象, 目前你能夠認爲每一個 ChannelHandler
的實例都相似於一種爲了響應特定事件而被執行的回調。從應用程序開發人員的角度來看, 它充當了全部處理入站和出站數據的應用程序邏輯的攔截載體。ChannelPipeline
提供了 ChannelHandler
鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的 API
。當 Channel
被建立時,它會被自動地分配到它專屬的 ChannelPipeline
。
ChannelHandler
安裝到 ChannelPipeline
中的過程以下所示:
ChannelInitializer
的實現被註冊到了ServerBootstrap
中ChannelInitializer.initChannel()
方法被調用時,ChannelInitializer
將在 ChannelPipeline
中安裝一組自定義的 ChannelHandler
ChannelInitializer
將它本身從 ChannelPipeline
中移除如上圖所示:這是一個同時具備入站和出站 ChannelHandler
的 ChannelPipeline
的佈局,而且印證了咱們以前的關於 ChannelPipeline
主要由一系列的 ChannelHandler
所組成的說法。 ChannelPipeline
還提供了經過 ChannelPipeline
自己傳播事件的方法。若是一個入站事件被觸發,它將被從 ChannelPipeline
的頭部開始一直被傳播到 Channel Pipeline 的尾端。
你可能會說, 從事件途經 ChannelPipeline
的角度來看, ChannelPipeline
的頭部和尾端取決於該事件是入站的仍是出站的。然而 Netty 老是將 ChannelPipeline
的入站口(圖 的左側)做爲頭部,而將出站口(該圖的右側)做爲尾端。
當你完成了經過調用 ChannelPipeline.add*()
方法將入站處理器( ChannelInboundHandler
)和 出 站 處 理 器 ( ChannelOutboundHandler
) 混 合 添 加 到 ChannelPipeline
之 後 , 每 一 個ChannelHandler
從頭部到尾端的順序位置正如同咱們方纔所定義它們的同樣。所以,若是你將圖 6-3 中的處理器( ChannelHandler
)從左到右進行編號,那麼第一個被入站事件看到的 ChannelHandler
將是1,而第一個被出站事件看到的 ChannelHandler
將是 5。
在 ChannelPipeline
傳播事件時,它會測試 ChannelPipeline
中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。若是不匹配, ChannelPipeline
將跳過該ChannelHandler
並前進到下一個,直到它找到和該事件所指望的方向相匹配的爲止。 (固然, ChannelHandler
也能夠同時實現ChannelInboundHandler
接口和 ChannelOutboundHandler
接口。)
ChannelPipeline
修改指的是添加或刪除ChannelHandler
,見代碼示例:
ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); // 先添加一個Handler到ChannelPipeline中 pipeline.addLast("handler1", firstHandler); // 這個Handler放在了first,意味着放在了handler1以前 pipeline.addFirst("handler2", new SecondHandler()); // 這個Handler被放到了last,意味着在handler1以後 pipeline.addLast("handler3", new ThirdHandler()); ... // 經過名稱刪除 pipeline.remove("handler3"); // 經過對象刪除 pipeline.remove(firstHandler); // 名稱"handler2"替換成名稱"handler4",並切handler2的實例替換成了handler4的實例 pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelPipeline
的出入站API
入站API
所示:
[圖片上傳失敗...(image-6037f5-1598167949595)]
出站API
所示:
ChannelPipeline
這個組件上面所講的大體只須要記住這三點便可:
ChannelPipeline
保存了與 Channel
相關聯的 ChannelHandler
ChannelPipeline
能夠根據須要,經過添加或者刪除 ChannelHandler
來動態地修改ChannelPipeline
有着豐富的API
用以被調用,以響應入站和出站事件當 ChannelHandler
被添加到 ChannelPipeline
時,它將會被分配一個 ChannelHandlerContext
,它表明了 ChannelHandler
和 ChannelPipeline
之間的綁定。ChannelHandlerContext
的主要功能是管理它所關聯的ChannelHandler
和在同一個 ChannelPipeline
中的其餘ChannelHandler
之間的交互。
若是調用Channel
或ChannelPipeline
上的方法,會沿着整個ChannelPipeline
傳播,若是調用ChannelHandlerContext
上的相同方法,則會從對應的當前ChannelHandler
進行傳播。
ChannelHandlerContext API
以下表所示:
ChannelHandlerContext
和 ChannelHandler
之間的關聯(綁定)是永遠不會改變的,因此緩存對它的引用是安全的;ChannelHandlerContext
的方法將產生更短的事件流, 應該儘量地利用這個特性來得到最大的性能。ChannelHandler
、ChannelPipeline
的關聯使用從ChannelHandlerContext
訪問channel
ChannelHandlerContext ctx = ..; // 獲取channel引用 Channel channel = ctx.channel(); // 經過channel寫入緩衝區 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
從ChannelHandlerContext
訪問ChannelPipeline
ChannelHandlerContext ctx = ..; // 獲取ChannelHandlerContext ChannelPipeline pipeline = ctx.pipeline(); // 經過ChannelPipeline寫入緩衝區 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
有時候咱們不想從頭傳遞數據,想跳過幾個handler
,從某個handler
開始傳遞數據.咱們必須獲取目標handler
以前的handler
關聯的ChannelHandlerContext
。
ChannelHandlerContext ctx = ..; // 直接經過ChannelHandlerContext寫數據,發送到下一個handler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext
的基本使用應該掌握了,可是你真的理解ChannelHandlerContext
,ChannelPipeline
和Channelhandler
之間的關係了嗎?不理解也不要緊,由於源碼之後會幫你理解的更爲深入。
Channel
對應一個 ChannelPipeline
ChannelPipeline
包含一條雙向的 ChannelHandlerContext
鏈ChannelHandlerContext
中包含一個 ChannelHandler
Channel
會綁定到一個EventLoop
上NioEventLoop
維護了一個 Selector(
使用的是 Java 原生的 Selector)NioEventLoop
至關於一個線程粘包拆包問題是處於網絡比較底層的問題,在數據鏈路層、網絡層以及傳輸層都有可能發生。咱們平常的網絡應用開發大都在傳輸層進行,因爲UDP
有消息保護邊界,不會發生粘包拆包問題,而所以粘包拆包問題只發生在TCP
協議中。具體講TCP
是個」流"協議,只有流的概念,沒有包的概念,對於業務上層數據的具體含義和邊界並不瞭解,它只會根據TCP
緩衝區的實際狀況進行包的劃分。因此在業務上認爲,一個完整的包可能會被TCP
拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP
粘包和拆包問題。
下面針對客戶端分別發送了兩個數據表Packet1
和Packet2
給服務端的時候,TCP
粘包和拆包會出現的狀況進行列舉說明:
(1)第一種狀況,服務端分兩次正常收到兩個獨立數據包,即沒有發生拆包和粘包的現象;
(2)第二種狀況,接收端只收到一個數據包,因爲TCP
是不會出現丟包的,因此這一個數據包中包含了客戶端發送的兩個數據包的信息,這種現象即爲粘包。這種狀況因爲接收端不知道這兩個數據包的界限,因此對於服務接收端來講很難處理。
(3)第三種狀況,服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的Packet1
和Packet2
包的部份內容,第二次讀取到了Packet2
的剩餘內容,這被稱爲TCP拆包;
(4)第四種狀況,服務端分兩次讀取到了兩個數據包,第一次讀取到了部分的Packet1
內容,第二次讀取到了Packet1
剩餘內容和Packet2
的整包。
若是此時服務端TCP接收滑窗很是小,而數據包Packet1
和Packet2
比較大,頗有可能服務端須要分屢次才能將兩個包接收徹底,期間發生屢次拆包。以上列舉狀況的背後緣由分別以下:
MSS
(最大報文長度)大小的TCP
分段,當TCP
報文長度-TCP
頭部長度>MSS
的時候將發生拆包。因爲底層的TCP
沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,能夠概括以下:
FTP
協議;int32
來表示消息的總長度;以前Netty示例中其實並無考慮讀半包問題,這在功能測試每每沒有問題,可是一旦請求數過多或者發送大報文以後,就會存在該問題。若是代碼沒有考慮,每每就會出現解碼錯位或者錯誤,致使程序不能正常工做,下面看看Netty是如何根據主流的解決方案進行抽象實現來幫忙解決這一問題的。
以下表所示,Netty爲了找出消息的邊界,採用封幀方式:
方式 | 解碼 | 編碼 |
---|---|---|
固定長度 | FixedLengthFrameDecoder |
簡單 |
分隔符 | DelimiterBasedFrameDecoder |
簡單 |
專門的 length 字段 | LengthFieldBasedFrameDecoder |
LengthFieldPrepender |
注意到,Netty提供了對應的解碼器來解決對應的問題,有了這些解碼器,用戶不須要本身對讀取的報文進行人工解碼,也不須要考慮TCP的粘包和半包問題。爲何這麼說呢?下面列舉一個包尾增長分隔符的例子:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.atomic.AtomicInteger; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:15 * @Version: 1.0 * @Description:入站處理器 */ @ChannelHandler.Sharable public class DelimiterServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0); /*** 服務端讀取到網絡數據後的處理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 服務端讀取完成網絡數據後的處理*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); } /*** 發生異常後的處理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import java.net.InetSocketAddress; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:17 * @Version: 1.0 * @Description:服務端 */ public class DelimiterEchoServer { public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997; public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("服務器即將啓動"); delimiterEchoServer.start(); } public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*線程組*/ try { ServerBootstrap b = new ServerBootstrap();/*服務端啓動必須*/ b.group(group)/*將線程組傳入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO進行網絡傳輸*/ .localAddress(new InetSocketAddress(PORT))/*指定服務器監聽端口*/ /*服務端每接收到一個鏈接請求,就會新啓一個socket通訊,也就是channel, 因此下面這段代碼的做用就是爲這個子channel增長handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*異步綁定到服務器,sync()會阻塞直到完成*/ System.out.println("服務器啓動完成,等待客戶端的鏈接和數據....."); f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/ } finally { group.shutdownGracefully().sync();/*優雅關閉線程組*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); //服務端收到數據包後通過DelimiterBasedFrameDecoder即分隔符基礎框架解碼器解碼爲一個個帶有分隔符的數據包。 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } } }
添加到ChannelPipeline
的DelimiterBasedFrameDecoder
用於對使用分隔符結尾的消息進行自動解碼,固然還有沒有用到的FixedLengthFrameDecoder
用於對固定長度的消息進行自動解碼等解碼器。正如上門的代碼使用案例,有了Netty提供的幾碼器能夠輕鬆地完成對不少消息的自動解碼,並且不須要考慮TCP粘包/拆包致使的讀半包問題,極大地提高了開發效率。
相信看完上面的鋪墊,你對Netty編碼有了必定的瞭解了,下面再來總體梳理一遍吧。
一、設置EventLoopGroup
線程組(Reactor
線程組)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
上面咱們說過Netty
中使用Reactor
模式,bossGroup
表示服務器鏈接監聽線程組,專門接受 Accept
新的客戶端client
鏈接。另外一個workerGroup
表示處理每一鏈接的數據收發的線程組,來處理消息的讀寫事件。
二、服務端引導器
ServerBootstrap serverBootstrap = new ServerBootstrap();
集成全部配置,用來啓動Netty
服務端。
三、設置ServerBootstrap
信息
serverBootstrap.group(bossGroup, workerGroup);
將兩個線程組設置到ServerBootstrap
中。
四、設置ServerSocketChannel
類型
serverBootstrap.channel(NioServerSocketChannel.class);
設置通道的IO
類型,Netty
不止支持Java NIO
,也支持阻塞式IO
,例如OIO
OioServerSocketChannel.class)
五、設置參數
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
經過option()
方法能夠設置不少參數,這裏SO_BACKLOG
標識服務端接受鏈接的隊列長度,若是隊列已滿,客戶端鏈接將被拒絕。默認值,Windows
爲200,其餘爲128,這裏設置的是100。
六、設置Handler
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
設置 ServerSocketChannel
對應的Handler
,這裏只能設置一個,它會在SocketChannel
創建起來以前執行。
七、設置子Handler
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
Netty
中提供了一種能夠設置多個Handler
的途徑,即便用ChannelInitializer
方式。ChannelPipeline
是Netty
處理請求的責任鏈,這是一個ChannelHandler
的鏈表,而ChannelHandler
就是用來處理網絡請求的內容的。
每個channel
,都有一個處理器流水線。裝配child channel
流水線,調用childHandler()
方法,傳遞一個ChannelInitializer
的實例。
在 child channel
建立成功,開始通道初始化的時候,在bootstrap啓動器中配置的ChannelInitializer
實例就會被調用。
這個時候,才真正的執行去執行 initChannel
初始化方法,開始通道流水線裝配。
流水線裝配,主要是在流水線pipeline
的後面,增長負責數據讀寫、處理業務邏輯的handler
。
處理器 ChannelHandler
用來處理網絡請求內容,有ChannelInboundHandler
和ChannelOutboundHandler
兩種,ChannlPipeline
會從頭至尾順序調用ChannelInboundHandler
處理網絡請求內容,從尾到頭調用ChannelOutboundHandler
處理網絡請求內容
八、綁定端口號
ChannelFuture f = serverBootstrap.bind(PORT).sync();
綁定端口號
九、等待服務端端口號關閉
f.channel().closeFuture().sync();
等待服務端監聽端口關閉,sync()
會阻塞主線程,內部調用的是 Object
的 wait()
方法
十、關閉EventLoopGroup線程組
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
這篇文章主要是從一個demo
做爲引子,而後介紹了Netty
的包結構、Reactor
模型、編程規範等等,目的很簡單,但願你可以讀懂這段demo
並寫出來。
後面開始繼續Netty
源碼解析部分,敬請期待。
感謝Netty專欄做者們優秀的文章內容~