跟着源碼學IM(八):萬字長文,手把手教你用Netty打造IM聊天

本文做者芋艿,原題「使用 Netty 實現 IM 聊天賊簡單」,本底價有修訂和改動。html

1、本文引言

上篇《跟着源碼學IM(七):手把手教你用WebSocket打造Web端IM聊天》中,咱們使用 WebSocket 實現了一個簡單的 IM 功能,支持身份認證、私聊消息、羣聊消息。前端

而後就有人發私信,但願使用純 Netty 實現一個相似的功能,所以就有了本文。java

注:源碼請從同步連接附件中下載,http://www.52im.net/thread-34...git

學習交流:github

  • 即時通信/推送技術開發交流5羣:215477170 [推薦]
  • 移動端IM開發入門文章:《新手入門一篇就夠:從零開發移動端IM》
  • 開源IM框架源碼:https://github.com/JackJiang2...

(本文同步發佈於:http://www.52im.net/thread-34...算法

2、知識準備

可能有人不知道 Netty 是什麼,這裏簡單介紹下:spring

Netty 是一個 Java 開源框架。Netty 提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。數據庫

也就是說,Netty 是一個基於 NIO 的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。編程

Netty 至關簡化和流線化了網絡應用的編程開發過程,例如,TCP 和 UDP 的 Socket 服務開發。後端

如下是幾篇有關Netty的入門文章,值得一讀:

《新手入門:目前爲止最透徹的的Netty高性能原理和框架架構解析》
《寫給初學者:Java高性能NIO框架Netty的學習方法和進階策略》
《史上最通俗Netty框架入門長文:基本介紹、環境搭建、動手實戰》

若是你連Java的NIO都不知道是什麼,下面的文章建議優先讀一下:

《少囉嗦!一分鐘帶你讀懂Java的NIO和經典IO的區別》
《史上最強Java NIO入門:擔憂從入門到放棄的,請讀這篇!》
《Java的BIO和NIO很難懂?用代碼實踐給你看,再不懂我轉行!》

Netty源碼和API的在線閱讀地址:

1)Netty-4.1.x 完整源碼(在線閱讀版)(* 推薦)
2)Netty-4.0.x 完整源碼(在線閱讀版)
3)Netty-4.1.x API文檔(在線版)(* 推薦)
4)Netty-4.0.x API文檔(在線版)

3、本文源碼

本文完整代碼附件下載:請從同步連接附件中下載,http://www.52im.net/thread-34...

源碼的目錄結構,以下圖所示:

如上圖所示:

1)lab-67-netty-demo-server 項目:搭建 Netty 服務端;
2)lab-67-netty-demo-client 項目:搭建 Netty 客戶端;
3)lab-67-netty-demo-common 項目:提供 Netty 的基礎封裝,提供消息的編解碼、分發的功能。
另外,源碼中也會提供 Netty 經常使用功能的示例:

1)心跳機制,實現服務端對客戶端的存活檢測;
2)斷線重連,實現客戶端對服務端的從新鏈接。
不嗶嗶,直接開幹。

5、通訊協議

在上一章中,咱們實現了客戶端和服務端的鏈接功能。而本小節,咱們要讓它們兩可以說上話,即進行數據的讀寫。

在平常項目的開發中,前端和後端之間採用 HTTP 做爲通訊協議,使用文本內容進行交互,數據格式通常是 JSON。可是在 TCP 的世界裏,咱們須要本身基於二進制構建,構建客戶端和服務端的通訊協議。

咱們以客戶端向服務端發送消息來舉個例子,假設客戶端要發送一個登陸請求。

對應的類以下:

public class AuthRequest {

/** 用戶名 **/

private String username;

/** 密碼 **/

private String password;

}

顯然:咱們沒法將一個 Java 對象直接丟到 TCP Socket 當中,而是須要將其轉換成 byte 字節數組,才能寫入到 TCP Socket 中去。即,須要將消息對象經過序列化,轉換成 byte 字節數組。

同時:在服務端收到 byte 字節數組時,須要將其又轉換成 Java 對象,即反序列化。否則,服務端對着一串 byte 字節處理個毛線?!

