在《芋道 Spring Boot WebSocket 入門》文章中,咱們使用 WebSocket 實現了一個簡單的 IM 功能,支持身份認證、私聊消息、羣聊消息。前端
而後就有胖友私信艿艿,但願使用純 Netty 實現一個相似的功能。良心的艿艿,固然不會給她發紅人卡,所以就有了本文。可能有胖友不知道 Netty 是什麼,這裏簡單介紹下:java
Netty 是一個 Java 開源框架。Netty 提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。git
也就是說,Netty 是一個基於 NIO 的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。github
Netty 至關簡化和流線化了網絡應用的編程開發過程,例如,TCP 和 UDP 的 Socket 服務開發。web
下面,咱們來新建三個項目,以下圖所示:算法
lab-67-netty-demo-server
項目:搭建 Netty 服務端。lab-67-netty-demo-client
項目:搭建 Netty 客戶端。lab-67-netty-demo-common
項目:提供 Netty 的基礎封裝,提供消息的編解碼、分發的功能。另外,咱們也會提供 Netty 經常使用功能的示例:spring
不嗶嗶,直接開幹。數據庫
友情提示:可能會胖友擔憂,沒有 Netty 基礎是否是沒法閱讀本文?!艿艿的想法,看!就硬看,按照代碼先本身能搭建一下哈~文末,艿艿會提供一波 Netty 基礎入門的文章。apache
本文在提供完整代碼示例,可見 https://github.com/YunaiV/Spr... 的 lab-67 目錄。原創不易,給點個 Star 嘿,一塊兒衝鴨!編程
本小節,咱們先來使用 Netty 構建服務端與客戶端的核心代碼,讓胖友對項目的代碼有個初始的認知。
建立 lab-67-netty-demo-server
項目,搭建 Netty 服務端。以下圖所示:
下面,咱們只會暫時看看 server
包下的代碼,避免信息量過大,擊穿胖友的禿頭。
建立 NettyServer 類,Netty 服務端。代碼以下:
@Component public class NettyServer { private Logger logger = LoggerFactory.getLogger(getClass()); @Value("${netty.port}") private Integer port; @Autowired private NettyServerHandlerInitializer nettyServerHandlerInitializer; /** * boss 線程組,用於服務端接受客戶端的鏈接 */ private EventLoopGroup bossGroup = new NioEventLoopGroup(); /** * worker 線程組,用於服務端接受客戶端的數據讀寫 */ private EventLoopGroup workerGroup = new NioEventLoopGroup(); /** * Netty Server Channel */ private Channel channel; /** * 啓動 Netty Server */ @PostConstruct public void start() throws InterruptedException { // <2.1> 建立 ServerBootstrap 對象,用於 Netty Server 啓動 ServerBootstrap bootstrap = new ServerBootstrap(); // <2.2> 設置 ServerBootstrap 的各類屬性 bootstrap.group(bossGroup, workerGroup) // <2.2.1> 設置兩個 EventLoopGroup 對象 .channel(NioServerSocketChannel.class) // <2.2.2> 指定 Channel 爲服務端 NioServerSocketChannel .localAddress(new InetSocketAddress(port)) // <2.2.3> 設置 Netty Server 的端口 .option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服務端 accept 隊列的大小 .childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 機制,實現 TCP 層級的心跳保活功能 .childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 容許較小的數據包的發送,下降延遲 .childHandler(nettyServerHandlerInitializer); // <2> 綁定端口,並同步等待成功,即啓動服務端 ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { channel = future.channel(); logger.info("[start][Netty Server 啓動在 {} 端口]", port); } } /** * 關閉 Netty Server */ @PreDestroy public void shutdown() { // <3.1> 關閉 Netty Server if (channel != null) { channel.close(); } // <3.2> 優雅關閉兩個 EventLoopGroup 對象 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
🔥 ① 在類上,添加 @Component
註解,把 NettyServer 的建立交給 Spring 管理。
port
屬性,讀取 application.yml
配置文件的 netty.port
配置項。#start()
方法,添加 @PostConstruct
註解,啓動 Netty 服務器。#shutdown()
方法,添加 @PreDestroy
註解,關閉 Netty 服務器。🔥 ② 咱們來詳細看看 #start()
方法的代碼,如何實現 Netty Server 的啓動。
<2.1>
處,建立 ServerBootstrap 類,Netty 提供的服務器的啓動類,方便咱們初始化 Server。
<2.2>
處,設置 ServerBootstrap 的各類屬性。
友情提示:這裏涉及較多 Netty 組件的知識,艿艿先以簡單的語言描述,後續胖友在文末的 Netty 基礎入門的文章,補充學噢。
<2.2.1>
處,調用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
方法,設置使用 bossGroup
和 workerGroup
。其中:
bossGroup
屬性:Boss 線程組,用於服務端接受客戶端的鏈接。workerGroup
屬性:Worker 線程組,用於服務端接受客戶端的數據讀寫。Netty 採用的是多 Reactor 多線程的模型,服務端能夠接受更多客戶端的數據讀寫的能力。緣由是:
- 建立專門用於接受客戶端鏈接的
bossGroup
線程組,避免由於已鏈接的客戶端的數據讀寫頻繁,影響新的客戶端的鏈接。- 建立專門用於接收客戶端讀寫的
workerGroup
線程組,多個線程進行客戶端的數據讀寫,能夠支持更多客戶端。課後習題:感興趣的胖友,後續能夠看看《【NIO 系列】——之 Reactor 模型》文章。
<2.2.2>
處,調用 #channel(Class<? extends C> channelClass)
方法,設置使用 NioServerSocketChannel 類,它是 Netty 定義的 NIO 服務端 TCP Socket 實現類。
<2.2.3>
處,調用 #localAddress(SocketAddress localAddress)
方法,設置服務端的端口。
<2.2.4>
處,調用 option#(ChannelOption<T> option, T value)
方法,設置服務端接受客戶端的鏈接隊列大小。由於 TCP 創建鏈接是三次握手,因此第一次握手完成後,會添加到服務端的鏈接隊列中。
課後習題:更多相關內容,後續能夠看看 《淺談 TCP Socket 的 backlog 參數》文章。
<2.2.5>
處,調用 #childOption(ChannelOption<T> childOption, T value)
方法,TCP Keepalive 機制,實現 TCP 層級的心跳保活功能。
課後習題:更多相關內容,後續能夠看看 《TCP Keepalive 機制刨根問底》文章。
<2.2.6>
處,調用 #childOption(ChannelOption<T> childOption, T value)
方法,容許較小的數據包的發送,下降延遲。
課後習題:更多相關內容,後續能夠看看 《詳解 Socket 編程 --- TCP_NODELAY 選項》文章。
<2.2.7>
處,調用 #childHandler(ChannelHandler childHandler)
方法,設置客戶端鏈接上來的 Channel 的處理器爲 NettyServerHandlerInitializer。稍後咱們在「2.1.2 NettyServerHandlerInitializer」小節來看看。
<2.3>
處,調用 #bind()
+ #sync()
方法,綁定端口,並同步等待成功,即啓動服務端。
🔥 ③ 咱們來詳細看看 #shutdown()
方法的代碼,如何實現 Netty Server 的關閉。
<3.1>
處,調用 Channel 的 #close()
方法,關閉 Netty Server,這樣客戶端就再也不能鏈接了。
<3.2>
處,調用 EventLoopGroup 的 #shutdownGracefully()
方法,優雅關閉 EventLoopGroup。例如說,它們裏面的線程池。
在看 NettyServerHandlerInitializer 的代碼以前,咱們須要先了解下 Netty 的 ChannelHandler 組件,用來處理 Channel 的各類事件。這裏的事件很普遍,好比能夠是鏈接、數據讀寫、異常、數據轉換等等。
ChannelHandler 有很是多的子類,其中有個很是特殊的 ChannelInitializer,它用於 Channel 建立時,實現自定義的初始化邏輯。這裏咱們建立的 NettyServerHandlerInitializer 類,就繼承了 ChannelInitializer 抽象類,代碼以下:
@Component public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { /** * 心跳超時時間 */ private static final Integer READ_TIMEOUT_SECONDS = 3 * 60; @Autowired private MessageDispatcher messageDispatcher; @Autowired private NettyServerHandler nettyServerHandler; @Override protected void initChannel(Channel ch) { // <1> 得到 Channel 對應的 ChannelPipeline ChannelPipeline channelPipeline = ch.pipeline(); // <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中 channelPipeline // 空閒檢測 .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)) // 編碼器 .addLast(new InvocationEncoder()) // 解碼器 .addLast(new InvocationDecoder()) // 消息分發器 .addLast(messageDispatcher) // 服務端處理器 .addLast(nettyServerHandler) ; } }
在每個客戶端與服務端創建完成鏈接時,服務端會建立一個 Channel 與之對應。此時,NettyServerHandlerInitializer 會進行執行 #initChannel(Channel c)
方法,進行自定義的初始化。
友情提示:建立的客戶端的 Channel,不要和 「2.1.1 NettyServer」小節的 NioServerSocketChannel 混淆,不是同一個哈。在
#initChannel(Channel ch)
方法的ch
參數,就是此時建立的客戶端 Channel。
① <1>
處,調用 Channel 的 #pipeline()
方法,得到客戶端 Channel 對應的 ChannelPipeline。ChannelPipeline 由一系列的 ChannelHandler 組成,又或者說是 ChannelHandler 鏈。這樣, Channel 全部上全部的事件都會通過 ChannelPipeline,被其上的 ChannelHandler 所處理。
② <2>
處,添加五個 ChannelHandler 到 ChannelPipeline 中,每個的做用看其上的註釋。具體的,咱們會在後續的小節詳細解釋。
建立 NettyServerHandler 類,繼承 ChannelInboundHandlerAdapter 類,實現客戶端 Channel 創建鏈接、斷開鏈接、異常時的處理。代碼以下:
@Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private NettyChannelManager channelManager; @Override public void channelActive(ChannelHandlerContext ctx) { // 從管理器中添加 channelManager.add(ctx.channel()); } @Override public void channelUnregistered(ChannelHandlerContext ctx) { // 從管理器中移除 channelManager.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("[exceptionCaught][鏈接({}) 發生異常]", ctx.channel().id(), cause); // 斷開鏈接 ctx.channel().close(); } }
① 在類上添加 @ChannelHandler.Sharable
註解,標記這個 ChannelHandler 能夠被多個 Channel 使用。
② channelManager
屬性,是咱們實現的客戶端 Channel 的管理器。
#channelActive(ChannelHandlerContext ctx)
方法,在客戶端和服務端創建鏈接完成時,調用 NettyChannelManager 的 #add(Channel channel)
方法,添加到其中。#channelUnregistered(ChannelHandlerContext ctx)
方法,在客戶端和服務端斷開鏈接時,調用 NettyChannelManager 的 #add(Channel channel)
方法,從其中移除。具體的 NettyChannelManager 的源碼,咱們在「2.1.4 NettyChannelManager」 小節中來瞅瞅~
③ #exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
方法,在處理 Channel 的事件發生異常時,調用 Channel 的 #close()
方法,斷開和客戶端的鏈接。
建立 NettyChannelManager 類,提供兩種功能。
🔥 ① 客戶端 Channel 的管理。代碼以下:
@Component public class NettyChannelManager { /** * {@link Channel#attr(AttributeKey)} 屬性中,表示 Channel 對應的用戶 */ private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user"); private Logger logger = LoggerFactory.getLogger(getClass()); /** * Channel 映射 */ private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>(); /** * 用戶與 Channel 的映射。 * * 經過它,能夠獲取用戶對應的 Channel。這樣,咱們能夠向指定用戶發送消息。 */ private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>(); /** * 添加 Channel 到 {@link #channels} 中 * * @param channel Channel */ public void add(Channel channel) { channels.put(channel.id(), channel); logger.info("[add][一個鏈接({})加入]", channel.id()); } /** * 添加指定用戶到 {@link #userChannels} 中 * * @param channel Channel * @param user 用戶 */ public void addUser(Channel channel, String user) { Channel existChannel = channels.get(channel.id()); if (existChannel == null) { logger.error("[addUser][鏈接({}) 不存在]", channel.id()); return; } // 設置屬性 channel.attr(CHANNEL_ATTR_KEY_USER).set(user); // 添加到 userChannels userChannels.put(user, channel); } /** * 將 Channel 從 {@link #channels} 和 {@link #userChannels} 中移除 * * @param channel Channel */ public void remove(Channel channel) { // 移除 channels channels.remove(channel.id()); // 移除 userChannels if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) { userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get()); } logger.info("[remove][一個鏈接({})離開]", channel.id()); } }
🔥 ② 向客戶端 Channel 發送消息。代碼以下:
@Component public class NettyChannelManager { /** * 向指定用戶發送消息 * * @param user 用戶 * @param invocation 消息體 */ public void send(String user, Invocation invocation) { // 得到用戶對應的 Channel Channel channel = userChannels.get(user); if (channel == null) { logger.error("[send][鏈接不存在]"); return; } if (!channel.isActive()) { logger.error("[send][鏈接({})未激活]", channel.id()); return; } // 發送消息 channel.writeAndFlush(invocation); } /** * 向全部用戶發送消息 * * @param invocation 消息體 */ public void sendAll(Invocation invocation) { for (Channel channel : channels.values()) { if (!channel.isActive()) { logger.error("[send][鏈接({})未激活]", channel.id()); return; } // 發送消息 channel.writeAndFlush(invocation); } } }
建立 pom.xml
文件,引入 Netty 依賴。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lab-67-netty-demo</artifactId> <groupId>cn.iocoder.springboot.labs</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-67-netty-demo-server</artifactId> <properties> <!-- 依賴相關配置 --> <spring.boot.version>2.2.4.RELEASE</spring.boot.version> <!-- 插件相關配置 --> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- Spring Boot 基礎依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Netty 依賴 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> <!-- 引入 netty-demo-common 封裝 --> <dependency> <groupId>cn.iocoder.springboot.labs</groupId> <artifactId>lab-67-netty-demo-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
建立 NettyServerApplication 類,Netty Server 啓動類。代碼以下:
@SpringBootApplication public class NettyServerApplication { public static void main(String[] args) { SpringApplication.run(NettyServerApplication.class, args); } }
執行 NettyServerApplication 類,啓動 Netty Server 服務器。日誌以下:
... // 省略其餘日誌 2020-06-21 00:16:38.801 INFO 41948 --- [ main] c.i.s.l.n.server.NettyServer : [start][Netty Server 啓動在 8888 端口] 2020-06-21 00:16:38.893 INFO 41948 --- [ main] c.i.s.l.n.NettyServerApplication : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)
Netty Server 啓動在 8888 端口。
建立 lab-67-netty-demo-client
項目,搭建 Netty 客戶端。以下圖所示:
下面,咱們只會暫時看看 client
包下的代碼,避免信息量過大,擊穿胖友的禿頭。
建立 NettyClient 類,Netty 客戶端。代碼以下:
@Component public class NettyClient { /** * 重連頻率,單位:秒 */ private static final Integer RECONNECT_SECONDS = 20; private Logger logger = LoggerFactory.getLogger(getClass()); @Value("${netty.server.host}") private String serverHost; @Value("${netty.server.port}") private Integer serverPort; @Autowired private NettyClientHandlerInitializer nettyClientHandlerInitializer; /** * 線程組,用於客戶端對服務端的鏈接、數據讀寫 */ private EventLoopGroup eventGroup = new NioEventLoopGroup(); /** * Netty Client Channel */ private volatile Channel channel; /** * 啓動 Netty Server */ @PostConstruct public void start() throws InterruptedException { // <2.1> 建立 Bootstrap 對象,用於 Netty Client 啓動 Bootstrap bootstrap = new Bootstrap(); // <2.2> bootstrap.group(eventGroup) // <2.2.1> 設置一個 EventLoopGroup 對象 .channel(NioSocketChannel.class) // <2.2.2> 指定 Channel 爲客戶端 NioSocketChannel .remoteAddress(serverHost, serverPort) // <2.2.3> 指定鏈接服務器的地址 .option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 機制,實現 TCP 層級的心跳保活功能 .option(ChannelOption.TCP_NODELAY, true) //<2.2.5> 容許較小的數據包的發送,下降延遲 .handler(nettyClientHandlerInitializer); // <2.3> 鏈接服務器,並異步等待成功,即啓動客戶端 bootstrap.connect().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // 鏈接失敗 if (!future.isSuccess()) { logger.error("[start][Netty Client 鏈接服務器({}:{}) 失敗]", serverHost, serverPort); reconnect(); return; } // 鏈接成功 channel = future.channel(); logger.info("[start][Netty Client 鏈接服務器({}:{}) 成功]", serverHost, serverPort); } }); } public void reconnect() { // ... 暫時省略代碼。 } /** * 關閉 Netty Server */ @PreDestroy public void shutdown() { // <3.1> 關閉 Netty Client if (channel != null) { channel.close(); } // <3.2> 優雅關閉一個 EventLoopGroup 對象 eventGroup.shutdownGracefully(); } /** * 發送消息 * * @param invocation 消息體 */ public void send(Invocation invocation) { if (channel == null) { logger.error("[send][鏈接不存在]"); return; } if (!channel.isActive()) { logger.error("[send][鏈接({})未激活]", channel.id()); return; } // 發送消息 channel.writeAndFlush(invocation); } }
友情提示:總體代碼,是和 「2.1.1 NettyServer」對等,且基本是一致的。
🔥 ① 在類上,添加 @Component
註解,把 NettyClient 的建立交給 Spring 管理。
serverHost
和 serverPort
屬性,讀取 application.yml
配置文件的 netty.server.host
和 netty.server.port
配置項。#start()
方法,添加 @PostConstruct
註解,啓動 Netty 客戶端。#shutdown()
方法,添加 @PreDestroy
註解,關閉 Netty 客戶端。🔥 ② 咱們來詳細看看 #start()
方法的代碼,如何實現 Netty Client 的啓動,創建和服務器的鏈接。
<2.1>
處,建立 Bootstrap 類,Netty 提供的客戶端的啓動類,方便咱們初始化 Client。
<2.2>
處,設置 Bootstrap 的各類屬性。
<2.2.1>
處,調用 #group(EventLoopGroup group)
方法,設置使用 eventGroup
線程組,實現客戶端對服務端的鏈接、數據讀寫。
<2.2.2>
處,調用 #channel(Class<? extends C> channelClass)
方法,設置使用 NioSocketChannel 類,它是 Netty 定義的 NIO 服務端 TCP Client 實現類。
<2.2.3>
處,調用 #remoteAddress(SocketAddress localAddress)
方法,設置鏈接服務端的地址。
<2.2.4>
處,調用 #option(ChannelOption<T> childOption, T value)
方法,TCP Keepalive 機制,實現 TCP 層級的心跳保活功能。
<2.2.5>
處,調用 #childOption(ChannelOption<T> childOption, T value)
方法,容許較小的數據包的發送,下降延遲。
<2.2.7>
處,調用 #handler(ChannelHandler childHandler)
方法,設置本身 Channel 的處理器爲 NettyClientHandlerInitializer。稍後咱們在「2.2.2 NettyClientHandlerInitializer」小節來看看。
<2.3>
處,調用 #connect()
方法,鏈接服務器,並異步等待成功,即啓動客戶端。同時,添加回調監聽器 ChannelFutureListener,在鏈接服務端失敗的時候,調用 #reconnect()
方法,實現定時重連。😈 具體 #reconnect()
方法的代碼,咱們稍後在瞅瞅哈。
③ 咱們來詳細看看 #shutdown()
方法的代碼,如何實現 Netty Client 的關閉。
<3.1>
處,調用 Channel 的 #close()
方法,關閉 Netty Client,這樣客戶端就斷開和服務端的鏈接。
<3.2>
處,調用 EventLoopGroup 的 #shutdownGracefully()
方法,優雅關閉 EventLoopGroup。例如說,它們裏面的線程池。
④ #send(Invocation invocation)
方法,實現向服務端發送消息。
由於 NettyClient 是客戶端,因此無需像 NettyServer 同樣使用「2.1.4 NettyChannelManager」維護 Channel 的集合。
建立的 NettyClientHandlerInitializer 類,就繼承了 ChannelInitializer 抽象類,實現和服務端創建鏈接後,添加相應的 ChannelHandler 處理器。代碼以下:
@Component public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> { /** * 心跳超時時間 */ private static final Integer READ_TIMEOUT_SECONDS = 60; @Autowired private MessageDispatcher messageDispatcher; @Autowired private NettyClientHandler nettyClientHandler; @Override protected void initChannel(Channel ch) { ch.pipeline() // 空閒檢測 .addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0)) .addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS)) // 編碼器 .addLast(new InvocationEncoder()) // 解碼器 .addLast(new InvocationDecoder()) // 消息分發器 .addLast(messageDispatcher) // 客戶端處理器 .addLast(nettyClientHandler) ; } }
和「2.1.2 NettyServerHandlerInitializer」的代碼基本同樣,差異在於空閒檢測額外增長 IdleStateHandler,客戶端處理器換成了 NettyClientHandler。
建立 NettyClientHandler 類,實現客戶端 Channel 斷開鏈接、異常時的處理。代碼以下:
@Component @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private NettyClient nettyClient; @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 發起重連 nettyClient.reconnect(); // 繼續觸發事件 super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("[exceptionCaught][鏈接({}) 發生異常]", ctx.channel().id(), cause); // 斷開鏈接 ctx.channel().close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception { // 空閒時,向服務端發起一次心跳 if (event instanceof IdleStateEvent) { logger.info("[userEventTriggered][發起一次心跳]"); HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest)) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, event); } } }
① 在類上添加 @ChannelHandler.Sharable
註解,標記這個 ChannelHandler 能夠被多個 Channel 使用。
② #channelInactive(ChannelHandlerContext ctx)
方法,實如今和服務端斷開鏈接時,調用 NettyClient 的 #reconnect()
方法,實現客戶端定時和服務端重連。
③ #exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
方法,在處理 Channel 的事件發生異常時,調用 Channel 的 #close()
方法,斷開和客戶端的鏈接。
④ #userEventTriggered(ChannelHandlerContext ctx, Object event)
方法,在客戶端在空閒時,向服務端發送一次心跳,即心跳機制。這塊的內容,咱們稍後詳細講講。
建立 pom.xml
文件,引入 Netty 依賴。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lab-67-netty-demo</artifactId> <groupId>cn.iocoder.springboot.labs</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-67-netty-demo-client</artifactId> <properties> <!-- 依賴相關配置 --> <spring.boot.version>2.2.4.RELEASE</spring.boot.version> <!-- 插件相關配置 --> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- 實現對 Spring MVC 的自動化配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Netty 依賴 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> <!-- 引入 netty-demo-common 封裝 --> <dependency> <groupId>cn.iocoder.springboot.labs</groupId> <artifactId>lab-67-netty-demo-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
建立 NettyClientApplication 類,Netty Client 啓動類。代碼以下:
@SpringBootApplication public class NettyClientApplication { public static void main(String[] args) { SpringApplication.run(NettyClientApplication.class, args); } }
執行 NettyClientApplication 類,啓動 Netty Client 客戶端。日誌以下:
... // 省略其餘日誌 2020-06-21 09:06:12.205 INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient : [start][Netty Client 鏈接服務器(127.0.0.1:8888) 成功]
同時 Netty Server 服務端發現有一個客戶端接入,打印以下日誌:
2020-06-21 09:06:12.268 INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager : [add][一個鏈接(db652822)加入]
至此,咱們已經構建 Netty 服務端和客戶端完成。由於 Netty 提供的 API 很是便利,因此咱們不會像直接使用 NIO 時,須要處理大量底層且細節的代碼。
不過,如上的內容僅僅是本文的開胃菜,正片即將開始!美滋滋,繼續往下看,奧利給!
在「2. 構建 Netty 服務端與客戶端」小節中,咱們實現了客戶端和服務端的鏈接功能。而本小節,咱們要讓它們兩可以說上話,即進行數據的讀寫。
在平常項目的開發中,前端和後端之間採用 HTTP 做爲通訊協議,使用文本內容進行交互,數據格式通常是 JSON。可是在 TCP 的世界裏,咱們須要本身基於二進制構建,構建客戶端和服務端的通訊協議。
咱們以客戶端向服務端發送消息來舉個例子,假設客戶端要發送一個登陸請求,對應的類以下:
public class AuthRequest { /** 用戶名 **/ private String username; /** 密碼 **/ private String password; }
友情提示:服務端向客戶端發消息,也是同樣的過程哈!
序列化的工具很是多,例如說 Google 提供的 Protobuf,性能高效,且序列化出來的二進制數據較小。Netty 對 Protobuf 進行集成,提供了相應的編解碼器。以下圖所示:
可是考慮到不少胖友對 Protobuf 並不瞭解,由於它實現序列化又增長胖友的額外學習成本。所以,艿艿仔細一個捉摸,仍是採用 JSON 方式進行序列化。可能胖友會疑惑,JSON 不是將對象轉換成字符串嗎?嘿嘿,咱們再把字符串轉換成 byte 字節數組就能夠啦~
下面,咱們新建 lab-67-netty-demo-common
項目,並在 codec
包下,實現咱們自定義的通訊協議。以下圖所示:
建立 Invocation 類,通訊協議的消息體。代碼以下:
/** * 通訊協議的消息體 */ public class Invocation { /** * 類型 */ private String type; /** * 消息,JSON 格式 */ private String message; // 空構造方法 public Invocation() { } public Invocation(String type, String message) { this.type = type; this.message = message; } public Invocation(String type, Message message) { this.type = type; this.message = JSON.toJSONString(message); } // ... 省略 setter、getter、toString 方法 }
① type
屬性,類型,用於匹配對應的消息處理器。若是類比 HTTP 協議,type
屬性至關於請求地址。
② message
屬性,消息內容,使用 JSON 格式。
另外,Message 是咱們定義的消息接口。代碼以下:
public interface Message { // ... 空,做爲標記接口 }
在開始看 Invocation 的編解碼處理器以前,咱們先了解下粘包與拆包的概念。
若是的內容,引用 《Netty 解決粘包和拆包問題的四種方案》文章的內容,進行二次編輯。
產生粘包和拆包問題的主要緣由是,操做系統在發送 TCP 數據的時候,底層會有一個緩衝區,例如 1024 個字節大小。
若是一次請求發送的數據量比較小,沒達到緩衝區大小,TCP 則會將多個請求合併爲同一個請求進行發送,這就造成了粘包問題。
例如說,在 《詳解 Socket 編程 --- TCP_NODELAY 選項》文章中咱們能夠看到,在關閉 Nagle 算法時,請求不會等待知足緩衝區大小,而是儘快發出,下降延遲。
以下圖展現了粘包和拆包的一個示意圖,演示了粘包和拆包的三種狀況:
對於粘包和拆包問題,常見的解決方案有三種:
🔥 ① 客戶端在發送數據包的時候,每一個包都固定長度。好比 1024 個字節大小,若是客戶端發送的數據長度不足 1024 個字節,則經過補充空格的方式補全到指定長度。
這種方式,艿艿暫時沒有找到採用這種方式的案例。
🔥 ② 客戶端在每一個包的末尾使用固定的分隔符。例如 \r\n
,若是一個包被拆分了,則等待下一個包發送過來以後找到其中的 \r\n
,而後對其拆分後的頭部部分與前一個包的剩餘部分進行合併,這樣就獲得了一個完整的包。
具體的案例,有 HTTP、WebSocket、Redis。
🔥 ③ 將消息分爲頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息以後纔算是讀到了一個完整的消息。
友情提示:方案 ③ 是 ① 的升級版, 動態長度。
本文,艿艿將採用這種方式,在每次 Invocation 序列化成字節數組寫入 TCP Socket 以前,先將字節數組的長度寫到其中。以下圖所示:
建立 InvocationEncoder 類,實現將 Invocation 序列化,並寫入到 TCP Socket 中。代碼以下:
public class InvocationEncoder extends MessageToByteEncoder<Invocation> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) { // <2.1> 將 Invocation 轉換成 byte[] 數組 byte[] content = JSON.toJSONBytes(invocation); // <2.2> 寫入 length out.writeInt(content.length); // <2.3> 寫入內容 out.writeBytes(content); logger.info("[encode][鏈接({}) 編碼了一條消息({})]", ctx.channel().id(), invocation.toString()); } }
① MessageToByteEncoder 是 Netty 定義的編碼 ChannelHandler 抽象類,將泛型 <I>
消息轉換成字節數組。
② #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out)
方法,進行編碼的邏輯。
<2.1>
處,調用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features)
方法,將 Invocation 轉換成 字節數組。
<2.2>
處,將字節數組的長度,寫入到 TCP Socket 當中。這樣,後續「3.4 InvocationDecoder」能夠根據該長度,解析到消息,解決粘包和拆包的問題。
友情提示:MessageToByteEncoder 會最終將
ByteBuf out
寫到 TCP Socket 中。
<2.3>
處,將字節數組,寫入到 TCP Socket 當中。
建立 InvocationDecoder 類,實現從 TCP Socket 讀取字節數組,反序列化成 Invocation。代碼以下:
public class InvocationDecoder extends ByteToMessageDecoder { private Logger logger = LoggerFactory.getLogger(getClass()); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // <2.1> 標記當前讀取位置 in.markReaderIndex(); // <2.2> 判斷是否可以讀取 length 長度 if (in.readableBytes() <= 4) { return; } // <2.3> 讀取長度 int length = in.readInt(); if (length < 0) { throw new CorruptedFrameException("negative length: " + length); } // <3.1> 若是 message 不夠可讀,則退回到原讀取位置 if (in.readableBytes() < length) { in.resetReaderIndex(); return; } // <3.2> 讀取內容 byte[] content = new byte[length]; in.readBytes(content); // <3.3> 解析成 Invocation Invocation invocation = JSON.parseObject(content, Invocation.class); out.add(invocation); logger.info("[decode][鏈接({}) 解析到一條消息({})]", ctx.channel().id(), invocation.toString()); } }
① ByteToMessageDecoder 是 Netty 定義的解碼 ChannelHandler 抽象類,在 TCP Socket 讀取到新數據時,觸發進行解碼。
② 在 <2.1>
、<2.2>
、<2.3>
處,從 TCP Socket 中讀取長度。
③ 在 <3.1>
、<3.2>
、<3.3>
處,從 TCP Socket 中讀取字節數組,並反序列化成 Invocation 對象。
最終,添加 List<Object> out
中,交給後續的 ChannelHandler 進行處理。稍後,咱們將在「4. 消息分發」小結中,會看到 MessageDispatcher 將 Invocation 分發到其對應的 MessageHandler 中,進行業務邏輯的執行。
建立 pom.xml
文件,引入 Netty、FastJSON 等等依賴。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lab-67-netty-demo</artifactId> <groupId>cn.iocoder.springboot.labs</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-67-netty-demo-common</artifactId> <properties> <!-- 插件相關配置 --> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties> <dependencies> <!-- Netty 依賴 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> <!-- FastJSON 依賴 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version> </dependency> <!-- 引入 Spring 相關依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>5.2.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.5.RELEASE</version> </dependency> <!-- 引入 SLF4J 依賴 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> </dependencies> </project>
至此,咱們已經完成通訊協議的定義、編解碼的邏輯,是否是蠻有趣的?!
另外,咱們在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代碼中,將編解碼器添加到其中。以下圖所示:
在 SpringMVC 中,DispatcherServlet 會根據請求地址、方法等,將請求分發到匹配的 Controller 的 Method 方法上。
在 lab-67-netty-demo-client
項目的 dispatcher
包中,咱們建立了 MessageDispatcher 類,實現和 DispatcherServlet 相似的功能,將 Invocation 分發到其對應的 MessageHandler 中,進行業務邏輯的執行。
下面,咱們來看看具體的代碼實現。
建立 Message 接口,定義消息的標記接口。代碼以下:
public interface Message { }
下圖,是咱們涉及到的 Message 實現類。以下圖所示:
建立 MessageHandler 接口,消息處理器接口。代碼以下:
public interface MessageHandler<T extends Message> { /** * 執行處理消息 * * @param channel 通道 * @param message 消息 */ void execute(Channel channel, T message); /** * @return 消息類型,即每一個 Message 實現類上的 TYPE 靜態字段 */ String getType(); }
<T>
,須要是 Message 的實現類。下圖,是咱們涉及到的 MessageHandler 實現類。以下圖所示:
建立 MessageHandlerContainer 類,做爲 MessageHandler 的容器。代碼以下:
public class MessageHandlerContainer implements InitializingBean { private Logger logger = LoggerFactory.getLogger(getClass()); /** * 消息類型與 MessageHandler 的映射 */ private final Map<String, MessageHandler> handlers = new HashMap<>(); @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { // 經過 ApplicationContext 得到全部 MessageHandler Bean applicationContext.getBeansOfType(MessageHandler.class).values() // 得到全部 MessageHandler Bean .forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中 logger.info("[afterPropertiesSet][消息處理器數量:{}]", handlers.size()); } /** * 得到類型對應的 MessageHandler * * @param type 類型 * @return MessageHandler */ MessageHandler getMessageHandler(String type) { MessageHandler handler = handlers.get(type); if (handler == null) { throw new IllegalArgumentException(String.format("類型(%s) 找不到匹配的 MessageHandler 處理器", type)); } return handler; } /** * 得到 MessageHandler 處理的消息類 * * @param handler 處理器 * @return 消息類 */ static Class<? extends Message> getMessageClass(MessageHandler handler) { // 得到 Bean 對應的 Class 類名。由於有可能被 AOP 代理過。 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler); // 得到接口的 Type 數組 Type[] interfaces = targetClass.getGenericInterfaces(); Class<?> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此處,是以父類的接口爲準 interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { // 遍歷 interfaces 數組 for (Type type : interfaces) { // 要求 type 是泛型參數 if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; // 要求是 MessageHandler 接口 if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); // 取首個元素 if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class<Message>) actualTypeArguments[0]; } else { throw new IllegalStateException(String.format("類型(%s) 得到不到消息類型", handler)); } } } } } throw new IllegalStateException(String.format("類型(%s) 得到不到消息類型", handler)); } }
① 實現 InitializingBean 接口,在 #afterPropertiesSet()
方法中,掃描全部 MessageHandler Bean ,添加到 MessageHandler 集合中。
② 在 #getMessageHandler(String type)
方法中,得到類型對應的 MessageHandler 對象。稍後,咱們會在 MessageDispatcher 調用該方法。
③ 在 #getMessageClass(MessageHandler handler)
方法中,經過 MessageHandler 中,經過解析其類上的泛型,得到消息類型對應的 Class 類。這是參考 rocketmq-spring
項目的 DefaultRocketMQListenerContainer#getMessageType()
方法,進行略微修改。
友情提示:若是胖友對 Java 的泛型機制沒有作過一點了解,可能略微有點硬核。能夠先暫時跳過,知道意圖便可。
建立 MessageDispatcher 類,將 Invocation 分發到其對應的 MessageHandler 中,進行業務邏輯的執行。代碼以下:
@ChannelHandler.Sharable public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> { @Autowired private MessageHandlerContainer messageHandlerContainer; private final ExecutorService executor = Executors.newFixedThreadPool(200); @Override protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) { // <3.1> 得到 type 對應的 MessageHandler 處理器 MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType()); // 得到 MessageHandler 處理器的消息類 Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler); // <3.2> 解析消息 Message message = JSON.parseObject(invocation.getMessage(), messageClass); // <3.3> 執行邏輯 executor.submit(new Runnable() { @Override public void run() { // noinspection unchecked messageHandler.execute(ctx.channel(), message); } }); } }
① 在類上添加 @ChannelHandler.Sharable
註解,標記這個 ChannelHandler 能夠被多個 Channel 使用。
② SimpleChannelInboundHandler 是 Netty 定義的消息處理 ChannelHandler 抽象類,處理消息的類型是 <I>
泛型時。
③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation)
方法,處理消息,進行分發。
<3.1>
處,調用 MessageHandlerContainer 的 #getMessageHandler(String type)
方法,得到 Invocation 的 type
對應的 MessageHandler 處理器。
而後,調用 MessageHandlerContainer 的 #getMessageClass(messageHandler)
方法,得到 MessageHandler 處理器的消息類。
<3.2>
處,調用 JSON 的 # parseObject(String text, Class<T> clazz)
方法,將 Invocation 的 message
解析成 MessageHandler 對應的消息對象。
<3.3>
處,丟到線程池中,而後調用 MessageHandler 的 #execute(Channel channel, T message)
方法,執行業務邏輯。
注意,爲何要丟到 executor
線程池中呢?咱們先來了解下 EventGroup 的線程模型。
友情提示:在咱們啓動 Netty 服務端或者客戶端時,都會設置其 EventGroup。
EventGroup 咱們能夠先簡單理解成一個線程池,而且線程池的大小僅僅是 CPU 數量 * 2。每一個 Channel 僅僅會被分配到其中的一個線程上,進行數據的讀寫。而且,多個 Channel 會共享一個線程,即便用同一個線程進行數據的讀寫。
那麼胖友試着思考下,MessageHandler 的具體邏輯視線中,每每會涉及到 IO 處理,例如說進行數據庫的讀取。這樣,就會致使一個 Channel 在執行 MessageHandler 的過程當中,阻塞了共享當前線程的其它 Channel 的數據讀取。
所以,咱們在這裏建立了 executor
線程池,進行 MessageHandler 的邏輯執行,避免阻塞 Channel 的數據讀取。
可能會有胖友說,咱們是否是可以把 EventGroup 的線程池設置大一點,例如說 200 呢?對於長鏈接的 Netty 服務端,每每會有 1000 ~ 100000 的 Netty 客戶端鏈接上來,這樣不管設置多大的線程池,都會出現阻塞數據讀取的狀況。
友情提示:executor
線程池,咱們通常稱之爲業務線程池或者邏輯線程池,顧名思義,就是執行業務邏輯的。這樣的設計方式,目前 Dubbo 等等 RPC 框架,都採用這種方式。
後續,胖友能夠認真閱讀下《【NIO 系列】——之 Reactor 模型》文章,進一步理解。
建立 NettyServerConfig 配置類,建立 MessageDispatcher 和 MessageHandlerContainer Bean。代碼以下:
@Configuration public class NettyServerConfig { @Bean public MessageDispatcher messageDispatcher() { return new MessageDispatcher(); } @Bean public MessageHandlerContainer messageHandlerContainer() { return new MessageHandlerContainer(); } }
友情提示:和 「4.5 NettyServerConfig」小結一致。
建立 NettyClientConfig 配置類,建立 MessageDispatcher 和 MessageHandlerContainer Bean。代碼以下:
@Configuration public class NettyClientConfig { @Bean public MessageDispatcher messageDispatcher() { return new MessageDispatcher(); } @Bean public MessageHandlerContainer messageHandlerContainer() { return new MessageHandlerContainer(); } }
後續,咱們將在以下小節,具體演示消息分發的使用:
Netty 客戶端須要實現斷開重連機制,解決各類狀況下的斷開狀況。例如說:
具體的代碼實現比較簡單,只須要在兩個地方增長重連機制。
考慮到重連會存在失敗的狀況,咱們採用定時重連的方式,避免佔用過多資源。
① 在 NettyClient 中,提供 #reconnect()
方法,實現定時重連的邏輯。代碼以下:
// NettyClient.java public void reconnect() { eventGroup.schedule(new Runnable() { @Override public void run() { logger.info("[reconnect][開始重連]"); try { start(); } catch (InterruptedException e) { logger.error("[reconnect][重連失敗]", e); } } }, RECONNECT_SECONDS, TimeUnit.SECONDS); logger.info("[reconnect][{} 秒後將發起重連]", RECONNECT_SECONDS); }
經過調用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit)
方法,實現定時邏輯。而在內部的具體邏輯,調用 NettyClient 的 #start()
方法,發起鏈接 Netty 服務端。
又由於 NettyClient 在 #start()
方法在鏈接 Netty 服務端失敗時,又會調用 #reconnect()
方法,從而再次發起定時重連。如此循環反覆,知道 Netty 客戶端鏈接上 Netty 服務端。以下圖所示:
② 在 NettyClientHandler 中,實現 #channelInactive(ChannelHandlerContext ctx)
方法,在發現和 Netty 服務端斷開時,調用 Netty Client 的 #reconnect()
方法,發起重連。代碼以下:
// NettyClientHandler.java @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 發起重連 nettyClient.reconnect(); // 繼續觸發事件 super.channelInactive(ctx); }
① 啓動 Netty Client,不要啓動 Netty Server,控制檯打印日誌以下圖:
能夠看到 Netty Client 在鏈接失敗時,不斷髮起定時重連。
② 啓動 Netty Server,控制檯打印以下圖:
能夠看到 Netty Client 成功重連上 Netty Server。
在上文中,艿艿推薦胖友閱讀《TCP Keepalive 機制刨根問底》文章,咱們能夠了解到 TCP 自帶的空閒檢測機制,默認是 2 小時。這樣的檢測機制,從系統資源層面上來講是能夠接受的。
可是在業務層面,若是 2 小時才發現客戶端與服務端的鏈接實際已經斷開,會致使中間很是多的消息丟失,影響客戶的使用體驗。
所以,咱們須要在業務層面,本身實現空閒檢測,保證儘快發現客戶端與服務端實際已經斷開的狀況。實現邏輯以下:
考慮到客戶端和服務端之間並非一直有消息的交互,因此咱們須要增長心跳機制:
友情提示:
- 爲何是 180 秒?能夠加大或者減少,看本身但願多快檢測到鏈接異常。太短的時間,會致使心跳過於頻繁,佔用過多資源。
- 爲何是 60 秒?三次機會,確認是否心跳超時。
雖然聽起來有點複雜,可是實現起來並不複雜哈。
在 NettyServerHandlerInitializer 中,咱們添加了一個 ReadTimeoutHandler 處理器,它在超過指定時間未從對端讀取到數據,會拋出 ReadTimeoutException 異常。以下圖所示:
經過這樣的方式,實現服務端發現 180 秒未從客戶端讀取到消息,主動斷開鏈接。
友情提示:和 「6.1 服務端的空閒檢測」一致。
在 NettyClientHandlerInitializer 中,咱們添加了一個 ReadTimeoutHandler 處理器,它在超過指定時間未從對端讀取到數據,會拋出 ReadTimeoutException 異常。以下圖所示:
經過這樣的方式,實現客戶端發現 180 秒未從服務端讀取到消息,主動斷開鏈接。
Netty 提供了 IdleStateHandler 處理器,提供空閒檢測的功能,在 Channel 的讀或者寫空閒時間太長時,將會觸發一個 IdleStateEvent 事件。
這樣,咱們只須要在 NettyClientHandler 處理器中,在接收到 IdleStateEvent 事件時,客戶端向客戶端發送一次心跳消息。以下圖所示:
同時,咱們在服務端項目中,建立了一個 HeartbeatRequestHandler 消息處理器,在收到客戶端的心跳請求時,回覆客戶端一條確認消息。代碼以下:
@Component public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, HeartbeatRequest message) { logger.info("[execute][收到鏈接({}) 的心跳請求]", channel.id()); // 響應心跳 HeartbeatResponse response = new HeartbeatResponse(); channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response)); } @Override public String getType() { return HeartbeatRequest.TYPE; } }
啓動 Netty Server 服務端,再啓動 Netty Client 客戶端,耐心等待 60 秒後,能夠看到心跳日誌以下:
// ... 客戶端 2020-06-22 08:24:47.275 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler : [userEventTriggered][發起一次心跳] 2020-06-22 08:24:47.335 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][鏈接(44223e18) 編碼了一條消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})] 2020-06-22 08:24:47.408 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(44223e18) 解析到一條消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})] 2020-06-22 08:24:47.409 INFO 57005 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatResponseHandler : [execute][收到鏈接(44223e18) 的心跳響應] // ... 服務端 2020-06-22 08:24:47.388 INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(34778465) 解析到一條消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})] 2020-06-22 08:24:47.390 INFO 56998 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatRequestHandler : [execute][收到鏈接(34778465) 的心跳請求] 2020-06-22 08:24:47.399 INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationEncoder : [encode][鏈接(34778465) 編碼了一條消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
友情提示:從本小節開始,咱們就具體看看業務邏輯的處理示例。
認證的過程,以下圖所示:
建立 AuthRequest 類,定義用戶認證請求。代碼以下:
public class AuthRequest implements Message { public static final String TYPE = "AUTH_REQUEST"; /** * 認證 Token */ private String accessToken; // ... 省略 setter、getter、toString 方法 }
這裏咱們使用 accessToken
認證令牌進行認證。
由於通常狀況下,咱們使用 HTTP 進行登陸系統,而後使用登陸後的身份標識(例如說 accessToken
認證令牌),將客戶端和當前用戶進行認證綁定。
建立 AuthResponse 類,定義用戶認證響應。代碼以下:
public class AuthResponse implements Message { public static final String TYPE = "AUTH_RESPONSE"; /** * 響應狀態碼 */ private Integer code; /** * 響應提示 */ private String message; // ... 省略 setter、getter、toString 方法 }
服務端...
建立 AuthRequestHandler 類,爲服務端處理客戶端的認證請求。代碼以下:
@Component public class AuthRequestHandler implements MessageHandler<AuthRequest> { @Autowired private NettyChannelManager nettyChannelManager; @Override public void execute(Channel channel, AuthRequest authRequest) { // <1> 若是未傳遞 accessToken if (StringUtils.isEmpty(authRequest.getAccessToken())) { AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("認證 accessToken 未傳入"); channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse)); return; } // <2> ... 此處應有一段 // <3> 將用戶和 Channel 綁定 // 考慮到代碼簡化,咱們先直接使用 accessToken 做爲 User nettyChannelManager.addUser(channel, authRequest.getAccessToken()); // <4> 響應認證成功 AuthResponse authResponse = new AuthResponse().setCode(0); channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse)); } @Override public String getType() { return AuthRequest.TYPE; } }
代碼比較簡單,胖友看看 <1>
、<2>
、<3>
、<4>
上的註釋。
客戶端...
建立 AuthResponseHandler 類,爲客戶端處理服務端的認證響應。代碼以下:
@Component public class AuthResponseHandler implements MessageHandler<AuthResponse> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, AuthResponse message) { logger.info("[execute][認證結果:{}]", message); } @Override public String getType() { return AuthResponse.TYPE; } }
打印個認證結果,方便調試。
客戶端...
建立 TestController 類,提供 /test/mock
接口,模擬客戶端向服務端發送請求。代碼以下:
@RestController @RequestMapping("/test") public class TestController { @Autowired private NettyClient nettyClient; @PostMapping("/mock") public String mock(String type, String message) { // 建立 Invocation 對象 Invocation invocation = new Invocation(type, message); // 發送消息 nettyClient.send(invocation); return "success"; } }
啓動 Netty Server 服務端,再啓動 Netty Client 客戶端,而後使用 Postman 模擬一次認證請求。以下圖所示:
同時,能夠看到認證成功的日誌以下:
// 客戶端... 2020-06-22 08:41:12.364 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][鏈接(9e086597) 編碼了一條消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})] 2020-06-22 08:41:12.390 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(9e086597) 解析到一條消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})] 2020-06-22 08:41:12.392 INFO 57583 --- [pool-1-thread-1] c.i.s.l.n.m.auth.AuthResponseHandler : [execute][認證結果:AuthResponse{code=0, message='null'}] // 服務端... 2020-06-22 08:41:12.374 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(791f122b) 解析到一條消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})] 2020-06-22 08:41:12.379 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [encode][鏈接(791f122b) 編碼了一條消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]
私聊的過程,以下圖所示:
服務端負責將客戶端 A 發送的私聊消息,轉發給客戶端 B。
建立 ChatSendToOneRequest 類,發送給指定人的私聊消息的請求。代碼以下:
public class ChatSendToOneRequest implements Message { public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST"; /** * 發送給的用戶 */ private String toUser; /** * 消息編號 */ private String msgId; /** * 內容 */ private String content; // ... 省略 setter、getter、toString 方法 }
建立 ChatSendResponse 類,聊天發送消息結果的響應。代碼以下:
public class ChatSendResponse implements Message { public static final String TYPE = "CHAT_SEND_RESPONSE"; /** * 消息編號 */ private String msgId; /** * 響應狀態碼 */ private Integer code; /** * 響應提示 */ private String message; // ... 省略 setter、getter、toString 方法 }
建立 ChatRedirectToUserRequest 類, 轉發消息給一個用戶的請求。代碼以下:
public class ChatRedirectToUserRequest implements Message { public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST"; /** * 消息編號 */ private String msgId; /** * 內容 */ private String content; // ... 省略 setter、getter、toString 方法 }
友情提示:寫完以後,艿艿忽然發現少了一個
fromUser
字段,表示來自誰的消息。
服務端...
建立 ChatSendToOneHandler 類,爲服務端處理客戶端的私聊請求。代碼以下:
@Component public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> { @Autowired private NettyChannelManager nettyChannelManager; @Override public void execute(Channel channel, ChatSendToOneRequest message) { // <1> 這裏,僞裝直接成功 ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0); channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse)); // <2> 建立轉發的消息,發送給指定用戶 ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId()) .setContent(message.getContent()); nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest)); } @Override public String getType() { return ChatSendToOneRequest.TYPE; } }
代碼比較簡單,胖友看看 <1>
、<2>
上的註釋。
客戶端...
建立 ChatSendResponseHandler 類,爲客戶端處理服務端的聊天響應。代碼以下:
@Component public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, ChatSendResponse message) { logger.info("[execute][發送結果:{}]", message); } @Override public String getType() { return ChatSendResponse.TYPE; } }
打印個聊天發送結果,方便調試。
客戶端
建立 ChatRedirectToUserRequestHandler 類,爲客戶端處理服務端的轉發消息的請求。代碼以下:
@Component public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void execute(Channel channel, ChatRedirectToUserRequest message) { logger.info("[execute][收到消息:{}]", message); } @Override public String getType() { return ChatRedirectToUserRequest.TYPE; } }
打印個聊天接收消息,方便調試。
① 啓動 Netty Server 服務端。
② 啓動 Netty Client 客戶端 A。而後使用 Postman 模擬一次認證請求(用戶爲 yunai
)。以下圖所示:
③ 啓動 Netty Client 客戶端 B。注意,須要設置 --server.port
端口爲 8081,避免衝突。以下圖所示:
而後使用 Postman 模擬一次認證請求(用戶爲 tutou
)。以下圖所示:
④ 最後使用 Postman 模擬一次 yunai
芋艿給 tutou
土豆發送一次私聊消息。以下圖所示:
同時,能夠看到客戶端 A 向客戶端 B 發送私聊消息的日誌以下:
// 客戶端 A...(芋艿) 2020-06-22 08:48:09.505 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(9e086597) 編碼了一條消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tudou", msgId: "1", content: "你猜"}'})] 2020-06-22 08:48:09.510 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(9e086597) 解析到一條消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})] 2020-06-22 08:48:09.511 INFO 57583 --- [ool-1-thread-69] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][發送結果:ChatSendResponse{msgId='1', code=0, message='null'}] 2020-06-22 08:48:35.148 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(9e086597) 編碼了一條消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})] 2020-06-22 08:48:35.150 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(9e086597) 解析到一條消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})] 2020-06-22 08:48:35.150 INFO 57583 --- [ool-1-thread-70] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][發送結果:ChatSendResponse{msgId='1', code=0, message='null'}] // 服務端 ... 2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(791f122b) 解析到一條消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})] 2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(791f122b) 編碼了一條消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})] 2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(79cb3a1e) 編碼了一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})] // 客戶端 B...(禿頭) 2020-06-22 08:48:18.277 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler : [userEventTriggered][發起一次心跳] 2020-06-22 08:48:18.278 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][鏈接(24fbc3e8) 編碼了一條消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})] 2020-06-22 08:48:18.280 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(24fbc3e8) 解析到一條消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})] 2020-06-22 08:48:18.281 INFO 59613 --- [pool-1-thread-4] c.i.s.l.n.m.h.HeartbeatResponseHandler : [execute][收到鏈接(24fbc3e8) 的心跳響應] 2020-06-22 08:48:35.150 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(24fbc3e8) 解析到一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})] 2020-06-22 08:48:35.151 INFO 59613 --- [pool-1-thread-5] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='1', content='你猜'}]
羣聊的過程,以下圖所示:
服務端負責將客戶端 A 發送的羣聊消息,轉發給客戶端 A、B、C。
友情提示:考慮到邏輯簡潔,艿艿提供的本小節的示例,並非一個一個羣,而是全部人在一個大的羣聊中哈~
建立 ChatSendToOneRequest 類,發送給全部人的羣聊消息的請求。代碼以下:
public class ChatSendToAllRequest implements Message { public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST"; /** * 消息編號 */ private String msgId; /** * 內容 */ private String content; // ... 省略 setter、getter、toString 方法 }
友情提示:若是是正經的羣聊,會有一個
groupId
字段,表示羣編號。
和「8.2 ChatSendResponse」小節一致。
和「8.3 ChatRedirectToUserRequest」小節一致。
服務端...
建立 ChatSendToAllHandler 類,爲服務端處理客戶端的羣聊請求。代碼以下:
@Component public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> { @Autowired private NettyChannelManager nettyChannelManager; @Override public void execute(Channel channel, ChatSendToAllRequest message) { // <1> 這裏,僞裝直接成功 ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0); channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse)); // <2> 建立轉發的消息,並廣播發送 ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId()) .setContent(message.getContent()); nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest)); } @Override public String getType() { return ChatSendToAllRequest.TYPE; } }
代碼比較簡單,胖友看看 <1>
、<2>
上的註釋。
和「8.5 ChatSendResponseHandler」小節一致。
和「8.6 ChatRedirectToUserRequestHandler」小節一致。
① 啓動 Netty Server 服務端。
② 啓動 Netty Client 客戶端 A。而後使用 Postman 模擬一次認證請求(用戶爲 yunai
)。以下圖所示:
③ 啓動 Netty Client 客戶端 B。注意,須要設置 --server.port
端口爲 8081,避免衝突。
④ 啓動 Netty Client 客戶端 C。注意,須要設置 --server.port
端口爲 8082,避免衝突。
⑤ 最後使用 Postman 模擬一次發送羣聊消息。以下圖所示:
同時,能夠看到客戶端 A 羣發給全部客戶端的日誌以下:
// 客戶端 A... 2020-06-22 08:55:44.898 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(9e086597) 編碼了一條消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "廣播消息"}'})] 2020-06-22 08:55:44.901 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(9e086597) 解析到一條消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 57583 --- [ol-1-thread-148] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][發送結果:ChatSendResponse{msgId='2', code=0, message='null'}] 2020-06-22 08:55:44.901 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(9e086597) 解析到一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"廣播消息","msgId":"2"}'})] 2020-06-22 08:55:44.903 INFO 57583 --- [ol-1-thread-149] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='廣播消息'}] // 服務端... 2020-06-22 08:55:44.898 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(791f122b) 解析到一條消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "廣播消息"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(791f122b) 編碼了一條消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(791f122b) 編碼了一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"廣播消息","msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(79cb3a1e) 編碼了一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"廣播消息","msgId":"2"}'})] 2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-4] c.i.s.l.n.codec.InvocationEncoder : [decode][鏈接(9dc03826) 編碼了一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"廣播消息","msgId":"2"}'})] // 客戶端 B... 2020-06-22 08:55:44.902 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(24fbc3e8) 解析到一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"廣播消息","msgId":"2"}'})] 2020-06-22 08:55:44.902 INFO 59613 --- [ool-1-thread-83] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='廣播消息'}] // 客戶端 C... 2020-06-22 08:55:44.901 INFO 61597 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][鏈接(9128c71c) 解析到一條消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"廣播消息","msgId":"2"}'})] 2020-06-22 08:55:44.903 INFO 61597 --- [ool-1-thread-16] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='廣播消息'}]
至此,咱們已經經過 Netty 實現了一個簡單的 IM 功能,是否是收穫蠻大的,嘿嘿。
下面,良心的艿艿,再來推薦一波文章,嘿嘿。
等後續,艿艿會在 https://github.com/YunaiV/one... 開源項目中,實現一個相對完整的客服功能,哈哈哈~