本節在《netty 學習筆記二》之上進行了一段大躍進,所以在本節你將會一股腦看到 netty自定義協議設計、數據載體 ByteBuf API、通訊協議編解碼、pipeline 結構、ChannelHandler 生命週期 和 熱插拔效果、單聊和羣聊實現、心跳與空閒檢測、netty IM 系統的性能優化等等內容。因爲 netty 已經造了不少好用的輪子,如粘包拆包處理器、空閒檢測處理器、通用編解碼器等,咱們只須要配置一些構造參數,基本上就能夠足夠使用,不用再重複造輪子了。前端
代碼已經上傳到個人 github:https://github.com/christmad/code-share/tree/master/netty-group-chatjava
一種通用的通訊協議設計:
1. 魔數(magic number)
魔數做爲第一個字段,一般狀況下爲固定的幾個字節,咱們能夠規定爲 4個字節。值通常設定爲不容易被猜到的。
魔數能夠認爲是一種顯示的起始標誌,在 java 的二進制文件中以魔數 0xcafebabe 做爲開頭,有殊途同歸之妙。
在源源不斷的網絡包中,起始標誌能夠減小錯誤率,迅速找出正確的包。
在編程中,magic number 也用來描述不使用變量名而直接使用數字的編程習慣,直接使用數字一般會引發歧義。
2. 版本號
一般是預留字段, IP 協議中也有一個 version 字段用來標識 IP 協議的版本是 IPv4 或 IPv6。
3. 序列化算法
序列化算法,是指如何把對象轉爲二進制數據,以及把二進制數據轉爲對象,此處是 java 對象。
好比 java 自帶的序列化算法,json,hessian 等序列化方式。
規定一個字節,能夠表示 256 種算法,足夠用了。
4. 指令
好比 IM即時通訊系統中客戶端登陸、聊天等指令。
對於 IM系統 ,能夠規定 1個字節,能夠表示 256 種指令,徹底夠用。
5. 數據長度
規定4個字節。
6. 數據內容
變長 N 字節,具體內容序列化後能夠佔不一樣的長度。git
目前除了版本號外,這裏設計的每個字段在 ChannelHandler 裏都提現出來了。魔數對應 IMProtocolSplitter(同時完成了服務端拆包工做);序列化算法對應 UltimatePacketCodecHandler;指令對應 IMHandler;數據對應了具體類型的 Packet。序列化算法採用了 alibaba 的 FASTJSON,JSON 也是前端大量在使用的一種序列化方式。github
有了自定義協議設定,編碼時只要逐字段按照協議拼裝字節便可,一般咱們的 java 對象使用 FASTJSON 序列化後會塞到「數據」字段裏。解碼時比較關鍵的兩個字段是「指令」和「數據」,根據「指令」類型獲取對應的 Class 類型,而後獲取序列化過的 byte[],最後 FASTJSON API 使用這兩個參數進行反序列化。 算法
Bytebuf 數據結構以下圖:數據庫
API 這塊最終每一個人都有本身不一樣的熟悉程度,很差談。只講兩點:編程
1. get/set 方法不會改變 讀寫指針,而 read/write 方法會改變讀寫指針。json
2. 若是遇到內存緊張的問題,必定是沒有釋放內存。netty 某些 decoder 會自動釋放內存,但若是假設我這個項目中用的 MessageToMessageCodec 底層沒有幫咱們管理內存而致使內存泄漏,咱們就應該本身在程序中手動釋放內存。對應的方法是 ByteBuf#release(),它將會把 ByteBuf 引用計數減 1,減到 0 時表示能被回收。默認申請完一塊 ByteBuf 默認計數爲 1。對應的增長計數的方法爲 retain(),在 slice()、duplicate() 場景下會用到。緩存
因爲本次實現的 netty IM系統 server端 和 client端都由 java 實現,而咱們的 client端使用了控制檯來實現。所以代碼中的每一種 ConsoleCommand 就對應了實際項目中的 UI 控件按鈕,如 createGroup、listGroupMembers 等「一級指令」就對應一個個 UI 上的按鈕。構造了一個 ConsoleCommandManager 方便聚合全部的「二級指令」,這個功用和 IMHandler 有點類似,均可以簡化多層 if else 代碼。性能優化
鏈接假死的現象是:在某一端(服務端或者客戶端)看來,底層的 TCP 鏈接已經斷開了,可是應用程序並無捕獲到,所以會認爲這條鏈接仍然是存在的,從 TCP 層面來講,只有收到四次握手數據包或者一個 RST 數據包,鏈接的狀態才表示已斷開。
1. 對於服務端來講,由於每條鏈接都會耗費 cpu 和內存資源,大量假死的鏈接會逐漸耗光服務器的資源,最終致使性能逐漸降低,程序崩潰。
2. 對於客戶端來講,鏈接假死會形成發送數據超時,影響用戶體驗。
a. 應用程序出現線程堵塞,沒法進行數據的讀寫
b. 客戶端或者服務端網絡相關的設備出現故障,好比網卡,機房故障
c. 公網丟包。公網環境相對內網而言,很是容易出現丟包,網絡抖動等現象,若是在一段時間內用戶接入的網絡連續出現丟包現象,那麼對客戶端來講數據一直髮送不出去,而服務端也是一直收不到客戶端來的數據,鏈接就一直耗着
空閒檢測指的是每隔一段時間,檢測這段時間內是否有數據讀寫,簡化一下,咱們的服務端只須要檢測一段時間內,是否收到過客戶端發來的數據便可,Netty 自帶的 IdleStateHandler 就能夠實現這個功能
PS:這個問題上服務端和客戶端的策略是同樣的
1. 鏈接假死。
2. 非假死狀態下確實沒有發送數據
只須要排查第二種狀況。使用 Netty 自帶的 IdleStateHandler 就能夠實現這個功能,見代碼 IMIdleStateHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/IMIdleStateHandler.java
優化一般指在服務端優化,服務端單機可能會面對十幾萬甚至幾十萬鏈接,須要進行一些對象碎片管理、優化(縮短)調用鏈(netty 中叫作 縮短事件傳播路徑)、阻塞方法優化等。
在 ServerBootstrap 的 childHandler() 方法中,ChannelInitializer 類的 initChannel 邏輯是:每次有新鏈接到來的時候,都會調用 ChannelInitializer 的 initChannel() 方法,而後把咱們添加的 ChannelHandler 都 new 一次,插入到 channel pipeline 中。
仔細觀察這些 handler ,它們方法中是沒有成員變量的,也就是無狀態的,所以能夠用單例模式來優化這些實例。在單機十幾萬甚至幾十萬鏈接的狀況下,單例使得性能獲得必定程度提高,建立的小對象也大大減小了。
而後重要的一點是,在 netty 中聲明一個 ChannelHandler 是共享的,須要使用註解 @ChannelHandler.Sharable 來告訴 netty 這個 handler 是能夠被多個 channel 共享的。
在沒有單例優化前,你的 ChannelInitializer # initChannel() 方法多是這樣的:
1 serverBootstrap 2 .childHandler(new ChannelInitializer<NioSocketChannel>() { 3 protected void initChannel(NioSocketChannel ch) { 4 ch.pipeline().addLast(new Spliter()); 5 ch.pipeline().addLast(new PacketDecoder()); 6 ch.pipeline().addLast(new LoginRequestHandler()); 7 ch.pipeline().addLast(new AuthHandler()); 8 ch.pipeline().addLast(new MessageRequestHandler()); 9 ch.pipeline().addLast(new CreateGroupRequestHandler()); 10 ch.pipeline().addLast(new JoinGroupRequestHandler()); 11 ch.pipeline().addLast(new QuitGroupRequestHandler()); 12 ch.pipeline().addLast(new ListGroupMembersRequestHandler()); 13 ch.pipeline().addLast(new GroupMessageRequestHandler()); 14 ch.pipeline().addLast(new LogoutRequestHandler()); 15 ch.pipeline().addLast(new PacketEncoder()); 16 } 17 });
使用單例改造後,ChannelInitializer # initChannel() 方法是這樣的:
serverBootstrap .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new Spliter()); ch.pipeline().addLast(new PacketDecoder()); ch.pipeline().addLast(LoginRequestHandler.INSTANCE); ch.pipeline().addLast(AuthHandler.INSTANCE); ch.pipeline().addLast(MessageRequestHandler.INSTANCE); ch.pipeline().addLast(CreateGroupRequestHandler.INSTANCE); ch.pipeline().addLast(JoinGroupRequestHandler.INSTANCE); ch.pipeline().addLast(QuitGroupRequestHandler.INSTANCE); ch.pipeline().addLast(ListGroupMembersRequestHandler.INSTANCE); ch.pipeline().addLast(GroupMessageRequestHandler.INSTANCE); ch.pipeline().addLast(LogoutRequestHandler.INSTANCE); ch.pipeline().addLast(new PacketEncoder()); } });
另外,須要注意的是,Splitter 不能被共享。雖然看起來咱們的 Splitter 方法內也沒有引用任何成員變量,但也許是由於每一個鏈接都要維護本身的 ByteBuf,所以 Splitter 繼承了 拆包器-LengthFieldBasedFrameDecoder 以後因爲父類的有狀態而致使 Splitter 也有狀態了。若是你不信,能夠強行試試把 Splitter 改形成單例。最後你會發現,控制檯會輸出一個錯誤。debug 後你會看到在 Splitter 某個父類中的構造器是這樣的:
protected ByteToMessageDecoder() { ensureNotSharable(); }
這已經在告訴你不能把 ByteToMessageDecoder 和 它的派生子類設爲共享 handler。個人 netty 版本用的是 4.1.24.final,而在這以前的一些版本中 ensureNotSharable() 方法還並非在 ChannelHandler 繼承體系中的一個方法,是用了某種 Util 工具來存放這個方法。不太重點是,咱們知道運行起來效果是同樣的。
Netty 內部提供了一個類,叫作 MessageToMessageCodec,使用它可讓咱們的編解碼操做放到一個類裏面去實現。而且這個 codec 也是能夠共享的。詳情見代碼 UltimatePacketCodecHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/UltimatePacketCodecHandler.java
1 @ChannelHandler.Sharable 2 public class UltimatePacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> { 3 public static final UltimatePacketCodecHandler INSTANCE = new UltimatePacketCodecHandler(); 4 5 private UltimatePacketCodecHandler() {} 6 7 @Override 8 protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) { 9 // 使用 channel 上的 ByteBuf alloc,方便 netty 管理內存 10 ByteBuf byteBuf = ctx.channel().alloc().ioBuffer(); 11 out.add(PacketCodec.INSTANCE.encode(byteBuf, packet)); 12 } 13 14 @Override 15 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) { 16 out.add(PacketCodec.INSTANCE.decode(buf)); 17 } 18 }
對咱們的 IM 應用來講,每次從控制檯(對應一個UI按鈕)只會傳一個指令到服務器,而且這個指令只會被某一個 handler 處理,所以這些指令 handler 有一個「平行」的概念。咱們能夠將這些平行的 handler 壓縮爲一個 handler,如 IMRequestHandler 所示:
1 @ChannelHandler.Sharable 2 public class IMRequestHandler extends SimpleChannelInboundHandler<Packet> { 3 4 public static final IMRequestHandler INSTANCE = new IMRequestHandler(); 5 6 private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> channelMap; 7 8 private IMRequestHandler() { 9 channelMap = new HashMap<>(); 10 // 將指令類型 和 request handler 作映射 11 channelMap.put(Command.MESSAGE_REQUEST, MessageRequestHandler.INSTANCE); 12 channelMap.put(Command.LOGIN_REQUEST, LoginRequestHandler.INSTANCE); 13 channelMap.put(Command.LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE); 14 channelMap.put(Command.CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE); 15 channelMap.put(Command.JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE); 16 channelMap.put(Command.LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE); 17 channelMap.put(Command.QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE); 18 channelMap.put(Command.GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE); 19 } 20 21 @Override 22 protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception { 23 SimpleChannelInboundHandler<? extends Packet> simpleChannelInboundHandler = channelMap.get(msg.getCommand()); 24 if (simpleChannelInboundHandler != null) { 25 // 只關心能處理的類型 26 simpleChannelInboundHandler.channelRead(ctx, msg); 27 } 28 } 29 }
再看看代碼中的 IMResponseHandler:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/client/handler/IMResponseHandler.java
其實客戶端沒有必要進行這種程度的優化,不過能夠再次感覺一下 netty 給咱們帶來的編碼上的方便。
一般咱們的應用會涉及數據庫或網絡操做,好比在 LoginRequestHandler 中,實際上在 valid() 或 checkUser() 方法中作的事情是把用戶名和密碼拿到數據庫或某個網絡中間件裏面去進行比較,而例子中我只是粗暴驗證直接返回 true 並簡單的生成一個 userId 返回了。實際場景以下:
1 protected void channelRead0(ChannelHandlerContext ctx, T packet) { 2 // 1. balabala 一些邏輯 3 // 2. 數據庫或者網絡等一些耗時的操做 4 // 3. writeAndFlush() 5 // 4. balabala 其餘的邏輯 6 }
對於第2個過程當中的耗時操做,一般不會直接這樣寫。爲何?先來看看 netty 一條 NIO 線程的處理邏輯抽象:
1 List<Channel> channelList = 已有數據可讀的 channel 2 for (Channel channel in channelist) { 3 for (ChannelHandler handler in channel.pipeline()) { 4 handler.channelRead0(ctx, msg); 5 } 6 }
當咱們執行 NioEventLoopGroup worker = new NioEventLoopGroup(); 這行代碼時,netty 默認會啓動 2倍 CPU 核數的 NIO 線程,在單機大量鏈接(幾萬甚至十幾萬以上)狀況下, 一條 NIO 線程管理着幾千條甚至上萬條鏈接。若是在某個鏈接上執行 channelRead0() 時發生阻塞,最終都會拖慢綁定在該 NIO 線程上的其餘 channel 的執行速度。
這時咱們應該把耗時操做扔到業務線程池中去處理,處理邏輯如 LoginRequestHandler.java 中代碼所示:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/server/handler/LoginRequestHandler.java,僞代碼以下:
1 ThreadPool threadPool = xxx; 2 3 protected void channelRead0(ChannelHandlerContext ctx, T packet) { 4 threadPool.submit(new Runnable() { 5 // 1. balabala 一些邏輯 6 // 2. 數據庫或者網絡等一些耗時的操做 7 // 3. writeAndFlush() 8 // 4. balabala 其餘的邏輯 9 }); 10 }
最後,其餘小細節就不在本篇里長篇大論了,之後應該會收集一個系列來專門記錄編程裏的小技巧。不少功能也沒有在這個版本里一併實現,好比消息的存儲,須要加上數據庫。以及「模擬打開聊天窗口」時查看最近的一些消息等。參考 QQ 最近這幾年的變化,打開聊天窗口加載的消息數量變少了,若是有關注的話應該會對這個變化有印象,以前的一些版本中打開窗口就能看到以前聊過的十幾行消息,後面慢慢變成幾行,目前(2019-11-07)打開窗口只能看三行了。只要有時間,這些功能都是能夠添加的。好比消息存儲和歷史消息這塊,先有消息存儲後,後續就能夠作一個 7天內、3天內的、當天的N條 等不一樣級別的歷史消息緩存級別。後面有空我也會繼續持續完善這個 IM 系統的功能,畢竟這是我興趣的項目之一。
有緣下篇博客見,See ya~~