友情提示:服務端向客戶端發消息,也是同樣的過程哈!

序列化的工具很是多,例如說 Google 提供的 Protobuf,性能高效,且序列化出來的二進制數據較小。Netty 對 Protobuf 進行集成,提供了相應的編解碼器。

以下圖所示:

可是考慮到不少可能對 Protobuf 並不瞭解,由於它實現序列化又增長額外學習成本。所以,仔細一個捉摸,仍是採用 JSON 方式進行序列化。可能有人會疑惑,JSON 不是將對象轉換成字符串嗎?嘿嘿,咱們再把字符串轉換成 byte 字節數組就能夠啦~

下面,咱們新建 lab-67-netty-demo-common 項目,並在 codec 包下,實現咱們自定義的通訊協議。

以下圖所示:

5.一、Invocation
建立 Invocation 類,通訊協議的消息體。

代碼以下:

/**

  • 通訊協議的消息體

*/

public class Invocation {

/**

 * 類型

 */

private String type;

/**

 * 消息,JSON 格式

 */

private String message;



// 空構造方法

public Invocation() {

}



public Invocation(String type, String message) {

    this.type = type;

    this.message = message;

}



public Invocation(String type, Message message) {

    this.type = type;

    this.message = JSON.toJSONString(message);

}



// ... 省略 setter、getter、toString 方法

}

① type 屬性,類型,用於匹配對應的消息處理器。若是類比 HTTP 協議,type 屬性至關於請求地址。

② message 屬性,消息內容,使用 JSON 格式。

另外,Message 是咱們定義的消息接口,代碼以下:

public interface Message {

// ... 空,做爲標記接口

}

5.二、粘包與拆包
在開始看 Invocation 的編解碼處理器以前,咱們先了解下粘包與拆包的概念。

5.2.1 產生緣由
產生粘包和拆包問題的主要緣由是,操做系統在發送 TCP 數據的時候,底層會有一個緩衝區,例如 1024 個字節大小。

若是一次請求發送的數據量比較小,沒達到緩衝區大小,TCP 則會將多個請求合併爲同一個請求進行發送,這就造成了粘包問題。

例如說:在《詳解 Socket 編程 --- TCP_NODELAY 選項》文章中咱們能夠看到,在關閉 Nagle 算法時,請求不會等待知足緩衝區大小,而是儘快發出,下降延遲。

若是一次請求發送的數據量比較大,超過了緩衝區大小,TCP 就會將其拆分爲屢次發送,這就是拆包,也就是將一個大的包拆分爲多個小包進行發送。

以下圖展現了粘包和拆包的一個示意圖,演示了粘包和拆包的三種狀況:

如上圖所示:

1)A 和 B 兩個包都恰好知足 TCP 緩衝區的大小,或者說其等待時間已經達到 TCP 等待時長,從而仍是使用兩個獨立的包進行發送;
2)A 和 B 兩次請求間隔時間內較短,而且數據包較小,於是合併爲同一個包發送給服務端;
3)B 包比較大,於是將其拆分爲兩個包 B_1 和 B_2 進行發送,而這裏因爲拆分後的 B_2 比較小,其又與 A 包合併在一塊兒發送。
5.2.2 解決方案
對於粘包和拆包問題,常見的解決方案有三種。

① 客戶端在發送數據包的時候,每一個包都固定長度。好比 1024 個字節大小,若是客戶端發送的數據長度不足 1024 個字節,則經過補充空格的方式補全到指定長度。

這種方式,暫時沒有找到採用這種方式的案例。

② 客戶端在每一個包的末尾使用固定的分隔符。例如 \r\n,若是一個包被拆分了,則等待下一個包發送過來以後找到其中的 \r\n,而後對其拆分後的頭部部分與前一個包的剩餘部分進行合併,這樣就獲得了一個完整的包。具體的案例,有 HTTP、WebSocket、Redis。

③ 將消息分爲頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息以後纔算是讀到了一個完整的消息。

友情提示:方案 ③ 是 ① 的升級版,動態長度。

本文將採用這種方式,在每次 Invocation 序列化成字節數組寫入 TCP Socket 以前,先將字節數組的長度寫到其中。

以下圖所示:

5.三、InvocationEncoder
建立 InvocationEncoder 類,實現將 Invocation 序列化,並寫入到 TCP Socket 中。

代碼以下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

private Logger logger = LoggerFactory.getLogger(getClass());



@Override

protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {

    // <2.1> 將 Invocation 轉換成 byte[] 數組

    byte[] content = JSON.toJSONBytes(invocation);

    // <2.2> 寫入 length

    out.writeInt(content.length);

    // <2.3> 寫入內容

    out.writeBytes(content);

    logger.info("[encode][鏈接({}) 編碼了一條消息({})]", ctx.channel().id(), invocation.toString());

}

}

① MessageToByteEncoder 是 Netty 定義的編碼 ChannelHandler 抽象類,將泛型 消息轉換成字節數組。

② #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,進行編碼的邏輯。

<2.1> 處,調用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,將 Invocation 轉換成 字節數組。

<2.2> 處,將字節數組的長度,寫入到 TCP Socket 當中。這樣,後續「5.4 InvocationDecoder」能夠根據該長度,解析到消息,解決粘包和拆包的問題。

友情提示:MessageToByteEncoder 會最終將 ByteBuf out 寫到 TCP Socket 中。

<2.3> 處,將字節數組,寫入到 TCP Socket 當中。

5.四、InvocationDecoder
建立 InvocationDecoder 類,實現從 TCP Socket 讀取字節數組,反序列化成 Invocation。

代碼以下:

① ByteToMessageDecoder 是 Netty 定義的解碼 ChannelHandler 抽象類,在 TCP Socket 讀取到新數據時,觸發進行解碼。

② 在 <2.1>、<2.2>、<2.3> 處,從 TCP Socket 中讀取長度。

③ 在 <3.1>、<3.2>、<3.3> 處,從 TCP Socket 中讀取字節數組,並反序列化成 Invocation 對象。

最終,添加 List<Object> out 中,交給後續的 ChannelHandler 進行處理。稍後,咱們將在「6. 消息分發」小結中,會看到 MessageDispatcher 將 Invocation 分發到其對應的 MessageHandler 中,進行業務邏輯的執行。

5.五、引入依賴
建立 pom.xml 文件,引入 Netty、FastJSON 等等依賴。

5.六、本章小結
至此,咱們已經完成通訊協議的定義、編解碼的邏輯,是否是蠻有趣的?!

另外,咱們在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代碼中,將編解碼器添加到其中。

以下圖所示:

6、消息分發

在 SpringMVC 中,DispatcherServlet 會根據請求地址、方法等,將請求分發到匹配的 Controller 的 Method 方法上。

在 lab-67-netty-demo-client 項目的 dispatcher 包中,咱們建立了 MessageDispatcher 類,實現和 DispatcherServlet 相似的功能,將 Invocation 分發到其對應的 MessageHandler 中,進行業務邏輯的執行。

下面,咱們來看看具體的代碼實現。

6.一、Message
建立 Message 接口,定義消息的標記接口。

代碼以下:

public interface Message {

}

下圖,是咱們涉及到的 Message 實現類。

以下圖所示:

6.二、MessageHandler
建立 MessageHandler 接口,消息處理器接口。

代碼以下:

public interface MessageHandler<T extendsMessage> {

/**

 * 執行處理消息

 *

 * @param channel 通道

 * @param message 消息

 */

voide xecute(Channel channel, T message);



/**

 * @return 消息類型,即每一個 Message 實現類上的 TYPE 靜態字段

 */

String getType();

}

如上述代碼所示:

1)定義了泛型 <T> ,須要是 Message 的實現類;
2)定義的兩個接口方法,本身看下注釋哈。
下圖,是咱們涉及到的 MessageHandler 實現類。

以下圖所示:

6.三、MessageHandlerContainer
建立 MessageHandlerContainer 類,做爲 MessageHandler 的容器。

代碼以下:

① 實現 InitializingBean 接口,在 #afterPropertiesSet() 方法中,掃描全部 MessageHandler Bean ,添加到 MessageHandler 集合中。

② 在 #getMessageHandler(String type) 方法中,得到類型對應的 MessageHandler 對象。稍後,咱們會在 MessageDispatcher 調用該方法。

③ 在 #getMessageClass(MessageHandler handler) 方法中,經過 MessageHandler 中,經過解析其類上的泛型,得到消息類型對應的 Class 類。這是參考 rocketmq-spring 項目的 DefaultRocketMQListenerContainer#getMessageType() 方法,進行略微修改。

6.四、MessageDispatcher
建立 MessageDispatcher 類,將 Invocation 分發到其對應的 MessageHandler 中,進行業務邏輯的執行。

代碼以下:

@ChannelHandler.Sharable

public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

@Autowired

private MessageHandlerContainer messageHandlerContainer;



private final ExecutorService executor =  Executors.newFixedThreadPool(200);



@Override

protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {

    // <3.1> 得到 type 對應的 MessageHandler 處理器

    MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());

    // 得到  MessageHandler 處理器的消息類

    Class<? extendsMessage> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);

    // <3.2> 解析消息

    Message message = JSON.parseObject(invocation.getMessage(), messageClass);

    // <3.3> 執行邏輯

    executor.submit(newRunnable() {



        @Override

        public void run() {

            // noinspection unchecked

            messageHandler.execute(ctx.channel(), message);

        }

    });

}

}

① 在類上添加 @ChannelHandler.Sharable 註解,標記這個 ChannelHandler 能夠被多個 Channel 使用。

② SimpleChannelInboundHandler 是 Netty 定義的消息處理 ChannelHandler 抽象類,處理消息的類型是 泛型時。

③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,處理消息,進行分發。

<3.1> 處,調用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,得到 Invocation 的 type 對應的 MessageHandler 處理器。

而後,調用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,得到 MessageHandler 處理器的消息類。

<3.2> 處,調用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,將 Invocation 的 message 解析成 MessageHandler 對應的消息對象。

<3.3> 處,丟到線程池中,而後調用 MessageHandler 的 #execute(Channel channel, T message) 方法,執行業務邏輯。

注意:爲何要丟到 executor 線程池中呢?咱們先來了解下 EventGroup 的線程模型。

友情提示:在咱們啓動 Netty 服務端或者客戶端時,都會設置其 EventGroup。

EventGroup 咱們能夠先簡單理解成一個線程池,而且線程池的大小僅僅是 CPU 數量 * 2。每一個 Channel 僅僅會被分配到其中的一個線程上,進行數據的讀寫。而且,多個 Channel 會共享一個線程,即便用同一個線程進行數據的讀寫。

那麼試着思考下,MessageHandler 的具體邏輯視線中,每每會涉及到 IO 處理,例如說進行數據庫的讀取。這樣,就會致使一個 Channel 在執行 MessageHandler 的過程當中,阻塞了共享當前線程的其它 Channel 的數據讀取。

所以,咱們在這裏建立了 executor 線程池,進行 MessageHandler 的邏輯執行,避免阻塞 Channel 的數據讀取。

可能會有人說,咱們是否是可以把 EventGroup 的線程池設置大一點,例如說 200 呢?對於長鏈接的 Netty 服務端,每每會有 1000 ~ 100000 的 Netty 客戶端鏈接上來,這樣不管設置多大的線程池,都會出現阻塞數據讀取的狀況。

友情提示:executor 線程池,咱們通常稱之爲業務線程池或者邏輯線程池,顧名思義,就是執行業務邏輯的。這樣的設計方式,目前 Dubbo 等等 RPC 框架,都採用這種方式。後續,能夠認真閱讀下《【NIO 系列】——之 Reactor 模型》文章,進一步理解。

6.五、NettyServerConfig
建立 NettyServerConfig 配置類,建立 MessageDispatcher 和 MessageHandlerContainer Bean。

代碼以下:

@Configuration

public class NettyServerConfig {

@Bean

public MessageDispatcher messageDispatcher() {

    return new MessageDispatcher();

}



@Bean

public MessageHandlerContainer messageHandlerContainer() {

    return new MessageHandlerContainer();

}

}

6.六、NettyClientConfig
建立 NettyClientConfig 配置類,建立 MessageDispatcher 和 MessageHandlerContainer Bean。

代碼以下:

@Configuration

public class NettyClientConfig {

@Bean

public MessageDispatcher messageDispatcher() {

    return new MessageDispatcher();

}

@Bean

public MessageHandlerContainer messageHandlerContainer() {

    return new MessageHandlerContainer();

}

}

6.七、本章小結
後續,咱們將在以下小節,具體演示消息分發的使用。

7、斷開重連

Netty 客戶端須要實現斷開重連機制,解決各類狀況下的斷開狀況。

例如說:

1)Netty 客戶端啓動時,Netty 服務端處於掛掉,致使沒法鏈接上;
2)在運行過程當中,Netty 服務端掛掉,致使鏈接被斷開;
3)任一一端網絡抖動,致使鏈接異常斷開。
具體的代碼實現比較簡單,只須要在兩個地方增長重連機制:

1)Netty 客戶端啓動時,沒法鏈接 Netty 服務端時,發起重連;
2)Netty 客戶端運行時,和 Netty 斷開鏈接時,發起重連。
考慮到重連會存在失敗的狀況,咱們採用定時重連的方式,避免佔用過多資源。

7.一、具體代碼
① 在 NettyClient 中,提供 #reconnect() 方法,實現定時重連的邏輯。

代碼以下:

// NettyClient.java

public void reconnect() {

eventGroup.schedule(new Runnable() {

    @Override

    publicvoidrun() {

        logger.info("[reconnect][開始重連]");

        try{

            start();

        } catch(InterruptedException e) {

            logger.error("[reconnect][重連失敗]", e);

        }

    }

}, RECONNECT_SECONDS, TimeUnit.SECONDS);

logger.info("[reconnect][{} 秒後將發起重連]", RECONNECT_SECONDS);

}

經過調用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,實現定時邏輯。而在內部的具體邏輯,調用 NettyClient 的 #start() 方法,發起鏈接 Netty 服務端。

又由於 NettyClient 在 #start() 方法在鏈接 Netty 服務端失敗時,又會調用 #reconnect() 方法,從而再次發起定時重連。如此循環反覆,知道 Netty 客戶端鏈接上 Netty 服務端。

以下圖所示:

② 在 NettyClientHandler 中,實現 #channelInactive(ChannelHandlerContext ctx) 方法,在發現和 Netty 服務端斷開時,調用 Netty Client 的 #reconnect() 方法,發起重連。

代碼以下:

// NettyClientHandler.java

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

// 發起重連

nettyClient.reconnect();

// 繼續觸發事件

super.channelInactive(ctx);

}

7.二、簡單測試
① 啓動 Netty Client,不要啓動 Netty Server,控制檯打印日誌以下圖:

能夠看到 Netty Client 在鏈接失敗時,不斷髮起定時重連。

② 啓動 Netty Server,控制檯打印以下圖:

能夠看到 Netty Client 成功重連上 Netty Server。

8、心跳機制與空閒檢測

咱們能夠了解到 TCP 自帶的空閒檢測機制,默認是 2 小時。這樣的檢測機制,從系統資源層面上來講是能夠接受的。

可是在業務層面,若是 2 小時才發現客戶端與服務端的鏈接實際已經斷開,會致使中間很是多的消息丟失,影響客戶的使用體驗。

所以,咱們須要在業務層面,本身實現空閒檢測,保證儘快發現客戶端與服務端實際已經斷開的狀況。

實現邏輯以下:

1)服務端發現 180 秒未從客戶端讀取到消息,主動斷開鏈接;
2)客戶端發現 180 秒未從服務端讀取到消息,主動斷開鏈接。
考慮到客戶端和服務端之間並非一直有消息的交互,因此咱們須要增長心跳機制。

邏輯以下:

1)客戶端每 60 秒向服務端發起一次心跳消息,保證服務端能夠讀取到消息;
2)服務端在收到心跳消息時,回覆客戶端一條確認消息,保證客戶端能夠讀取到消息。
友情提示:

爲何是 180 秒?能夠加大或者減少,看本身但願多快檢測到鏈接異常。太短的時間,會致使心跳過於頻繁,佔用過多資源。

爲何是 60 秒?三次機會,確認是否心跳超時。

雖然聽起來有點複雜,可是實現起來並不複雜哈。

8.一、服務端的空閒檢測
在 NettyServerHandlerInitializer 中,咱們添加了一個 ReadTimeoutHandler 處理器,它在超過指定時間未從對端讀取到數據,會拋出 ReadTimeoutException 異常。

以下圖所示:

經過這樣的方式,實現服務端發現 180 秒未從客戶端讀取到消息,主動斷開鏈接。

8.二、客戶端的空閒檢測
在 NettyClientHandlerInitializer 中,咱們添加了一個 ReadTimeoutHandler 處理器,它在超過指定時間未從對端讀取到數據,會拋出 ReadTimeoutException 異常。

以下圖所示:

經過這樣的方式,實現客戶端發現 180 秒未從服務端讀取到消息,主動斷開鏈接。

8.三、心跳機制
Netty 提供了 IdleStateHandler 處理器,提供空閒檢測的功能,在 Channel 的讀或者寫空閒時間太長時,將會觸發一個 IdleStateEvent 事件。

這樣,咱們只須要在 NettyClientHandler 處理器中,在接收到 IdleStateEvent 事件時,客戶端向客戶端發送一次心跳消息。

以下圖所示:

其中,HeartbeatRequest 是心跳請求。

同時,咱們在服務端項目中,建立了一個 HeartbeatRequestHandler 消息處理器,在收到客戶端的心跳請求時,回覆客戶端一條確認消息。

代碼以下:

@Component

public class HeartbeatRequestHandler implementsMessageHandler<HeartbeatRequest> {

private Logger logger = LoggerFactory.getLogger(getClass());



@Override

public void execute(Channel channel, HeartbeatRequest message) {

    logger.info("[execute][收到鏈接({}) 的心跳請求]", channel.id());

    // 響應心跳

    HeartbeatResponse response = newHeartbeatResponse();

    channel.writeAndFlush(newInvocation(HeartbeatResponse.TYPE, response));

}



@Override

public String getType() {

    return HeartbeatRequest.TYPE;

}

}

其中,HeartbeatResponse 是心跳確認響應。

8.四、簡單測試
啓動 Netty Server 服務端,再啓動 Netty Client 客戶端,耐心等待 60 秒後,能夠看到心跳日誌以下:

9、認證邏輯

從本小節開始,咱們就具體看看業務邏輯的處理示例。

認證的過程,以下圖所示:

9.一、AuthRequest
建立 AuthRequest 類,定義用戶認證請求。

代碼以下:

public class AuthRequest implements Message {

public static final String TYPE = "AUTH_REQUEST";

/**

* 認證 Token

 */

private String accessToken;

// ... 省略 setter、getter、toString 方法

}

這裏咱們使用 accessToken 認證令牌進行認證。

由於通常狀況下,咱們使用 HTTP 進行登陸系統,而後使用登陸後的身份標識(例如說 accessToken 認證令牌),將客戶端和當前用戶進行認證綁定。

9.二、AuthResponse
建立 AuthResponse 類,定義用戶認證響應。

代碼以下:

public class AuthResponse implements Message {

public static final String TYPE = "AUTH_RESPONSE";



/**

 * 響應狀態碼

 */

private Integer code;

/**

 * 響應提示

 */

private String message;



// ... 省略 setter、getter、toString 方法

}

9.三、AuthRequestHandler
服務端...

建立 AuthRequestHandler 類,爲服務端處理客戶端的認證請求。

代碼以下:

代碼比較簡單,看看 <1>、<2>、<3>、<4> 上的註釋。

9.四、AuthResponseHandler
客戶端...

建立 AuthResponseHandler 類,爲客戶端處理服務端的認證響應。

代碼以下:

@Component

public class AuthResponseHandler implements MessageHandler<AuthResponse> {

private Logger logger = LoggerFactory.getLogger(getClass());



@Override

public void execute(Channel channel, AuthResponse message) {

    logger.info("[execute][認證結果:{}]", message);

}



@Override

public String getType() {

    return AuthResponse.TYPE;

}

}

打印個認證結果,方便調試。

9.五、TestController
客戶端...

建立 TestController 類,提供 /test/mock 接口,模擬客戶端向服務端發送請求。

代碼以下:

@RestController

@RequestMapping("/test")

public class TestController {

@Autowired

private NettyClient nettyClient;



@PostMapping("/mock")

public String mock(String type, String message) {

    // 建立 Invocation 對象

    Invocation invocation = new Invocation(type, message);

    // 發送消息

    nettyClient.send(invocation);

    return "success";

}

}

9.六、簡單測試
啓動 Netty Server 服務端,再啓動 Netty Client 客戶端,而後使用 Postman 模擬一次認證請求。

以下圖所示:

同時,能夠看到認證成功的日誌以下:

11、羣聊邏輯

羣聊的過程,以下圖所示:

服務端負責將客戶端 A 發送的羣聊消息,轉發給客戶端 A、B、C。

友情提示:考慮到邏輯簡潔,提供的本小節的示例並非一個一個羣,而是全部人在一個大的羣聊中哈~

11.一、ChatSendToAllRequest
建立 ChatSendToOneRequest 類,發送給全部人的羣聊消息的請求。

代碼以下:

public class ChatSendToAllRequest implements Message {

public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

/**

 * 消息編號

 */

private String msgId;

/**

 * 內容

 */

private String content;



// ... 省略 setter、getter、toString 方法

}

PS:若是是正經的羣聊,會有一個 groupId 字段,表示羣編號。

11.二、ChatSendToAllHandler
服務端...

建立 ChatSendToAllHandler 類,爲服務端處理客戶端的羣聊請求。

代碼以下:

代碼比較簡單,看看 <1>、<2> 上的註釋。

11.三、簡單測試
① 啓動 Netty Server 服務端。

② 啓動 Netty Client 客戶端 A。而後使用 Postman 模擬一次認證請求(用戶爲 yunai)。

以下圖所示:

③ 啓動 Netty Client 客戶端 B。注意,須要設置 --server.port 端口爲 8081,避免衝突。

④ 啓動 Netty Client 客戶端 C。注意,須要設置 --server.port 端口爲 8082,避免衝突。

⑤ 最後使用 Postman 模擬一次發送羣聊消息。

以下圖所示:

同時,能夠看到客戶端 A 羣發給全部客戶端的日誌以下:

最後,要想系統地學習IM開發的方方面面,請繼續閱讀:《新手入門一篇就夠:從零開發移動端IM》

附錄、系列文章

《跟着源碼學IM(一):手把手教你用Netty實現心跳機制、斷線重連機制》
《跟着源碼學IM(二:自已開發IM很難?手把手教你擼一個Andriod版IM》
《跟着源碼學IM(三)基於Netty,從零開發一個IM服務端》
《跟着源碼學IM(四)拿起鍵盤就是幹,教你徒手開發一套分佈式IM系統》
《跟着源碼學IM(五):正確理解IM長鏈接、心跳及重連機制,並動手實現》
《跟着源碼學IM(六):手把手教你用Go快速搭建高性能、可擴展的IM系統》
《跟着源碼學IM(七):手把手教你用WebSocket打造Web端IM聊天》
《跟着源碼學IM(八):萬字長文,手把手教你用Netty打造IM聊天》(* 本文)

本文已同步發佈於「即時通信技術圈」公衆號。

▲ 本文在公衆號上的連接是:點此進入。同步發佈連接是:http://www.52im.net/thread-34...

相關文章
相關標籤/搜索